KubeRay API Reference¶
KubeRay integration components for running Ray on Kubernetes.
Client Mode Resources¶
These resources initialize a Ray client connection to a remote cluster.
dagster_ray.kuberay.KubeRayInteractiveJob
pydantic-model
¶
Bases: BaseKubeRayResource
Provides a Ray Job for Dagster steps.
This is the recommended way to run Ray workloads with automatic cluster management. It creates a RayJob, connects to it in client mode, and sets the jobId field. Cleanup is handled by the KubeRay controller or by the resource lifecycle logic.
Info
Image defaults to dagster/image run tag.
Tip
Make sure ray[full] is available in the image.
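For orientation, here is a minimal usage sketch. The resource key `ray_cluster` and the asset name are arbitrary choices for this example; with the default lifecycle, the resource creates the RayJob, waits for it to become ready, and connects via `ray.init` before the step body runs.

```python
import dagster as dg
import ray

from dagster_ray.kuberay import KubeRayInteractiveJob


@ray.remote
def add(a: int, b: int) -> int:
    return a + b


@dg.asset
def my_ray_asset(ray_cluster: KubeRayInteractiveJob) -> int:
    # With the default Lifecycle (create/wait/connect all enabled), the resource
    # has already created the RayJob and called ray.init() by this point.
    return ray.get(add.remote(1, 2))


defs = dg.Definitions(
    assets=[my_ray_asset],
    resources={"ray_cluster": KubeRayInteractiveJob()},
)
```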
Show JSON schema:
{
"$defs": {
"ExecutionOptionsConfig": {
"properties": {
"cpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Cpu"
},
"gpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Gpu"
},
"object_store_memory": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Object Store Memory"
}
},
"title": "ExecutionOptionsConfig",
"type": "object"
},
"InteractiveRayJobConfig": {
"description": "Same as [`RayJobConfig`][dagster_ray.kuberay.configs.RayJobConfig], but `spec.submission_mode` mode has to be `InteractiveMode`",
"properties": {
"kind": {
"default": "RayJob",
"title": "Kind",
"type": "string"
},
"api_version": {
"default": "ray.io/v1",
"title": "Api Version",
"type": "string"
},
"metadata": {
"description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
"title": "Metadata",
"type": "object"
},
"spec": {
"$ref": "#/$defs/InteractiveRayJobSpec"
}
},
"title": "InteractiveRayJobConfig",
"type": "object"
},
"InteractiveRayJobSpec": {
"additionalProperties": true,
"description": "Same as [`RayJobSpec`][dagster_ray.kuberay.configs.RayJobSpec], but `mode` has to be `InteractiveMode`",
"properties": {
"active_deadline_seconds": {
"default": 86400,
"title": "Active Deadline Seconds",
"type": "integer"
},
"backoff_limit": {
"default": 0,
"title": "Backoff Limit",
"type": "integer"
},
"ray_cluster_spec": {
"anyOf": [
{
"$ref": "#/$defs/RayClusterSpec"
},
{
"type": "null"
}
]
},
"submitter_pod_template": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Submitter Pod Template"
},
"metadata": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Metadata"
},
"cluster_selector": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Cluster Selector"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"deletion_strategy": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"title": "Deletion Strategy"
},
"runtime_env_yaml": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Runtime Env Yaml"
},
"job_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Job Id"
},
"submission_mode": {
"const": "InteractiveMode",
"default": "InteractiveMode",
"title": "Submission Mode",
"type": "string"
},
"entrypoint_resources": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Resources"
},
"entrypoint_num_cpus": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Num Cpus"
},
"entrypoint_memory": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Memory"
},
"entrypoint_num_gpus": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Num Gpus"
},
"ttl_seconds_after_finished": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": 300,
"title": "Ttl Seconds After Finished"
},
"shutdown_after_job_finishes": {
"default": true,
"title": "Shutdown After Job Finishes",
"type": "boolean"
},
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
}
},
"title": "InteractiveRayJobSpec",
"type": "object"
},
"Lifecycle": {
"properties": {
"create": {
"default": true,
"description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
"title": "Create",
"type": "boolean"
},
"wait": {
"default": true,
"description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
"title": "Wait",
"type": "boolean"
},
"connect": {
"default": true,
"description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
"title": "Connect",
"type": "boolean"
},
"cleanup": {
"default": "always",
"description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
"enum": [
"never",
"always",
"on_exception"
],
"title": "Cleanup",
"type": "string"
}
},
"title": "Lifecycle",
"type": "object"
},
"RayClusterSpec": {
"additionalProperties": true,
"description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
"properties": {
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"autoscaler_options": {
"default": {
"upscalingMode": "Default",
"idleTimeoutSeconds": 60,
"env": [],
"envFrom": [],
"resources": {
"limits": {
"cpu": "50m",
"memory": "0.1Gi"
},
"requests": {
"cpu": "50m",
"memory": "0.1Gi"
}
}
},
"title": "Autoscaler Options",
"type": "object"
},
"head_service_annotations": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Head Service Annotations"
},
"enable_in_tree_autoscaling": {
"default": false,
"title": "Enable In Tree Autoscaling",
"type": "boolean"
},
"gcs_fault_tolerance_options": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Gcs Fault Tolerance Options"
},
"head_group_spec": {
"default": {
"serviceType": "ClusterIP",
"rayStartParams": {},
"metadata": {
"annotations": {},
"labels": {}
},
"template": {
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "head",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
},
"title": "Head Group Spec",
"type": "object"
},
"ray_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Ray Version"
},
"worker_group_specs": {
"default": [
{
"groupName": "workers",
"replicas": 0,
"minReplicas": 0,
"maxReplicas": 1,
"rayStartParams": {},
"template": {
"metadata": {
"annotations": {},
"labels": {}
},
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "worker",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
}
],
"items": {
"type": "object"
},
"title": "Worker Group Specs",
"type": "array"
}
},
"title": "RayClusterSpec",
"type": "object"
},
"RayDataExecutionOptions": {
"properties": {
"execution_options": {
"$ref": "#/$defs/ExecutionOptionsConfig"
},
"cpu_limit": {
"default": 5000,
"title": "Cpu Limit",
"type": "integer"
},
"gpu_limit": {
"default": 0,
"title": "Gpu Limit",
"type": "integer"
},
"verbose_progress": {
"default": true,
"title": "Verbose Progress",
"type": "boolean"
},
"use_polars": {
"default": true,
"title": "Use Polars",
"type": "boolean"
}
},
"title": "RayDataExecutionOptions",
"type": "object"
}
},
"description": "Provides a Ray Job for Dagster steps.\n\nIs the recommended way to run Ray workloads with automatic cluster management. It creates a Ray Job, connects to it in client mode and sets the `jobId` field. Cleanup is handled by the KubeRay controller or by the resource lifecycle logic.\n\nInfo:\n Image defaults to `dagster/image` run tag.\n\nTip:\n Make sure `ray[full]` is available in the image.",
"properties": {
"lifecycle": {
"$ref": "#/$defs/Lifecycle",
"description": "Actions to perform during resource setup."
},
"timeout": {
"default": 600.0,
"description": "Timeout for Ray readiness in seconds",
"title": "Timeout",
"type": "number"
},
"ray_init_options": {
"description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
"title": "Ray Init Options",
"type": "object"
},
"data_execution_options": {
"$ref": "#/$defs/RayDataExecutionOptions"
},
"redis_port": {
"default": 10001,
"description": "Redis port for connection. Make sure to match with the actual available port.",
"title": "Redis Port",
"type": "integer"
},
"dashboard_port": {
"default": 8265,
"description": "Dashboard port for connection. Make sure to match with the actual available port.",
"title": "Dashboard Port",
"type": "integer"
},
"env_vars": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"description": "Environment variables to pass to the Ray cluster.",
"title": "Env Vars"
},
"enable_tracing": {
"default": false,
"description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
"title": "Enable Tracing",
"type": "boolean"
},
"enable_actor_task_logging": {
"default": false,
"description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
"title": "Enable Actor Task Logging",
"type": "boolean"
},
"enable_debug_post_mortem": {
"default": false,
"description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
"title": "Enable Debug Post Mortem",
"type": "boolean"
},
"enable_legacy_debugger": {
"default": false,
"description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
"title": "Enable Legacy Debugger",
"type": "boolean"
},
"image": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
"title": "Image"
},
"deployment_name": {
"default": "dev",
"description": "Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
"title": "Deployment Name",
"type": "string"
},
"failure_tolerance_timeout": {
"default": 0.0,
"description": "The period in seconds to wait for the cluster to transition out of `failed` state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first `failed` state appearance will raise an exception immediately.",
"title": "Failure Tolerance Timeout",
"type": "number"
},
"poll_interval": {
"default": 1.0,
"description": "Poll interval for various API requests",
"title": "Poll Interval",
"type": "number"
},
"ray_job": {
"$ref": "#/$defs/InteractiveRayJobConfig",
"description": "Configuration for the Kubernetes `RayJob` CR"
},
"client": {
"description": "Kubernetes `RayJob` client",
"title": "Client"
},
"log_cluster_conditions": {
"default": true,
"description": "Whether to log `RayCluster` conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
"title": "Log Cluster Conditions",
"type": "boolean"
}
},
"title": "KubeRayInteractiveJob",
"type": "object"
}
Fields:
- timeout (float)
- ray_init_options (dict[str, Any])
- data_execution_options (RayDataExecutionOptions)
- redis_port (int)
- dashboard_port (int)
- env_vars (dict[str, str] | None)
- enable_tracing (bool)
- enable_actor_task_logging (bool)
- enable_debug_post_mortem (bool)
- enable_legacy_debugger (bool)
- _context (BaseContext | None)
- _creation_verb (str)
- image (str | None)
- deployment_name (str)
- failure_tolerance_timeout (float)
- poll_interval (float)
- lifecycle (Lifecycle)
- ray_job (InteractiveRayJobConfig)
- client (ResourceDependency[RayJobClient])
- log_cluster_conditions (bool)
- _name (str)
- _cluster_name (str)
- _host (str)
Attributes¶
ray_job
pydantic-field
¶
ray_job: InteractiveRayJobConfig
Configuration for the Kubernetes RayJob CR
failure_tolerance_timeout
pydantic-field
¶
failure_tolerance_timeout: float = 0.0
The period in seconds to wait for the cluster to transition out of failed state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first failed state appearance will raise an exception immediately.
log_cluster_conditions
pydantic-field
¶
log_cluster_conditions: bool = True
Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).
dagster_ray.kuberay.KubeRayCluster
pydantic-model
¶
Bases: BaseKubeRayResource
Provides a Ray Cluster for Dagster steps.
It is advised to use KubeRayInteractiveJob with KubeRay >= 1.3.0 instead.
Info
Image defaults to dagster/image run tag.
Tip
Make sure ray[full] is available in the image.
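A configuration sketch, assuming the default head and worker group specs are acceptable. The image, namespace, and autoscaling values below are illustrative and use the `RayClusterConfig`/`RayClusterSpec` fields documented further down.

```python
from dagster_ray.kuberay import KubeRayCluster
from dagster_ray.kuberay.configs import RayClusterConfig, RayClusterSpec

ray_cluster = KubeRayCluster(
    image="my-registry/my-image:latest",  # placeholder; otherwise taken from the dagster/image run tag
    ray_cluster=RayClusterConfig(
        metadata={"namespace": "ray"},  # the name can be omitted and will be generated by dagster-ray
        spec=RayClusterSpec(enable_in_tree_autoscaling=True),
    ),
)
```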
Show JSON schema:
{
"$defs": {
"ClusterSharing": {
"description": "Defines the strategy for sharing `RayCluster` resources with other Dagster steps.\n\nBy default, the cluster is expected to be created by Dagster during one of the previously executed steps.",
"properties": {
"enabled": {
"default": false,
"description": "Whether to enable sharing of RayClusters.",
"title": "Enabled",
"type": "boolean"
},
"match_dagster_labels": {
"$ref": "#/$defs/MatchDagsterLabels",
"description": "Configuration for matching on Dagster-generated labels."
},
"match_labels": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "Additional user-provided labels to match on.",
"title": "Match Labels"
},
"ttl_seconds": {
"default": 1800.0,
"description": "Time to live for the lock placed on the `RayCluster` resource, marking it as in use by the current Dagster step.",
"title": "Ttl Seconds",
"type": "number"
}
},
"title": "ClusterSharing",
"type": "object"
},
"ExecutionOptionsConfig": {
"properties": {
"cpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Cpu"
},
"gpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Gpu"
},
"object_store_memory": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Object Store Memory"
}
},
"title": "ExecutionOptionsConfig",
"type": "object"
},
"Lifecycle": {
"properties": {
"create": {
"default": true,
"description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
"title": "Create",
"type": "boolean"
},
"wait": {
"default": true,
"description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
"title": "Wait",
"type": "boolean"
},
"connect": {
"default": true,
"description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
"title": "Connect",
"type": "boolean"
},
"cleanup": {
"default": "always",
"description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
"enum": [
"never",
"always",
"on_exception"
],
"title": "Cleanup",
"type": "string"
}
},
"title": "Lifecycle",
"type": "object"
},
"MatchDagsterLabels": {
"properties": {
"cluster_sharing": {
"default": true,
"description": "Whether to match on `dagster/cluster-sharing=true` label.",
"title": "Cluster Sharing",
"type": "boolean"
},
"code_location": {
"default": true,
"description": "Whether to match on `dagster/code-location` label. The value will be taken from the current Dagster code location.",
"title": "Code Location",
"type": "boolean"
},
"resource_key": {
"default": true,
"description": "Whether to match on `dagster/resource-key` label. The value will be taken from the current Dagster resource key.",
"title": "Resource Key",
"type": "boolean"
},
"git_sha": {
"default": true,
"description": "Whether to match on `dagster/git-sha` label. The value will be taken from `DAGSTER_CLOUD_GIT_SHA` environment variable.",
"title": "Git Sha",
"type": "boolean"
},
"run_id": {
"default": false,
"description": "Whether to match on `dagster/run-id` label. The value will be taken from the current Dagster run ID.",
"title": "Run Id",
"type": "boolean"
}
},
"title": "MatchDagsterLabels",
"type": "object"
},
"RayClusterConfig": {
"properties": {
"kind": {
"default": "RayCluster",
"title": "Kind",
"type": "string"
},
"api_version": {
"default": "ray.io/v1",
"title": "Api Version",
"type": "string"
},
"metadata": {
"description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
"title": "Metadata",
"type": "object"
},
"spec": {
"$ref": "#/$defs/RayClusterSpec"
}
},
"title": "RayClusterConfig",
"type": "object"
},
"RayClusterSpec": {
"additionalProperties": true,
"description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
"properties": {
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"autoscaler_options": {
"default": {
"upscalingMode": "Default",
"idleTimeoutSeconds": 60,
"env": [],
"envFrom": [],
"resources": {
"limits": {
"cpu": "50m",
"memory": "0.1Gi"
},
"requests": {
"cpu": "50m",
"memory": "0.1Gi"
}
}
},
"title": "Autoscaler Options",
"type": "object"
},
"head_service_annotations": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Head Service Annotations"
},
"enable_in_tree_autoscaling": {
"default": false,
"title": "Enable In Tree Autoscaling",
"type": "boolean"
},
"gcs_fault_tolerance_options": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Gcs Fault Tolerance Options"
},
"head_group_spec": {
"default": {
"serviceType": "ClusterIP",
"rayStartParams": {},
"metadata": {
"annotations": {},
"labels": {}
},
"template": {
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "head",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
},
"title": "Head Group Spec",
"type": "object"
},
"ray_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Ray Version"
},
"worker_group_specs": {
"default": [
{
"groupName": "workers",
"replicas": 0,
"minReplicas": 0,
"maxReplicas": 1,
"rayStartParams": {},
"template": {
"metadata": {
"annotations": {},
"labels": {}
},
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "worker",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
}
],
"items": {
"type": "object"
},
"title": "Worker Group Specs",
"type": "array"
}
},
"title": "RayClusterSpec",
"type": "object"
},
"RayDataExecutionOptions": {
"properties": {
"execution_options": {
"$ref": "#/$defs/ExecutionOptionsConfig"
},
"cpu_limit": {
"default": 5000,
"title": "Cpu Limit",
"type": "integer"
},
"gpu_limit": {
"default": 0,
"title": "Gpu Limit",
"type": "integer"
},
"verbose_progress": {
"default": true,
"title": "Verbose Progress",
"type": "boolean"
},
"use_polars": {
"default": true,
"title": "Use Polars",
"type": "boolean"
}
},
"title": "RayDataExecutionOptions",
"type": "object"
}
},
"description": "Provides a Ray Cluster for Dagster steps.\n\nIt is advised to use [`KubeRayInteractiveJob`][dagster_ray.kuberay.resources.KubeRayInteractiveJob] with KubeRay >= 1.3.0 instead.\n\nInfo:\n Image defaults to `dagster/image` run tag.\n\nTip:\n Make sure `ray[full]` is available in the image.",
"properties": {
"lifecycle": {
"$ref": "#/$defs/Lifecycle",
"description": "Actions to perform during resource setup."
},
"timeout": {
"default": 600.0,
"description": "Timeout for Ray readiness in seconds",
"title": "Timeout",
"type": "number"
},
"ray_init_options": {
"description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
"title": "Ray Init Options",
"type": "object"
},
"data_execution_options": {
"$ref": "#/$defs/RayDataExecutionOptions"
},
"redis_port": {
"default": 10001,
"description": "Redis port for connection. Make sure to match with the actual available port.",
"title": "Redis Port",
"type": "integer"
},
"dashboard_port": {
"default": 8265,
"description": "Dashboard port for connection. Make sure to match with the actual available port.",
"title": "Dashboard Port",
"type": "integer"
},
"env_vars": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"description": "Environment variables to pass to the Ray cluster.",
"title": "Env Vars"
},
"enable_tracing": {
"default": false,
"description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
"title": "Enable Tracing",
"type": "boolean"
},
"enable_actor_task_logging": {
"default": false,
"description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
"title": "Enable Actor Task Logging",
"type": "boolean"
},
"enable_debug_post_mortem": {
"default": false,
"description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
"title": "Enable Debug Post Mortem",
"type": "boolean"
},
"enable_legacy_debugger": {
"default": false,
"description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
"title": "Enable Legacy Debugger",
"type": "boolean"
},
"image": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
"title": "Image"
},
"deployment_name": {
"default": "dev",
"description": "Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
"title": "Deployment Name",
"type": "string"
},
"failure_tolerance_timeout": {
"default": 0.0,
"description": "The period in seconds to wait for the cluster to transition out of `failed` state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first `failed` state appearance will raise an exception immediately.",
"title": "Failure Tolerance Timeout",
"type": "number"
},
"poll_interval": {
"default": 1.0,
"description": "Poll interval for various API requests",
"title": "Poll Interval",
"type": "number"
},
"cluster_sharing": {
"$ref": "#/$defs/ClusterSharing",
"description": "Configuration for sharing the `RayCluster` across Dagster steps. Existing clusters matching this configuration will be reused without recreating them. A `dagster/sharing=true` label will be applied to the `RayCluster`, and a `dagster/lock-<run-id>-<step-id>=<lock>` annotation will be placed on the `RayCluster` to mark it as being used by this step. Cleanup will only proceed if the `RayCluster` is not being used by any other steps, therefore cluster sharing should be used in conjunction with [dagster_ray.kuberay.sensors.cleanup_expired_kuberay_clusters][] sensor."
},
"ray_cluster": {
"$ref": "#/$defs/RayClusterConfig",
"description": "Kubernetes `RayCluster` CR configuration."
},
"client": {
"description": "Kubernetes `RayCluster` client",
"title": "Client"
},
"log_cluster_conditions": {
"default": true,
"description": "Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
"title": "Log Cluster Conditions",
"type": "boolean"
}
},
"title": "KubeRayCluster",
"type": "object"
}
Fields:
- timeout (float)
- ray_init_options (dict[str, Any])
- data_execution_options (RayDataExecutionOptions)
- redis_port (int)
- dashboard_port (int)
- env_vars (dict[str, str] | None)
- enable_tracing (bool)
- enable_actor_task_logging (bool)
- enable_debug_post_mortem (bool)
- enable_legacy_debugger (bool)
- _context (BaseContext | None)
- _creation_verb (str)
- image (str | None)
- deployment_name (str)
- failure_tolerance_timeout (float)
- poll_interval (float)
- lifecycle (Lifecycle)
- cluster_sharing (ClusterSharing)
- ray_cluster (RayClusterConfig)
- client (ResourceDependency[RayClusterClient])
- log_cluster_conditions (bool)
- _name (str)
- _host (str)
Attributes¶
cluster_sharing
pydantic-field
¶
cluster_sharing: ClusterSharing
Configuration for sharing the RayCluster across Dagster steps. Existing clusters matching this configuration will be reused without recreating them. A dagster/sharing=true label will be applied to the RayCluster, and a dagster/lock-<run-id>-<step-id>=<lock> annotation will be placed on the RayCluster to mark it as being used by this step. Cleanup will only proceed if the RayCluster is not being used by any other steps; therefore, cluster sharing should be used in conjunction with the dagster_ray.kuberay.sensors.cleanup_expired_kuberay_clusters sensor.
failure_tolerance_timeout
pydantic-field
¶
failure_tolerance_timeout: float = 0.0
The period in seconds to wait for the cluster to transition out of failed state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first failed state appearance will raise an exception immediately.
log_cluster_conditions
pydantic-field
¶
log_cluster_conditions: bool = True
Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).
Pipes¶
dagster_ray.kuberay.PipesKubeRayJobClient ¶
PipesKubeRayJobClient(
client: RayJobClient | None = None,
context_injector: PipesContextInjector | None = None,
message_reader: PipesMessageReader | None = None,
forward_termination: bool = True,
timeout: float = 600,
poll_interval: float = 1,
port_forward: bool = False,
)
Bases: PipesClient, TreatAsResourceParam
A Pipes client for running a RayJob on Kubernetes.
Parameters:
- context_injector (Optional[PipesContextInjector], default: None) – A context injector to use to inject context into the RayJob. Defaults to PipesEnvContextInjector.
- message_reader (Optional[PipesMessageReader], default: None) – A message reader to use to read messages from the Ray job run. Defaults to PipesRayJobMessageReader.
- client (Optional[RayJobClient], default: None) – The Kubernetes RayJob client.
- forward_termination (bool, default: True) – Whether to terminate the Ray job when the Dagster process receives a termination signal, or if the startup timeout is reached. Defaults to True.
- timeout (float, default: 600) – Timeout for various internal interactions with the Kubernetes RayJob.
- poll_interval (float, default: 1) – Interval at which to poll Kubernetes for status updates.
- port_forward (bool, default: False) – Whether to use Kubernetes port-forwarding to connect to the KubeRay cluster. Useful when running in a local environment.
Info
Image defaults to dagster/image run tag.
Tip
Make sure ray[full] is available in the image.
Source code in src/dagster_ray/kuberay/pipes.py
Functions¶
run ¶
run(
*, context: OpOrAssetExecutionContext, ray_job: dict[str, Any], extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation
Execute a RayJob, enriched with the Pipes protocol.
Parameters:
- context (OpExecutionContext) – Current Dagster op or asset context.
- ray_job (Dict[str, Any]) – RayJob specification. [API reference](https://ray-project.github.io/kuberay/reference/api/#rayjob).
- extras (Optional[Dict[str, Any]], default: None) – Additional information to pass to the Pipes session.
Source code in src/dagster_ray/kuberay/pipes.py
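A usage sketch: the resource key, namespace, image, and entrypoint are placeholders, and the `ray_job` dictionary follows the KubeRay `RayJob` API referenced above.

```python
import dagster as dg

from dagster_ray.kuberay import PipesKubeRayJobClient


@dg.asset
def pipes_ray_asset(
    context: dg.AssetExecutionContext,
    pipes_rayjob_client: PipesKubeRayJobClient,  # resource key is an arbitrary choice
) -> dg.MaterializeResult:
    completed = pipes_rayjob_client.run(
        context=context,
        ray_job={
            # RayJob custom resource body in camelCase, per the KubeRay API reference
            "metadata": {"name": "pipes-example", "namespace": "ray"},
            "spec": {
                "entrypoint": "python /app/script.py",
                "rayClusterSpec": {
                    "headGroupSpec": {
                        "rayStartParams": {},
                        "template": {
                            "spec": {
                                "containers": [{"name": "head", "image": "my-image:latest"}]
                            }
                        },
                    }
                },
            },
        },
    )
    return completed.get_materialize_result()


defs = dg.Definitions(
    assets=[pipes_ray_asset],
    resources={"pipes_rayjob_client": PipesKubeRayJobClient()},
)
```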
Configuration and Types¶
dagster_ray.kuberay.configs.RayJobConfig
pydantic-model
¶
Bases: Config
Show JSON schema:
{
"$defs": {
"RayClusterSpec": {
"additionalProperties": true,
"description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
"properties": {
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"autoscaler_options": {
"default": {
"upscalingMode": "Default",
"idleTimeoutSeconds": 60,
"env": [],
"envFrom": [],
"resources": {
"limits": {
"cpu": "50m",
"memory": "0.1Gi"
},
"requests": {
"cpu": "50m",
"memory": "0.1Gi"
}
}
},
"title": "Autoscaler Options",
"type": "object"
},
"head_service_annotations": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Head Service Annotations"
},
"enable_in_tree_autoscaling": {
"default": false,
"title": "Enable In Tree Autoscaling",
"type": "boolean"
},
"gcs_fault_tolerance_options": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Gcs Fault Tolerance Options"
},
"head_group_spec": {
"default": {
"serviceType": "ClusterIP",
"rayStartParams": {},
"metadata": {
"annotations": {},
"labels": {}
},
"template": {
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "head",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
},
"title": "Head Group Spec",
"type": "object"
},
"ray_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Ray Version"
},
"worker_group_specs": {
"default": [
{
"groupName": "workers",
"replicas": 0,
"minReplicas": 0,
"maxReplicas": 1,
"rayStartParams": {},
"template": {
"metadata": {
"annotations": {},
"labels": {}
},
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "worker",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
}
],
"items": {
"type": "object"
},
"title": "Worker Group Specs",
"type": "array"
}
},
"title": "RayClusterSpec",
"type": "object"
},
"RayJobSpec": {
"additionalProperties": true,
"description": "[RayJob spec](https://ray-project.github.io/kuberay/reference/api/#rayjobspec) configuration options. A few sensible defaults are provided for convenience.",
"properties": {
"active_deadline_seconds": {
"default": 86400,
"title": "Active Deadline Seconds",
"type": "integer"
},
"backoff_limit": {
"default": 0,
"title": "Backoff Limit",
"type": "integer"
},
"ray_cluster_spec": {
"anyOf": [
{
"$ref": "#/$defs/RayClusterSpec"
},
{
"type": "null"
}
]
},
"submitter_pod_template": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Submitter Pod Template"
},
"metadata": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Metadata"
},
"cluster_selector": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Cluster Selector"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"deletion_strategy": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"title": "Deletion Strategy"
},
"runtime_env_yaml": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Runtime Env Yaml"
},
"job_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Job Id"
},
"submission_mode": {
"default": "K8sJobMode",
"enum": [
"K8sJobMode",
"HTTPMode",
"InteractiveMode"
],
"title": "Submission Mode",
"type": "string"
},
"entrypoint_resources": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Resources"
},
"entrypoint_num_cpus": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Num Cpus"
},
"entrypoint_memory": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Memory"
},
"entrypoint_num_gpus": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Num Gpus"
},
"ttl_seconds_after_finished": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": 300,
"title": "Ttl Seconds After Finished"
},
"shutdown_after_job_finishes": {
"default": true,
"title": "Shutdown After Job Finishes",
"type": "boolean"
},
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
}
},
"title": "RayJobSpec",
"type": "object"
}
},
"properties": {
"kind": {
"default": "RayJob",
"title": "Kind",
"type": "string"
},
"api_version": {
"default": "ray.io/v1",
"title": "Api Version",
"type": "string"
},
"metadata": {
"description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
"title": "Metadata",
"type": "object"
},
"spec": {
"$ref": "#/$defs/RayJobSpec"
}
},
"title": "RayJobConfig",
"type": "object"
}
Attributes¶
metadata
pydantic-field
¶
Kubernetes metadata; the name field can be omitted, in which case it will be generated by dagster-ray.
Functions¶
to_k8s ¶
to_k8s(
context: AnyDagsterContext,
image: str | None = None,
labels: Mapping[str, str] | None = None,
annotations: Mapping[str, str] | None = None,
env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
Convert into Kubernetes manifests in camelCase format and inject additional information
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.configs.RayJobSpec ¶
Bases: PermissiveConfig
RayJob spec configuration options. A few sensible defaults are provided for convenience.
Attributes¶
ray_cluster_spec
class-attribute
instance-attribute
¶
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
Functions¶
to_k8s ¶
to_k8s(
context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]
Convert into Kubernetes manifests in camelCase format and inject additional information
Source code in src/dagster_ray/kuberay/configs.py
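For illustration, a RayJobConfig can be built programmatically. The values below are placeholders; the field names come from the schema above.

```python
from dagster_ray.kuberay.configs import RayJobConfig, RayJobSpec

job_config = RayJobConfig(
    metadata={"namespace": "ray"},  # the name can be omitted and will be generated by dagster-ray
    spec=RayJobSpec(
        runtime_env_yaml="pip:\n  - polars\n",  # placeholder runtime environment
        entrypoint_num_cpus=1.0,
        ttl_seconds_after_finished=300,
        shutdown_after_job_finishes=True,
    ),
)
```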
dagster_ray.kuberay.resources.rayjob.InteractiveRayJobConfig
pydantic-model
¶
Bases: RayJobConfig
Same as RayJobConfig, but spec.submission_mode has to be InteractiveMode
Show JSON schema:
{
"$defs": {
"InteractiveRayJobSpec": {
"additionalProperties": true,
"description": "Same as [`RayJobSpec`][dagster_ray.kuberay.configs.RayJobSpec], but `mode` has to be `InteractiveMode`",
"properties": {
"active_deadline_seconds": {
"default": 86400,
"title": "Active Deadline Seconds",
"type": "integer"
},
"backoff_limit": {
"default": 0,
"title": "Backoff Limit",
"type": "integer"
},
"ray_cluster_spec": {
"anyOf": [
{
"$ref": "#/$defs/RayClusterSpec"
},
{
"type": "null"
}
]
},
"submitter_pod_template": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Submitter Pod Template"
},
"metadata": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Metadata"
},
"cluster_selector": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Cluster Selector"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"deletion_strategy": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"title": "Deletion Strategy"
},
"runtime_env_yaml": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Runtime Env Yaml"
},
"job_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Job Id"
},
"submission_mode": {
"const": "InteractiveMode",
"default": "InteractiveMode",
"title": "Submission Mode",
"type": "string"
},
"entrypoint_resources": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Resources"
},
"entrypoint_num_cpus": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Num Cpus"
},
"entrypoint_memory": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Memory"
},
"entrypoint_num_gpus": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Entrypoint Num Gpus"
},
"ttl_seconds_after_finished": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": 300,
"title": "Ttl Seconds After Finished"
},
"shutdown_after_job_finishes": {
"default": true,
"title": "Shutdown After Job Finishes",
"type": "boolean"
},
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
}
},
"title": "InteractiveRayJobSpec",
"type": "object"
},
"RayClusterSpec": {
"additionalProperties": true,
"description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
"properties": {
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"autoscaler_options": {
"default": {
"upscalingMode": "Default",
"idleTimeoutSeconds": 60,
"env": [],
"envFrom": [],
"resources": {
"limits": {
"cpu": "50m",
"memory": "0.1Gi"
},
"requests": {
"cpu": "50m",
"memory": "0.1Gi"
}
}
},
"title": "Autoscaler Options",
"type": "object"
},
"head_service_annotations": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Head Service Annotations"
},
"enable_in_tree_autoscaling": {
"default": false,
"title": "Enable In Tree Autoscaling",
"type": "boolean"
},
"gcs_fault_tolerance_options": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Gcs Fault Tolerance Options"
},
"head_group_spec": {
"default": {
"serviceType": "ClusterIP",
"rayStartParams": {},
"metadata": {
"annotations": {},
"labels": {}
},
"template": {
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "head",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
},
"title": "Head Group Spec",
"type": "object"
},
"ray_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Ray Version"
},
"worker_group_specs": {
"default": [
{
"groupName": "workers",
"replicas": 0,
"minReplicas": 0,
"maxReplicas": 1,
"rayStartParams": {},
"template": {
"metadata": {
"annotations": {},
"labels": {}
},
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "worker",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
}
],
"items": {
"type": "object"
},
"title": "Worker Group Specs",
"type": "array"
}
},
"title": "RayClusterSpec",
"type": "object"
}
},
"description": "Same as [`RayJobConfig`][dagster_ray.kuberay.configs.RayJobConfig], but `spec.submission_mode` mode has to be `InteractiveMode`",
"properties": {
"kind": {
"default": "RayJob",
"title": "Kind",
"type": "string"
},
"api_version": {
"default": "ray.io/v1",
"title": "Api Version",
"type": "string"
},
"metadata": {
"description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
"title": "Metadata",
"type": "object"
},
"spec": {
"$ref": "#/$defs/InteractiveRayJobSpec"
}
},
"title": "InteractiveRayJobConfig",
"type": "object"
}
Attributes¶
metadata
pydantic-field
¶
Kubernetes metadata; the name field can be omitted, in which case it will be generated by dagster-ray.
Functions¶
to_k8s ¶
to_k8s(
context: AnyDagsterContext,
image: str | None = None,
labels: Mapping[str, str] | None = None,
annotations: Mapping[str, str] | None = None,
env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
Convert into Kubernetes manifests in camelCase format and inject additional information
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.resources.rayjob.InteractiveRayJobSpec ¶
Bases: RayJobSpec
Same as RayJobSpec, but submission_mode has to be InteractiveMode
Attributes¶
submission_mode
class-attribute
instance-attribute
¶
submission_mode: Literal['InteractiveMode'] = 'InteractiveMode'
active_deadline_seconds
class-attribute
instance-attribute
¶
active_deadline_seconds: int = 60 * 60 * 24
ray_cluster_spec
class-attribute
instance-attribute
¶
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
submitter_pod_template
class-attribute
instance-attribute
¶
cluster_selector
class-attribute
instance-attribute
¶
deletion_strategy
class-attribute
instance-attribute
¶
deletion_strategy: dict[str, Any] | None = Field(
default_factory=lambda: {"onFailure": {"policy": "DeleteCluster"}, "onSuccess": {"policy": "DeleteCluster"}}
)
ttl_seconds_after_finished
class-attribute
instance-attribute
¶
ttl_seconds_after_finished: int | None = 5 * 60
shutdown_after_job_finishes
class-attribute
instance-attribute
¶
shutdown_after_job_finishes: bool = True
Functions¶
to_k8s ¶
to_k8s(
context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]
Convert into Kubernetes manifests in camelCase format and inject additional information
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.configs.RayClusterConfig
pydantic-model
¶
Bases: Config
Show JSON schema:
{
"$defs": {
"RayClusterSpec": {
"additionalProperties": true,
"description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
"properties": {
"suspend": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Suspend"
},
"managed_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Managed By"
},
"autoscaler_options": {
"default": {
"upscalingMode": "Default",
"idleTimeoutSeconds": 60,
"env": [],
"envFrom": [],
"resources": {
"limits": {
"cpu": "50m",
"memory": "0.1Gi"
},
"requests": {
"cpu": "50m",
"memory": "0.1Gi"
}
}
},
"title": "Autoscaler Options",
"type": "object"
},
"head_service_annotations": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Head Service Annotations"
},
"enable_in_tree_autoscaling": {
"default": false,
"title": "Enable In Tree Autoscaling",
"type": "boolean"
},
"gcs_fault_tolerance_options": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Gcs Fault Tolerance Options"
},
"head_group_spec": {
"default": {
"serviceType": "ClusterIP",
"rayStartParams": {},
"metadata": {
"annotations": {},
"labels": {}
},
"template": {
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "head",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
},
"title": "Head Group Spec",
"type": "object"
},
"ray_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Ray Version"
},
"worker_group_specs": {
"default": [
{
"groupName": "workers",
"replicas": 0,
"minReplicas": 0,
"maxReplicas": 1,
"rayStartParams": {},
"template": {
"metadata": {
"annotations": {},
"labels": {}
},
"spec": {
"affinity": {},
"containers": [
{
"imagePullPolicy": "Always",
"name": "worker",
"volumeMounts": [
{
"mountPath": "/tmp/ray",
"name": "ray-logs"
}
]
}
],
"imagePullSecrets": [],
"nodeSelector": {},
"tolerations": [],
"volumes": [
{
"emptyDir": {},
"name": "ray-logs"
}
]
}
}
}
],
"items": {
"type": "object"
},
"title": "Worker Group Specs",
"type": "array"
}
},
"title": "RayClusterSpec",
"type": "object"
}
},
"properties": {
"kind": {
"default": "RayCluster",
"title": "Kind",
"type": "string"
},
"api_version": {
"default": "ray.io/v1",
"title": "Api Version",
"type": "string"
},
"metadata": {
"description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
"title": "Metadata",
"type": "object"
},
"spec": {
"$ref": "#/$defs/RayClusterSpec"
}
},
"title": "RayClusterConfig",
"type": "object"
}
Attributes¶
metadata
pydantic-field
¶
Kubernetes metadata; the name field can be omitted, in which case it will be generated by dagster-ray.
Functions¶
to_k8s ¶
to_k8s(
context: AnyDagsterContext,
image: str | None = None,
labels: Mapping[str, str] | None = None,
annotations: Mapping[str, str] | None = None,
env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.configs.RayClusterSpec ¶
Bases: PermissiveConfig
RayCluster spec configuration options. A few sensible defaults are provided for convenience.
Attributes¶
autoscaler_options
class-attribute
instance-attribute
¶
head_service_annotations
class-attribute
instance-attribute
¶
enable_in_tree_autoscaling
class-attribute
instance-attribute
¶
enable_in_tree_autoscaling: bool = False
gcs_fault_tolerance_options
class-attribute
instance-attribute
¶
head_group_spec
class-attribute
instance-attribute
¶
worker_group_specs
class-attribute
instance-attribute
¶
Functions¶
to_k8s ¶
to_k8s(
context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]
Convert into Kubernetes manifests in camelCase format and inject additional information
Source code in src/dagster_ray/kuberay/configs.py
dagster_ray.kuberay.configs.MatchDagsterLabels
pydantic-model
¶
Bases: Config
Show JSON schema:
{
"properties": {
"cluster_sharing": {
"default": true,
"description": "Whether to match on `dagster/cluster-sharing=true` label.",
"title": "Cluster Sharing",
"type": "boolean"
},
"code_location": {
"default": true,
"description": "Whether to match on `dagster/code-location` label. The value will be taken from the current Dagster code location.",
"title": "Code Location",
"type": "boolean"
},
"resource_key": {
"default": true,
"description": "Whether to match on `dagster/resource-key` label. The value will be taken from the current Dagster resource key.",
"title": "Resource Key",
"type": "boolean"
},
"git_sha": {
"default": true,
"description": "Whether to match on `dagster/git-sha` label. The value will be taken from `DAGSTER_CLOUD_GIT_SHA` environment variable.",
"title": "Git Sha",
"type": "boolean"
},
"run_id": {
"default": false,
"description": "Whether to match on `dagster/run-id` label. The value will be taken from the current Dagster run ID.",
"title": "Run Id",
"type": "boolean"
}
},
"title": "MatchDagsterLabels",
"type": "object"
}
Fields:
- cluster_sharing (bool)
- code_location (bool)
- resource_key (bool)
- git_sha (bool)
- run_id (bool)
Attributes¶
cluster_sharing
pydantic-field
¶
cluster_sharing: bool = True
Whether to match on dagster/cluster-sharing=true label.
code_location
pydantic-field
¶
code_location: bool = True
Whether to match on dagster/code-location label. The value will be taken from the current Dagster code location.
resource_key
pydantic-field
¶
resource_key: bool = True
Whether to match on dagster/resource-key label. The value will be taken from the current Dagster resource key.
dagster_ray.kuberay.configs.ClusterSharing
pydantic-model
¶
Bases: Config
Defines the strategy for sharing RayCluster resources with other Dagster steps.
By default, the cluster is expected to be created by Dagster during one of the previously executed steps.
Show JSON schema:
{
"$defs": {
"MatchDagsterLabels": {
"properties": {
"cluster_sharing": {
"default": true,
"description": "Whether to match on `dagster/cluster-sharing=true` label.",
"title": "Cluster Sharing",
"type": "boolean"
},
"code_location": {
"default": true,
"description": "Whether to match on `dagster/code-location` label. The value will be taken from the current Dagster code location.",
"title": "Code Location",
"type": "boolean"
},
"resource_key": {
"default": true,
"description": "Whether to match on `dagster/resource-key` label. The value will be taken from the current Dagster resource key.",
"title": "Resource Key",
"type": "boolean"
},
"git_sha": {
"default": true,
"description": "Whether to match on `dagster/git-sha` label. The value will be taken from `DAGSTER_CLOUD_GIT_SHA` environment variable.",
"title": "Git Sha",
"type": "boolean"
},
"run_id": {
"default": false,
"description": "Whether to match on `dagster/run-id` label. The value will be taken from the current Dagster run ID.",
"title": "Run Id",
"type": "boolean"
}
},
"title": "MatchDagsterLabels",
"type": "object"
}
},
"description": "Defines the strategy for sharing `RayCluster` resources with other Dagster steps.\n\nBy default, the cluster is expected to be created by Dagster during one of the previously executed steps.",
"properties": {
"enabled": {
"default": false,
"description": "Whether to enable sharing of RayClusters.",
"title": "Enabled",
"type": "boolean"
},
"match_dagster_labels": {
"$ref": "#/$defs/MatchDagsterLabels",
"description": "Configuration for matching on Dagster-generated labels."
},
"match_labels": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "Additional user-provided labels to match on.",
"title": "Match Labels"
},
"ttl_seconds": {
"default": 1800.0,
"description": "Time to live for the lock placed on the `RayCluster` resource, marking it as in use by the current Dagster step.",
"title": "Ttl Seconds",
"type": "number"
}
},
"title": "ClusterSharing",
"type": "object"
}
Fields:
- enabled (bool)
- match_dagster_labels (MatchDagsterLabels)
- match_labels (dict[str, str] | None)
- ttl_seconds (float)
Attributes¶
match_dagster_labels
pydantic-field
¶
match_dagster_labels: MatchDagsterLabels
Configuration for matching on Dagster-generated labels.
match_labels
pydantic-field
¶
Additional user-provided labels to match on.
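A sketch of enabling cluster sharing on KubeRayCluster; the extra label and TTL values are illustrative.

```python
from dagster_ray.kuberay import KubeRayCluster
from dagster_ray.kuberay.configs import ClusterSharing, MatchDagsterLabels

ray_cluster = KubeRayCluster(
    cluster_sharing=ClusterSharing(
        enabled=True,
        # reuse clusters from the same code location and resource key,
        # but do not require a matching git SHA
        match_dagster_labels=MatchDagsterLabels(git_sha=False),
        match_labels={"team": "data"},  # illustrative user-provided label selector
        ttl_seconds=3600.0,
    ),
)
```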
dagster_ray.kuberay.resources.base.BaseKubeRayResource
pydantic-model
¶
Bases: RayResource, ABC
Show JSON schema:
{
"$defs": {
"ExecutionOptionsConfig": {
"properties": {
"cpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Cpu"
},
"gpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Gpu"
},
"object_store_memory": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Object Store Memory"
}
},
"title": "ExecutionOptionsConfig",
"type": "object"
},
"Lifecycle": {
"properties": {
"create": {
"default": true,
"description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
"title": "Create",
"type": "boolean"
},
"wait": {
"default": true,
"description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
"title": "Wait",
"type": "boolean"
},
"connect": {
"default": true,
"description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
"title": "Connect",
"type": "boolean"
},
"cleanup": {
"default": "always",
"description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
"enum": [
"never",
"always",
"on_exception"
],
"title": "Cleanup",
"type": "string"
}
},
"title": "Lifecycle",
"type": "object"
},
"RayDataExecutionOptions": {
"properties": {
"execution_options": {
"$ref": "#/$defs/ExecutionOptionsConfig"
},
"cpu_limit": {
"default": 5000,
"title": "Cpu Limit",
"type": "integer"
},
"gpu_limit": {
"default": 0,
"title": "Gpu Limit",
"type": "integer"
},
"verbose_progress": {
"default": true,
"title": "Verbose Progress",
"type": "boolean"
},
"use_polars": {
"default": true,
"title": "Use Polars",
"type": "boolean"
}
},
"title": "RayDataExecutionOptions",
"type": "object"
}
},
"properties": {
"lifecycle": {
"$ref": "#/$defs/Lifecycle",
"description": "Actions to perform during resource setup."
},
"timeout": {
"default": 600.0,
"description": "Timeout for Ray readiness in seconds",
"title": "Timeout",
"type": "number"
},
"ray_init_options": {
"description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
"title": "Ray Init Options",
"type": "object"
},
"data_execution_options": {
"$ref": "#/$defs/RayDataExecutionOptions"
},
"redis_port": {
"default": 10001,
"description": "Redis port for connection. Make sure to match with the actual available port.",
"title": "Redis Port",
"type": "integer"
},
"dashboard_port": {
"default": 8265,
"description": "Dashboard port for connection. Make sure to match with the actual available port.",
"title": "Dashboard Port",
"type": "integer"
},
"env_vars": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"description": "Environment variables to pass to the Ray cluster.",
"title": "Env Vars"
},
"enable_tracing": {
"default": false,
"description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
"title": "Enable Tracing",
"type": "boolean"
},
"enable_actor_task_logging": {
"default": false,
"description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
"title": "Enable Actor Task Logging",
"type": "boolean"
},
"enable_debug_post_mortem": {
"default": false,
"description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
"title": "Enable Debug Post Mortem",
"type": "boolean"
},
"enable_legacy_debugger": {
"default": false,
"description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
"title": "Enable Legacy Debugger",
"type": "boolean"
},
"image": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
"title": "Image"
},
"deployment_name": {
"default": "dev",
"description": "Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
"title": "Deployment Name",
"type": "string"
},
"failure_tolerance_timeout": {
"default": 0.0,
"description": "The period in seconds to wait for the cluster to transition out of `failed` state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first `failed` state appearance will raise an exception immediately.",
"title": "Failure Tolerance Timeout",
"type": "number"
},
"poll_interval": {
"default": 1.0,
"description": "Poll interval for various API requests",
"title": "Poll Interval",
"type": "number"
}
},
"title": "BaseKubeRayResource",
"type": "object"
}
Fields:
- lifecycle (Lifecycle)
- timeout (float)
- ray_init_options (dict[str, Any])
- data_execution_options (RayDataExecutionOptions)
- redis_port (int)
- dashboard_port (int)
- env_vars (dict[str, str] | None)
- enable_tracing (bool)
- enable_actor_task_logging (bool)
- enable_debug_post_mortem (bool)
- enable_legacy_debugger (bool)
- _context (BaseContext | None)
- _creation_verb (str)
- image (str | None)
- deployment_name (str)
- failure_tolerance_timeout (float)
- poll_interval (float)
- _host (str)
Attributes¶
image
pydantic-field
¶
image: str | None = None
Image to inject into the RayCluster spec. Defaults to dagster/image run tag. Images already provided in the RayCluster spec won't be overridden.
Resources¶
dagster_ray.kuberay.KubeRayJobClientResource
pydantic-model
¶
Bases: ConfigurableResource[RayJobClient]
This configurable resource provides a dagster_ray.kuberay.client.RayJobClient.
Show JSON schema:
{
"description": "This configurable resource provides a [dagster_ray.kuberay.client.RayJobClient][].",
"properties": {
"kube_context": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Kube Context"
},
"kube_config": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Kube Config"
}
},
"title": "KubeRayJobClientResource",
"type": "object"
}
dagster_ray.kuberay.KubeRayClusterClientResource
pydantic-model
¶
Bases: ConfigurableResource[RayClusterClient]
This configurable resource provides a dagster_ray.kuberay.client.RayClusterClient.
Show JSON schema:
{
"description": "This configurable resource provides a [dagster_ray.kuberay.client.RayClusterClient][].",
"properties": {
"kube_context": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Kube Context"
},
"kube_config": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Kube Config"
}
},
"title": "KubeRayClusterClientResource",
"type": "object"
}
Sensors¶
dagster_ray.kuberay.sensors.cleanup_expired_kuberay_clusters ¶
cleanup_expired_kuberay_clusters(
context: SensorEvaluationContext, raycluster_client: ResourceParam[RayClusterClient]
) -> Generator[RunRequest | SkipReason, None, None]
Source code in src/dagster_ray/kuberay/sensors.py
A Dagster sensor that monitors shared RayCluster resources created by the current Dagster code location (with a dagster/code-location=<current-code-location> label selector) and submits jobs to delete clusters that either:
- use Cluster Sharing (dagster/cluster-sharing=true) and have expired, or
- are older than DAGSTER_RAY_CLUSTER_EXPIRATION_SECONDS (defaults to 4 hours).
By default, it monitors the ray namespace. This can be configured by setting DAGSTER_RAY_NAMESPACES (accepts a comma-separated list of namespaces).
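A registration sketch, assuming that KubeRayClusterClientResource (documented above) resolves to the RayClusterClient the sensor requests under the raycluster_client resource key:

```python
import dagster as dg

from dagster_ray.kuberay import KubeRayClusterClientResource
from dagster_ray.kuberay.sensors import cleanup_expired_kuberay_clusters

defs = dg.Definitions(
    sensors=[cleanup_expired_kuberay_clusters],
    # the sensor declares a `raycluster_client` resource dependency (see the signature above)
    resources={"raycluster_client": KubeRayClusterClientResource()},
)
```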
Kubernetes API Clients¶
dagster_ray.kuberay.client.RayClusterClient ¶
RayClusterClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)
Bases: BaseKubeRayClient[RayClusterStatus]
Source code in src/dagster_ray/kuberay/client/raycluster/client.py
Functions¶
create ¶
Source code in src/dagster_ray/kuberay/client/base.py
delete ¶
get ¶
Source code in src/dagster_ray/kuberay/client/base.py
list ¶
Source code in src/dagster_ray/kuberay/client/base.py
update ¶
dagster_ray.kuberay.client.RayJobClient ¶
RayJobClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)
Bases: BaseKubeRayClient[RayJobStatus]
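Both clients accept the optional kube_config and kube_context constructor arguments shown in their signatures; a minimal construction sketch follows (paths and context names are placeholders):

```python
from dagster_ray.kuberay.client import RayClusterClient, RayJobClient

# with no arguments, the clients presumably fall back to the default kubeconfig resolution
raycluster_client = RayClusterClient()

# or point them at a specific kubeconfig and context (placeholders)
rayjob_client = RayJobClient(
    kube_config="~/.kube/config",
    kube_context="my-cluster",
)
```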