KubeRay API Reference
KubeRay integration components for running Ray on Kubernetes.
Client Mode Resources
These resources initialize a Ray client connection to a remote cluster.
dagster_ray.kuberay.KubeRayInteractiveJob (pydantic model)
Bases: RayResource, BaseKubeRayResourceConfig
Provides a Ray Job for Dagster steps.
This is the recommended way to run Ray workloads with automatic cluster management. It creates a RayJob, connects to it in client mode, and sets the jobId field. Cleanup is handled by the KubeRay controller or by the resource lifecycle logic.
Info
Image defaults to the dagster/image run tag.
Tip
Make sure ray[full] is available in the image.
Fields:
- timeout (float)
- ray_init_options (dict[str, Any])
- data_execution_options (RayDataExecutionOptions)
- redis_port (int)
- dashboard_port (int)
- env_vars (dict[str, str] | None)
- enable_tracing (bool)
- enable_actor_task_logging (bool)
- enable_debug_post_mortem (bool)
- enable_legacy_debugger (bool)
- _context (BaseContext | None)
Attributes
lifecycle
lifecycle: Lifecycle = Field(
default_factory=lambda: Lifecycle(cleanup="on_exception"), description="Actions to perform during resource setup."
)
ray_job
ray_job: InteractiveRayJobConfig = Field(
default_factory=InteractiveRayJobConfig, description="Configuration for the Kubernetes `RayJob` CR"
)
client
client: ResourceDependency[RayJobClient] = Field(
default_factory=KubeRayJobClientResource, description="Kubernetes `RayJob` client"
)
log_cluster_conditions
log_cluster_conditions: bool = Field(
default=True,
description="Whether to log `RayCluster` conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
)
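When log_cluster_conditions is enabled, the resource logs RayCluster status conditions while it waits for the cluster to become ready. The sketch below is a hypothetical helper, not dagster-ray's implementation. It assumes each condition uses the standard Kubernetes fields type, status, reason, and message, and the sample condition data is illustrative only.

```python
from typing import Any


def format_cluster_conditions(cluster: dict[str, Any]) -> list[str]:
    """Hypothetical helper: render RayCluster status conditions as log lines."""
    # Conditions live under status.conditions; a freshly created CR may have none yet.
    conditions = cluster.get("status", {}).get("conditions", [])
    lines = []
    for cond in conditions:
        # type and status are always present; reason and message are optional.
        line = f"{cond['type']}={cond['status']}"
        if cond.get("reason"):
            line += f" ({cond['reason']})"
        if cond.get("message"):
            line += f": {cond['message']}"
        lines.append(line)
    return lines


# Illustrative RayCluster object, as it might be read from the Kubernetes API.
cluster = {
    "status": {
        "conditions": [
            {"type": "HeadPodReady", "status": "True", "reason": "HeadPodRunningAndReady"},
            {"type": "RayClusterProvisioned", "status": "False", "message": "waiting for workers"},
        ]
    }
}
for line in format_cluster_conditions(cluster):
    print(line)
```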
dagster_ray.kuberay.KubeRayCluster (pydantic model)
Bases: BaseKubeRayResourceConfig, RayResource
Provides a Ray Cluster for Dagster steps.
With KubeRay >= 1.3.0, prefer KubeRayInteractiveJob instead.
Info
Image defaults to the dagster/image run tag.
Tip
Make sure ray[full] is available in the image.
Fields:
- timeout (float)
- ray_init_options (dict[str, Any])
- data_execution_options (RayDataExecutionOptions)
- redis_port (int)
- dashboard_port (int)
- env_vars (dict[str, str] | None)
- enable_tracing (bool)
- enable_actor_task_logging (bool)
- enable_debug_post_mortem (bool)
- enable_legacy_debugger (bool)
- _context (BaseContext | None)
Attributes
lifecycle
lifecycle: Lifecycle = Field(
default_factory=lambda: Lifecycle(cleanup="always"), description="Actions to perform during resource setup."
)
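KubeRayInteractiveJob defaults to Lifecycle(cleanup="on_exception"), while KubeRayCluster defaults to Lifecycle(cleanup="always"). One plausible reading of these two policies is sketched below as a hypothetical helper, not the dagster-ray API. Under "always", the cluster is torn down after every step. Under "on_exception", the resource only cleans up when the step fails and otherwise leaves deletion to the KubeRay controller.

```python
from typing import Literal

# Only the two policy values that appear in the defaults above are modeled here.
CleanupPolicy = Literal["always", "on_exception"]


def should_cleanup(policy: CleanupPolicy, step_failed: bool) -> bool:
    """Hypothetical helper: decide whether the resource deletes its cluster on teardown."""
    if policy == "always":
        # KubeRayCluster default: delete the RayCluster whether the step succeeds or fails.
        return True
    if policy == "on_exception":
        # KubeRayInteractiveJob default: only clean up when the step raised;
        # on success the KubeRay controller is expected to handle deletion.
        return step_failed
    raise ValueError(f"unsupported cleanup policy: {policy!r}")


print(should_cleanup("on_exception", step_failed=False))
```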
ray_cluster
ray_cluster: RayClusterConfig = Field(
default_factory=RayClusterConfig, description="Kubernetes `RayCluster` CR configuration."
)
client
client: ResourceDependency[RayClusterClient] = Field(
default_factory=KubeRayClusterClientResource, description="Kubernetes `RayCluster` client"
)
log_cluster_conditions
log_cluster_conditions: bool = Field(
default=True,
description="Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
)
Pipes
dagster_ray.kuberay.PipesKubeRayJobClient
PipesKubeRayJobClient(
client: RayJobClient | None = None,
context_injector: PipesContextInjector | None = None,
message_reader: PipesMessageReader | None = None,
forward_termination: bool = True,
timeout: float = 600,
poll_interval: float = 1,
port_forward: bool = False,
)
Bases: PipesClient, TreatAsResourceParam
A Pipes client for running a RayJob on Kubernetes.
Parameters:
- context_injector (Optional[PipesContextInjector], default: None) – A context injector used to inject context into the RayJob. Defaults to PipesEnvContextInjector.
- message_reader (Optional[PipesMessageReader], default: None) – A message reader used to read messages from the RayJob. Defaults to PipesRayJobMessageReader.
- client (Optional[RayJobClient], default: None) – The Kubernetes API client for RayJob resources.
- forward_termination (bool, default: True) – Whether to terminate the Ray job when the Dagster process receives a termination signal or when the startup timeout is reached. Defaults to True.
- timeout (float, default: 600) – Timeout for various internal interactions with the Kubernetes RayJob.
- poll_interval (float, default: 1) – Interval at which to poll Kubernetes for status updates.
- port_forward (bool, default: False) – Whether to use Kubernetes port-forwarding to connect to the KubeRay cluster. This is useful when running in a local environment.
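The timeout and poll_interval parameters describe a standard poll-until-deadline loop. The sketch below is rough and hypothetical, assuming both values are in seconds. The helper name and the status strings are illustrative and are not part of PipesKubeRayJobClient.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    target: str,
    timeout: float = 600,
    poll_interval: float = 1,
) -> str:
    """Hypothetical helper: poll get_status() until it returns target or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"status {status!r} did not reach {target!r} within {timeout}s")
        # Sleep between polls to avoid hammering the Kubernetes API.
        time.sleep(poll_interval)


# Simulated status sequence standing in for a RayJob object read from Kubernetes.
statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
print(wait_for_status(lambda: next(statuses), "SUCCEEDED", timeout=5, poll_interval=0))
```

Using time.monotonic() rather than wall-clock time keeps the deadline correct even if the system clock is adjusted while waiting.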
Info
Image defaults to the dagster/image run tag.
Tip
Make sure ray[full] is available in the image.
Source code in src/dagster_ray/kuberay/pipes.py
Functions
run
run(
*, context: OpOrAssetExecutionContext, ray_job: dict[str, Any], extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation
Execute a RayJob, enriched with the Pipes protocol.
Parameters:
- context (OpOrAssetExecutionContext) – Current Dagster op or asset context.
- ray_job (Dict[str, Any]) – RayJob specification. See the KubeRay API reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
- extras (Optional[Dict[str, Any]], default: None) – Additional information to pass to the Pipes session.
Source code in src/dagster_ray/kuberay/pipes.py
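run() takes the RayJob manifest as a plain dictionary in the KubeRay CRD's camelCase layout. Below is a minimal sketch of such a dictionary for a head-only cluster, assuming the ray.io/v1 API version. The name, entrypoint, and image are placeholders, and minimal_ray_job is a hypothetical helper. A real cluster spec usually also needs resource requests and worker groups.

```python
from typing import Any


def minimal_ray_job(name: str, entrypoint: str, image: str) -> dict[str, Any]:
    """Hypothetical helper: build a small RayJob manifest dict for the ray_job argument."""
    return {
        "apiVersion": "ray.io/v1",
        "kind": "RayJob",
        "metadata": {"name": name},
        "spec": {
            # Command executed by the RayJob against the cluster.
            "entrypoint": entrypoint,
            # Shut the RayCluster down once the job finishes, after a 300-second TTL.
            "shutdownAfterJobFinishes": True,
            "ttlSecondsAfterFinished": 300,
            "rayClusterSpec": {
                "headGroupSpec": {
                    "rayStartParams": {},
                    "template": {
                        "spec": {"containers": [{"name": "ray-head", "image": image}]}
                    },
                },
            },
        },
    }


manifest = minimal_ray_job("my-ray-job", "python /app/script.py", "my-registry/my-image:latest")
print(manifest["kind"], manifest["spec"]["entrypoint"])
```

Inside an op or asset, a dictionary like this would then be passed as run(context=context, ray_job=manifest).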
Configuration and Types
dagster_ray.kuberay.configs.RayJobConfig (pydantic model)
Bases: Config
Attributes
metadata
metadata: dict[str, Any] = Field(
default_factory=dict,
description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)
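The metadata field lets the name be omitted, in which case dagster-ray generates one. The naming scheme below is purely hypothetical (the "dg" prefix and random suffix are assumptions, not dagster-ray's actual format). It only shows the general pattern of filling in a missing name that is still a valid lowercase RFC 1123 label.

```python
import re
import uuid
from typing import Any

# RFC 1123 label: lowercase alphanumerics and '-', starting and ending with an alphanumeric.
DNS_LABEL = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")


def with_generated_name(metadata: dict[str, Any], prefix: str = "dg") -> dict[str, Any]:
    """Hypothetical helper: return a copy of metadata with a name filled in if omitted."""
    result = dict(metadata)
    if not result.get("name"):
        # A short random hex suffix keeps names unique and well under 63 characters.
        result["name"] = f"{prefix}-{uuid.uuid4().hex[:8]}"
    return result


print(with_generated_name({"labels": {"team": "ml"}}))
print(with_generated_name({"name": "my-job"})["name"])
```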
Functions
to_k8s
to_k8s(
context: AnyDagsterContext,
image: str | None = None,
labels: Mapping[str, str] | None = None,
annotations: Mapping[str, str] | None = None,
env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
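Converts the configuration into a Kubernetes manifest in camelCase format and injects additional information, such as the image, labels, annotations, and environment variables passed as arguments.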
Source code in src/dagster_ray/kuberay/configs.py
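The to_k8s conversion turns snake_case config fields into the camelCase keys that the KubeRay CRDs expect, so that ray_cluster_spec becomes rayClusterSpec. A minimal sketch of that key conversion follows. It is not dagster-ray's code, and it leaves out the injection step. A real implementation also has to avoid rewriting user-defined map keys, such as label, annotation, or rayStartParams entries.

```python
from typing import Any


def snake_to_camel(key: str) -> str:
    """Convert a snake_case key such as ray_cluster_spec into rayClusterSpec."""
    head, *rest = key.split("_")
    return head + "".join(part.capitalize() for part in rest)


def to_camel_case(value: Any) -> Any:
    """Recursively convert dict keys from snake_case to camelCase, leaving values intact."""
    if isinstance(value, dict):
        return {snake_to_camel(k): to_camel_case(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_camel_case(item) for item in value]
    return value


print(to_camel_case({"submission_mode": "InteractiveMode", "ttl_seconds_after_finished": 300}))
```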
dagster_ray.kuberay.configs.RayJobSpec
Bases: PermissiveConfig
RayJob spec configuration options. A few sensible defaults are provided for convenience.
Attributes
ray_cluster_spec
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
Functions
to_k8s
to_k8s(
context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]
Converts the spec into a Kubernetes manifest in camelCase format and injects additional information, such as the image and environment variables passed as arguments.
Source code in src/dagster_ray/kuberay/configs.py
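The to_k8s methods accept an env_vars mapping that is injected into the generated manifest. The hedged sketch below shows one way to picture that step. It assumes the variables are added to every container of a pod template and that values already declared in the spec take precedence. That precedence rule is an assumption not stated in this reference, and inject_env_vars is a hypothetical name.

```python
import copy
from typing import Any, Mapping


def inject_env_vars(pod_template: dict[str, Any], env_vars: Mapping[str, str]) -> dict[str, Any]:
    """Hypothetical helper: add env vars to every container without overriding existing ones."""
    template = copy.deepcopy(pod_template)  # never mutate the caller's spec
    for container in template.get("spec", {}).get("containers", []):
        env = container.setdefault("env", [])
        existing = {item["name"] for item in env}
        for name, value in env_vars.items():
            # Assumed rule: variables already declared in the spec win.
            if name not in existing:
                env.append({"name": name, "value": value})
    return template


template = {"spec": {"containers": [{"name": "ray-head", "env": [{"name": "A", "value": "1"}]}]}}
print(inject_env_vars(template, {"A": "2", "B": "3"}))
```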
dagster_ray.kuberay.resources.rayjob.InteractiveRayJobConfig (pydantic model)
Bases: RayJobConfig
Same as RayJobConfig, but spec.submission_mode must be InteractiveMode.
Attributes
metadata
metadata: dict[str, Any] = Field(
default_factory=dict,
description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)
spec
spec: InteractiveRayJobSpec = Field(default_factory=InteractiveRayJobSpec)
Functions
to_k8s
to_k8s(
context: AnyDagsterContext,
image: str | None = None,
labels: Mapping[str, str] | None = None,
annotations: Mapping[str, str] | None = None,
env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
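Converts the configuration into a Kubernetes manifest in camelCase format and injects additional information, such as the image, labels, annotations, and environment variables passed as arguments.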
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.resources.rayjob.InteractiveRayJobSpec
Bases: RayJobSpec
Same as RayJobSpec, but submission_mode must be InteractiveMode.
Attributes
submission_mode
submission_mode: Literal['InteractiveMode'] = 'InteractiveMode'
active_deadline_seconds
active_deadline_seconds: int = 60 * 60 * 24
ray_cluster_spec
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
submitter_pod_template
cluster_selector
deletion_strategy
deletion_strategy: dict[str, Any] | None = Field(
default_factory=lambda: {"onFailure": {"policy": "DeleteCluster"}, "onSuccess": {"policy": "DeleteCluster"}}
)
ttl_seconds_after_finished
ttl_seconds_after_finished: int | None = 5 * 60
shutdown_after_job_finishes
shutdown_after_job_finishes: bool = True
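The InteractiveRayJobSpec defaults above resolve to concrete values. active_deadline_seconds is 60 * 60 * 24 = 86400 seconds (one day), and ttl_seconds_after_finished is 5 * 60 = 300 seconds. The dataclass below is only an illustrative mirror of those defaults, including the InteractiveMode constraint. InteractiveSpecDefaults is a hypothetical name, not a dagster-ray class.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class InteractiveSpecDefaults:
    """Hypothetical mirror of the InteractiveRayJobSpec defaults listed above."""

    submission_mode: str = "InteractiveMode"
    # 86400 s (one day) before KubeRay tries to terminate the RayJob.
    active_deadline_seconds: int = 60 * 60 * 24
    # 300 s after the job finishes before the cluster is cleaned up.
    ttl_seconds_after_finished: Optional[int] = 5 * 60
    shutdown_after_job_finishes: bool = True
    # default_factory gives each instance its own dict, like the pydantic Field above.
    deletion_strategy: Optional[dict[str, Any]] = field(
        default_factory=lambda: {
            "onFailure": {"policy": "DeleteCluster"},
            "onSuccess": {"policy": "DeleteCluster"},
        }
    )

    def __post_init__(self) -> None:
        # Mirrors the documented constraint that the submission mode must be InteractiveMode.
        if self.submission_mode != "InteractiveMode":
            raise ValueError("submission_mode must be 'InteractiveMode'")


print(InteractiveSpecDefaults().active_deadline_seconds)
```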
Functions
to_k8s
to_k8s(
context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]
Converts the spec into a Kubernetes manifest in camelCase format and injects additional information, such as the image and environment variables passed as arguments.
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.configs.RayClusterConfig (pydantic model)
Bases: Config
Attributes
metadata
metadata: dict[str, Any] = Field(
default_factory=dict,
description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)
spec
spec: RayClusterSpec = Field(default_factory=RayClusterSpec)
Functions
to_k8s
to_k8s(
context: AnyDagsterContext,
image: str | None = None,
labels: Mapping[str, str] | None = None,
annotations: Mapping[str, str] | None = None,
env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.configs.RayClusterSpec
Bases: PermissiveConfig
RayCluster spec configuration options. A few sensible defaults are provided for convenience.
Attributes
autoscaler_options
head_service_annotations
enable_in_tree_autoscaling
enable_in_tree_autoscaling: bool = False
gcs_fault_tolerance_options
head_group_spec
worker_group_specs
Functions
to_k8s
to_k8s(
context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]
Converts the spec into a Kubernetes manifest in camelCase format and injects additional information, such as the image and environment variables passed as arguments.
Source code in src/dagster_ray/kuberay/configs.py
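After camelCase conversion, the RayClusterSpec attributes map onto the fields of the KubeRay RayCluster CRD. For example, head_group_spec becomes headGroupSpec, worker_group_specs becomes workerGroupSpecs, and enable_in_tree_autoscaling becomes enableInTreeAutoscaling. Below is a hedged sketch of such a spec as a plain dictionary. The image, group name, and replica counts are placeholders, and ray_cluster_spec is a hypothetical helper, not part of dagster-ray.

```python
from typing import Any


def ray_cluster_spec(
    image: str, replicas: int, max_replicas: int, autoscaling: bool = False
) -> dict[str, Any]:
    """Hypothetical helper: a camelCase RayCluster spec with one head and one worker group."""
    if not 0 <= replicas <= max_replicas:
        raise ValueError("replicas must be between 0 and max_replicas")

    def pod_template(container_name: str) -> dict[str, Any]:
        # Placeholder pod template; real specs usually also set resource requests/limits.
        return {"spec": {"containers": [{"name": container_name, "image": image}]}}

    return {
        # Defaults to False, matching enable_in_tree_autoscaling above.
        "enableInTreeAutoscaling": autoscaling,
        "headGroupSpec": {"rayStartParams": {}, "template": pod_template("ray-head")},
        "workerGroupSpecs": [
            {
                "groupName": "workers",  # hypothetical group name
                "replicas": replicas,
                "minReplicas": replicas,
                "maxReplicas": max_replicas,
                "rayStartParams": {},
                "template": pod_template("ray-worker"),
            }
        ],
    }


spec = ray_cluster_spec("my-registry/my-image:latest", replicas=2, max_replicas=4)
print(spec["workerGroupSpecs"][0]["maxReplicas"])
```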
dagster_ray.kuberay.resources.base.BaseKubeRayResourceConfig (pydantic model)
Bases: Config
Attributes
image
image: str | None = Field(
default=None,
description="Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
)
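The image field is injected into the RayCluster spec, falls back to the dagster/image run tag, and never overrides images already set in the spec. Below is a rough sketch of that "fill, never override" rule. It assumes images live on the containers of the head and worker group pod templates, and inject_image is a hypothetical name, not dagster-ray's implementation.

```python
import copy
from typing import Any, Mapping, Optional


def inject_image(
    cluster_spec: dict[str, Any], image: Optional[str], run_tags: Mapping[str, str]
) -> dict[str, Any]:
    """Hypothetical helper: fill in missing container images, never overriding explicit ones."""
    # A configured image wins; otherwise fall back to the dagster/image run tag.
    image = image or run_tags.get("dagster/image")
    spec = copy.deepcopy(cluster_spec)
    if not image:
        return spec
    templates = [spec.get("headGroupSpec", {}).get("template", {})]
    templates += [group.get("template", {}) for group in spec.get("workerGroupSpecs", [])]
    for template in templates:
        for container in template.get("spec", {}).get("containers", []):
            # Images already provided in the RayCluster spec are left untouched.
            if not container.get("image"):
                container["image"] = image
    return spec


spec = {"headGroupSpec": {"template": {"spec": {"containers": [{"name": "ray-head"}]}}}}
print(inject_image(spec, None, {"dagster/image": "my-registry/my-image:latest"}))
```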
Resources
dagster_ray.kuberay.KubeRayJobClientResource (pydantic model)
Bases: ConfigurableResource[RayJobClient]
This configurable resource provides a dagster_ray.kuberay.client.RayJobClient.
dagster_ray.kuberay.KubeRayClusterClientResource (pydantic model)
Bases: ConfigurableResource[RayClusterClient]
This configurable resource provides a dagster_ray.kuberay.client.RayClusterClient.
Kubernetes API Clients
dagster_ray.kuberay.client.RayClusterClient
RayClusterClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)
Bases: BaseKubeRayClient[RayClusterStatus]
Source code in src/dagster_ray/kuberay/client/raycluster/client.py
Functions
create
Source code in src/dagster_ray/kuberay/client/base.py
delete
get
Source code in src/dagster_ray/kuberay/client/base.py
list
Source code in src/dagster_ray/kuberay/client/base.py
update
dagster_ray.kuberay.client.RayJobClient
RayJobClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)
Bases: BaseKubeRayClient[RayJobStatus]