Skip to content

KubeRay API Reference

KubeRay integration components for running Ray on Kubernetes. Learn how to use it here.


Client Mode Resources

These resources initialize a Ray client connection to a remote cluster.

dagster_ray.kuberay.KubeRayInteractiveJob pydantic-model

Bases: BaseKubeRayResource

Provides a Ray Job for Dagster steps.

This is the recommended way to run Ray workloads with automatic cluster management. It creates a Ray Job, connects to it in client mode and sets the jobId field. Cleanup is handled by the KubeRay controller or by the resource lifecycle logic.

Info

The image defaults to the dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Show JSON schema:
{
  "$defs": {
    "ExecutionOptionsConfig": {
      "properties": {
        "cpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cpu"
        },
        "gpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gpu"
        },
        "object_store_memory": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Object Store Memory"
        }
      },
      "title": "ExecutionOptionsConfig",
      "type": "object"
    },
    "InteractiveRayJobConfig": {
      "description": "Same as [`RayJobConfig`][dagster_ray.kuberay.configs.RayJobConfig], but `spec.submission_mode` has to be `InteractiveMode`",
      "properties": {
        "kind": {
          "default": "RayJob",
          "title": "Kind",
          "type": "string"
        },
        "api_version": {
          "default": "ray.io/v1",
          "title": "Api Version",
          "type": "string"
        },
        "metadata": {
          "description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
          "title": "Metadata",
          "type": "object"
        },
        "spec": {
          "$ref": "#/$defs/InteractiveRayJobSpec"
        }
      },
      "title": "InteractiveRayJobConfig",
      "type": "object"
    },
    "InteractiveRayJobSpec": {
      "additionalProperties": true,
      "description": "Same as [`RayJobSpec`][dagster_ray.kuberay.configs.RayJobSpec], but `submission_mode` has to be `InteractiveMode`",
      "properties": {
        "active_deadline_seconds": {
          "default": 86400,
          "title": "Active Deadline Seconds",
          "type": "integer"
        },
        "backoff_limit": {
          "default": 0,
          "title": "Backoff Limit",
          "type": "integer"
        },
        "ray_cluster_spec": {
          "anyOf": [
            {
              "$ref": "#/$defs/RayClusterSpec"
            },
            {
              "type": "null"
            }
          ]
        },
        "submitter_pod_template": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Submitter Pod Template"
        },
        "metadata": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Metadata"
        },
        "cluster_selector": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cluster Selector"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "deletion_strategy": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "title": "Deletion Strategy"
        },
        "runtime_env_yaml": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Runtime Env Yaml"
        },
        "job_id": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Job Id"
        },
        "submission_mode": {
          "const": "InteractiveMode",
          "default": "InteractiveMode",
          "title": "Submission Mode",
          "type": "string"
        },
        "entrypoint_resources": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Resources"
        },
        "entrypoint_num_cpus": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Num Cpus"
        },
        "entrypoint_memory": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Memory"
        },
        "entrypoint_num_gpus": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Num Gpus"
        },
        "ttl_seconds_after_finished": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": 300,
          "title": "Ttl Seconds After Finished"
        },
        "shutdown_after_job_finishes": {
          "default": true,
          "title": "Shutdown After Job Finishes",
          "type": "boolean"
        },
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        }
      },
      "title": "InteractiveRayJobSpec",
      "type": "object"
    },
    "Lifecycle": {
      "properties": {
        "create": {
          "default": true,
          "description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
          "title": "Create",
          "type": "boolean"
        },
        "wait": {
          "default": true,
          "description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
          "title": "Wait",
          "type": "boolean"
        },
        "connect": {
          "default": true,
          "description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
          "title": "Connect",
          "type": "boolean"
        },
        "cleanup": {
          "default": "always",
          "description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
          "enum": [
            "never",
            "always",
            "on_exception"
          ],
          "title": "Cleanup",
          "type": "string"
        }
      },
      "title": "Lifecycle",
      "type": "object"
    },
    "RayClusterSpec": {
      "additionalProperties": true,
      "description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
      "properties": {
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "autoscaler_options": {
          "default": {
            "upscalingMode": "Default",
            "idleTimeoutSeconds": 60,
            "env": [],
            "envFrom": [],
            "resources": {
              "limits": {
                "cpu": "50m",
                "memory": "0.1Gi"
              },
              "requests": {
                "cpu": "50m",
                "memory": "0.1Gi"
              }
            }
          },
          "title": "Autoscaler Options",
          "type": "object"
        },
        "head_service_annotations": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Head Service Annotations"
        },
        "enable_in_tree_autoscaling": {
          "default": false,
          "title": "Enable In Tree Autoscaling",
          "type": "boolean"
        },
        "gcs_fault_tolerance_options": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gcs Fault Tolerance Options"
        },
        "head_group_spec": {
          "default": {
            "serviceType": "ClusterIP",
            "rayStartParams": {},
            "metadata": {
              "annotations": {},
              "labels": {}
            },
            "template": {
              "spec": {
                "affinity": {},
                "containers": [
                  {
                    "imagePullPolicy": "Always",
                    "name": "head",
                    "volumeMounts": [
                      {
                        "mountPath": "/tmp/ray",
                        "name": "ray-logs"
                      }
                    ]
                  }
                ],
                "imagePullSecrets": [],
                "nodeSelector": {},
                "tolerations": [],
                "volumes": [
                  {
                    "emptyDir": {},
                    "name": "ray-logs"
                  }
                ]
              }
            }
          },
          "title": "Head Group Spec",
          "type": "object"
        },
        "ray_version": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Ray Version"
        },
        "worker_group_specs": {
          "default": [
            {
              "groupName": "workers",
              "replicas": 0,
              "minReplicas": 0,
              "maxReplicas": 1,
              "rayStartParams": {},
              "template": {
                "metadata": {
                  "annotations": {},
                  "labels": {}
                },
                "spec": {
                  "affinity": {},
                  "containers": [
                    {
                      "imagePullPolicy": "Always",
                      "name": "worker",
                      "volumeMounts": [
                        {
                          "mountPath": "/tmp/ray",
                          "name": "ray-logs"
                        }
                      ]
                    }
                  ],
                  "imagePullSecrets": [],
                  "nodeSelector": {},
                  "tolerations": [],
                  "volumes": [
                    {
                      "emptyDir": {},
                      "name": "ray-logs"
                    }
                  ]
                }
              }
            }
          ],
          "items": {
            "type": "object"
          },
          "title": "Worker Group Specs",
          "type": "array"
        }
      },
      "title": "RayClusterSpec",
      "type": "object"
    },
    "RayDataExecutionOptions": {
      "properties": {
        "execution_options": {
          "$ref": "#/$defs/ExecutionOptionsConfig"
        },
        "cpu_limit": {
          "default": 5000,
          "title": "Cpu Limit",
          "type": "integer"
        },
        "gpu_limit": {
          "default": 0,
          "title": "Gpu Limit",
          "type": "integer"
        },
        "verbose_progress": {
          "default": true,
          "title": "Verbose Progress",
          "type": "boolean"
        },
        "use_polars": {
          "default": true,
          "title": "Use Polars",
          "type": "boolean"
        }
      },
      "title": "RayDataExecutionOptions",
      "type": "object"
    }
  },
  "description": "Provides a Ray Job for Dagster steps.\n\nThis is the recommended way to run Ray workloads with automatic cluster management. It creates a Ray Job, connects to it in client mode and sets the `jobId` field. Cleanup is handled by the KubeRay controller or by the resource lifecycle logic.\n\nInfo:\n    The image defaults to the `dagster/image` run tag.\n\nTip:\n    Make sure `ray[full]` is available in the image.",
  "properties": {
    "lifecycle": {
      "$ref": "#/$defs/Lifecycle",
      "description": "Actions to perform during resource setup."
    },
    "timeout": {
      "default": 600.0,
      "description": "Timeout for Ray readiness in seconds",
      "title": "Timeout",
      "type": "number"
    },
    "ray_init_options": {
      "description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
      "title": "Ray Init Options",
      "type": "object"
    },
    "data_execution_options": {
      "$ref": "#/$defs/RayDataExecutionOptions"
    },
    "redis_port": {
      "default": 10001,
      "description": "Redis port for connection. Make sure to match with the actual available port.",
      "title": "Redis Port",
      "type": "integer"
    },
    "dashboard_port": {
      "default": 8265,
      "description": "Dashboard port for connection. Make sure to match with the actual available port.",
      "title": "Dashboard Port",
      "type": "integer"
    },
    "env_vars": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "description": "Environment variables to pass to the Ray cluster.",
      "title": "Env Vars"
    },
    "enable_tracing": {
      "default": false,
      "description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
      "title": "Enable Tracing",
      "type": "boolean"
    },
    "enable_actor_task_logging": {
      "default": false,
      "description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
      "title": "Enable Actor Task Logging",
      "type": "boolean"
    },
    "enable_debug_post_mortem": {
      "default": false,
      "description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
      "title": "Enable Debug Post Mortem",
      "type": "boolean"
    },
    "enable_legacy_debugger": {
      "default": false,
      "description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
      "title": "Enable Legacy Debugger",
      "type": "boolean"
    },
    "image": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
      "title": "Image"
    },
    "deployment_name": {
      "default": "dev",
      "description": "Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
      "title": "Deployment Name",
      "type": "string"
    },
    "failure_tolerance_timeout": {
      "default": 0.0,
      "description": "The period in seconds to wait for the cluster to transition out of `failed` state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first `failed` state appearance will raise an exception immediately.",
      "title": "Failure Tolerance Timeout",
      "type": "number"
    },
    "poll_interval": {
      "default": 1.0,
      "description": "Poll interval for various API requests",
      "title": "Poll Interval",
      "type": "number"
    },
    "ray_job": {
      "$ref": "#/$defs/InteractiveRayJobConfig",
      "description": "Configuration for the Kubernetes `RayJob` CR"
    },
    "client": {
      "description": "Kubernetes `RayJob` client",
      "title": "Client"
    },
    "log_cluster_conditions": {
      "default": true,
      "description": "Whether to log `RayCluster` conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
      "title": "Log Cluster Conditions",
      "type": "boolean"
    }
  },
  "title": "KubeRayInteractiveJob",
  "type": "object"
}

Fields:

Attributes

lifecycle pydantic-field
lifecycle: Lifecycle

Actions to perform during resource setup.

ray_job pydantic-field

Configuration for the Kubernetes RayJob CR

client pydantic-field
client: ResourceDependency[RayJobClient]

Kubernetes RayJob client

failure_tolerance_timeout pydantic-field
failure_tolerance_timeout: float = 0.0

The period in seconds to wait for the cluster to transition out of failed state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first failed state appearance will raise an exception immediately.

log_cluster_conditions pydantic-field
log_cluster_conditions: bool = True

Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: KubeRay docs.

dagster_ray.kuberay.KubeRayCluster pydantic-model

Bases: BaseKubeRayResource

Provides a Ray Cluster for Dagster steps.

It is advised to use KubeRayInteractiveJob with KubeRay >= 1.3.0 instead.

Info

The image defaults to the dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Show JSON schema:
{
  "$defs": {
    "ClusterSharing": {
      "description": "Defines the strategy for sharing `RayCluster` resources with other Dagster steps.\n\nBy default, the cluster is expected to be created by Dagster during one of the previously executed steps.",
      "properties": {
        "enabled": {
          "default": false,
          "description": "Whether to enable sharing of RayClusters.",
          "title": "Enabled",
          "type": "boolean"
        },
        "match_dagster_labels": {
          "$ref": "#/$defs/MatchDagsterLabels",
          "description": "Configuration for matching on Dagster-generated labels."
        },
        "match_labels": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Additional user-provided labels to match on.",
          "title": "Match Labels"
        },
        "ttl_seconds": {
          "default": 1800.0,
          "description": "Time to live for the lock placed on the `RayCluster` resource, marking it as in use by the current Dagster step.",
          "title": "Ttl Seconds",
          "type": "number"
        }
      },
      "title": "ClusterSharing",
      "type": "object"
    },
    "ExecutionOptionsConfig": {
      "properties": {
        "cpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cpu"
        },
        "gpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gpu"
        },
        "object_store_memory": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Object Store Memory"
        }
      },
      "title": "ExecutionOptionsConfig",
      "type": "object"
    },
    "Lifecycle": {
      "properties": {
        "create": {
          "default": true,
          "description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
          "title": "Create",
          "type": "boolean"
        },
        "wait": {
          "default": true,
          "description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
          "title": "Wait",
          "type": "boolean"
        },
        "connect": {
          "default": true,
          "description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
          "title": "Connect",
          "type": "boolean"
        },
        "cleanup": {
          "default": "always",
          "description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
          "enum": [
            "never",
            "always",
            "on_exception"
          ],
          "title": "Cleanup",
          "type": "string"
        }
      },
      "title": "Lifecycle",
      "type": "object"
    },
    "MatchDagsterLabels": {
      "properties": {
        "cluster_sharing": {
          "default": true,
          "description": "Whether to match on `dagster/cluster-sharing=true` label.",
          "title": "Cluster Sharing",
          "type": "boolean"
        },
        "code_location": {
          "default": true,
          "description": "Whether to match on `dagster/code-location` label. The value will be taken from the current Dagster code location.",
          "title": "Code Location",
          "type": "boolean"
        },
        "resource_key": {
          "default": true,
          "description": "Whether to match on `dagster/resource-key` label. The value will be taken from the current Dagster resource key.",
          "title": "Resource Key",
          "type": "boolean"
        },
        "git_sha": {
          "default": true,
          "description": "Whether to match on `dagster/git-sha` label. The value will be taken from `DAGSTER_CLOUD_GIT_SHA` environment variable.",
          "title": "Git Sha",
          "type": "boolean"
        },
        "run_id": {
          "default": false,
          "description": "Whether to match on `dagster/run-id` label. The value will be taken from the current Dagster run ID.",
          "title": "Run Id",
          "type": "boolean"
        }
      },
      "title": "MatchDagsterLabels",
      "type": "object"
    },
    "RayClusterConfig": {
      "properties": {
        "kind": {
          "default": "RayCluster",
          "title": "Kind",
          "type": "string"
        },
        "api_version": {
          "default": "ray.io/v1",
          "title": "Api Version",
          "type": "string"
        },
        "metadata": {
          "description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
          "title": "Metadata",
          "type": "object"
        },
        "spec": {
          "$ref": "#/$defs/RayClusterSpec"
        }
      },
      "title": "RayClusterConfig",
      "type": "object"
    },
    "RayClusterSpec": {
      "additionalProperties": true,
      "description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
      "properties": {
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "autoscaler_options": {
          "default": {
            "upscalingMode": "Default",
            "idleTimeoutSeconds": 60,
            "env": [],
            "envFrom": [],
            "resources": {
              "limits": {
                "cpu": "50m",
                "memory": "0.1Gi"
              },
              "requests": {
                "cpu": "50m",
                "memory": "0.1Gi"
              }
            }
          },
          "title": "Autoscaler Options",
          "type": "object"
        },
        "head_service_annotations": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Head Service Annotations"
        },
        "enable_in_tree_autoscaling": {
          "default": false,
          "title": "Enable In Tree Autoscaling",
          "type": "boolean"
        },
        "gcs_fault_tolerance_options": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gcs Fault Tolerance Options"
        },
        "head_group_spec": {
          "default": {
            "serviceType": "ClusterIP",
            "rayStartParams": {},
            "metadata": {
              "annotations": {},
              "labels": {}
            },
            "template": {
              "spec": {
                "affinity": {},
                "containers": [
                  {
                    "imagePullPolicy": "Always",
                    "name": "head",
                    "volumeMounts": [
                      {
                        "mountPath": "/tmp/ray",
                        "name": "ray-logs"
                      }
                    ]
                  }
                ],
                "imagePullSecrets": [],
                "nodeSelector": {},
                "tolerations": [],
                "volumes": [
                  {
                    "emptyDir": {},
                    "name": "ray-logs"
                  }
                ]
              }
            }
          },
          "title": "Head Group Spec",
          "type": "object"
        },
        "ray_version": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Ray Version"
        },
        "worker_group_specs": {
          "default": [
            {
              "groupName": "workers",
              "replicas": 0,
              "minReplicas": 0,
              "maxReplicas": 1,
              "rayStartParams": {},
              "template": {
                "metadata": {
                  "annotations": {},
                  "labels": {}
                },
                "spec": {
                  "affinity": {},
                  "containers": [
                    {
                      "imagePullPolicy": "Always",
                      "name": "worker",
                      "volumeMounts": [
                        {
                          "mountPath": "/tmp/ray",
                          "name": "ray-logs"
                        }
                      ]
                    }
                  ],
                  "imagePullSecrets": [],
                  "nodeSelector": {},
                  "tolerations": [],
                  "volumes": [
                    {
                      "emptyDir": {},
                      "name": "ray-logs"
                    }
                  ]
                }
              }
            }
          ],
          "items": {
            "type": "object"
          },
          "title": "Worker Group Specs",
          "type": "array"
        }
      },
      "title": "RayClusterSpec",
      "type": "object"
    },
    "RayDataExecutionOptions": {
      "properties": {
        "execution_options": {
          "$ref": "#/$defs/ExecutionOptionsConfig"
        },
        "cpu_limit": {
          "default": 5000,
          "title": "Cpu Limit",
          "type": "integer"
        },
        "gpu_limit": {
          "default": 0,
          "title": "Gpu Limit",
          "type": "integer"
        },
        "verbose_progress": {
          "default": true,
          "title": "Verbose Progress",
          "type": "boolean"
        },
        "use_polars": {
          "default": true,
          "title": "Use Polars",
          "type": "boolean"
        }
      },
      "title": "RayDataExecutionOptions",
      "type": "object"
    }
  },
  "description": "Provides a Ray Cluster for Dagster steps.\n\nIt is advised to use [`KubeRayInteractiveJob`][dagster_ray.kuberay.resources.KubeRayInteractiveJob] with KubeRay >= 1.3.0 instead.\n\nInfo:\n    The image defaults to the `dagster/image` run tag.\n\nTip:\n    Make sure `ray[full]` is available in the image.",
  "properties": {
    "lifecycle": {
      "$ref": "#/$defs/Lifecycle",
      "description": "Actions to perform during resource setup."
    },
    "timeout": {
      "default": 600.0,
      "description": "Timeout for Ray readiness in seconds",
      "title": "Timeout",
      "type": "number"
    },
    "ray_init_options": {
      "description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
      "title": "Ray Init Options",
      "type": "object"
    },
    "data_execution_options": {
      "$ref": "#/$defs/RayDataExecutionOptions"
    },
    "redis_port": {
      "default": 10001,
      "description": "Redis port for connection. Make sure to match with the actual available port.",
      "title": "Redis Port",
      "type": "integer"
    },
    "dashboard_port": {
      "default": 8265,
      "description": "Dashboard port for connection. Make sure to match with the actual available port.",
      "title": "Dashboard Port",
      "type": "integer"
    },
    "env_vars": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "description": "Environment variables to pass to the Ray cluster.",
      "title": "Env Vars"
    },
    "enable_tracing": {
      "default": false,
      "description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
      "title": "Enable Tracing",
      "type": "boolean"
    },
    "enable_actor_task_logging": {
      "default": false,
      "description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
      "title": "Enable Actor Task Logging",
      "type": "boolean"
    },
    "enable_debug_post_mortem": {
      "default": false,
      "description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
      "title": "Enable Debug Post Mortem",
      "type": "boolean"
    },
    "enable_legacy_debugger": {
      "default": false,
      "description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
      "title": "Enable Legacy Debugger",
      "type": "boolean"
    },
    "image": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
      "title": "Image"
    },
    "deployment_name": {
      "default": "dev",
      "description": "Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
      "title": "Deployment Name",
      "type": "string"
    },
    "failure_tolerance_timeout": {
      "default": 0.0,
      "description": "The period in seconds to wait for the cluster to transition out of `failed` state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first `failed` state appearance will raise an exception immediately.",
      "title": "Failure Tolerance Timeout",
      "type": "number"
    },
    "poll_interval": {
      "default": 1.0,
      "description": "Poll interval for various API requests",
      "title": "Poll Interval",
      "type": "number"
    },
    "cluster_sharing": {
      "$ref": "#/$defs/ClusterSharing",
      "description": "Configuration for sharing the `RayCluster` across Dagster steps. Existing clusters matching this configuration will be reused without recreating them. A `dagster/sharing=true` label will be applied to the `RayCluster`, and a `dagster/lock-<run-id>-<step-id>=<lock>` annotation will be placed on the `RayCluster` to mark it as being used by this step. Cleanup will only proceed if the `RayCluster` is not being used by any other steps, therefore cluster sharing should be used in conjunction with [dagster_ray.kuberay.sensors.cleanup_expired_kuberay_clusters][] sensor."
    },
    "ray_cluster": {
      "$ref": "#/$defs/RayClusterConfig",
      "description": "Kubernetes `RayCluster` CR configuration."
    },
    "client": {
      "description": "Kubernetes `RayCluster` client",
      "title": "Client"
    },
    "log_cluster_conditions": {
      "default": true,
      "description": "Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
      "title": "Log Cluster Conditions",
      "type": "boolean"
    }
  },
  "title": "KubeRayCluster",
  "type": "object"
}

Fields:

Attributes

cluster_sharing pydantic-field
cluster_sharing: ClusterSharing

Configuration for sharing the RayCluster across Dagster steps. Existing clusters matching this configuration will be reused without recreating them. A dagster/sharing=true label will be applied to the RayCluster, and a dagster/lock-<run-id>-<step-id>=<lock> annotation will be placed on the RayCluster to mark it as being used by this step. Cleanup will only proceed if the RayCluster is not being used by any other steps, therefore cluster sharing should be used in conjunction with dagster_ray.kuberay.sensors.cleanup_expired_kuberay_clusters sensor.

lifecycle pydantic-field
lifecycle: Lifecycle

Actions to perform during resource setup.

ray_cluster pydantic-field
ray_cluster: RayClusterConfig

Kubernetes RayCluster CR configuration.

client pydantic-field
client: ResourceDependency[RayClusterClient]

Kubernetes RayCluster client

failure_tolerance_timeout pydantic-field
failure_tolerance_timeout: float = 0.0

The period in seconds to wait for the cluster to transition out of failed state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first failed state appearance will raise an exception immediately.

log_cluster_conditions pydantic-field
log_cluster_conditions: bool = True

Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: KubeRay docs.


Pipes

dagster_ray.kuberay.PipesKubeRayJobClient

PipesKubeRayJobClient(
    client: RayJobClient | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
    port_forward: bool = False,
)

Bases: PipesClient, TreatAsResourceParam

A pipes client for running RayJob on Kubernetes.

Parameters:

  • context_injector (Optional[PipesContextInjector], default: None ) –

    A context injector to use to inject context into the RayJob. Defaults to PipesEnvContextInjector.

  • message_reader (Optional[PipesMessageReader], default: None ) –

    A message reader to use to read messages from the RayJob. Defaults to PipesRayJobMessageReader.

  • client (Optional[RayJobClient], default: None ) –

    The client used to manage KubeRay RayJob resources. A new RayJobClient is created when omitted.

  • forward_termination (bool, default: True ) –

    Whether to terminate the Ray job when the Dagster process receives a termination signal, or if the startup timeout is reached. Defaults to True.

  • timeout (float, default: 600 ) –

    Timeout for various internal interactions with the Kubernetes RayJob.

  • poll_interval (float, default: 1 ) –

    Interval at which to poll Kubernetes for status updates.

  • port_forward (bool, default: False ) –

    Whether to use Kubernetes port-forwarding to connect to the KubeRay cluster. Is useful when running in a local environment.

Info

Image defaults to dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Source code in src/dagster_ray/kuberay/pipes.py
def __init__(
    self,
    client: RayJobClient | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
    port_forward: bool = False,
):
    """Create a Pipes client that runs KubeRay ``RayJob`` resources.

    Args:
        client: Client used to manage ``RayJob`` resources. A new ``RayJobClient()`` is
            created when omitted.
        context_injector: Injects the Pipes context into the RayJob. Defaults to
            ``PipesEnvContextInjector``.
        message_reader: Reads Pipes messages emitted by the RayJob. Defaults to
            ``PipesRayJobMessageReader``.
        forward_termination: Whether to terminate the RayJob when the Dagster process is
            interrupted while waiting for it.
        timeout: Timeout used for internal interactions with the Kubernetes RayJob, e.g. when
            resolving the backing RayCluster. Accepts ``int`` or ``float``.
        poll_interval: Interval between Kubernetes status polls. Accepts ``int`` or ``float``.
        port_forward: Whether to reach the Ray cluster through Kubernetes port-forwarding
            (useful when running locally).
    """
    self.client: RayJobClient = client or RayJobClient()

    self._context_injector = context_injector or PipesEnvContextInjector()
    self._message_reader = message_reader or PipesRayJobMessageReader()

    self.forward_termination = check.bool_param(forward_termination, "forward_termination")
    # Both values are annotated as `float`; `check.int_param` would reject legitimate inputs
    # such as `timeout=600.0` or `poll_interval=0.5`, so accept any int or float instead.
    self.timeout = check.numeric_param(timeout, "timeout")
    self.poll_interval = check.numeric_param(poll_interval, "poll_interval")
    self.port_forward = check.bool_param(port_forward, "port_forward")

    # Set while `run` holds an open job submission client for the current RayJob.
    self._job_submission_client: JobSubmissionClient | None = None

Functions

run
run(
    *, context: OpOrAssetExecutionContext, ray_job: dict[str, Any], extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation

Execute a RayJob, enriched with the Pipes protocol.

Parameters:

  • context (OpOrAssetExecutionContext) –

    Current Dagster op or asset context.

  • ray_job (Dict[str, Any]) –

    RayJob specification. See the KubeRay API reference: https://ray-project.github.io/kuberay/reference/api/#rayjob.

  • extras (Optional[Dict[str, Any]], default: None ) –

    Additional information to pass to the Pipes session.

Source code in src/dagster_ray/kuberay/pipes.py
def run(  # type: ignore
    self,
    *,
    context: OpOrAssetExecutionContext,
    ray_job: dict[str, Any],
    extras: PipesExtras | None = None,
) -> PipesClientCompletedInvocation:
    """
    Execute a RayJob, enriched with the Pipes protocol.

    Opens a Pipes session, enriches and submits the RayJob manifest, opens a job submission
    client to the RayJob's Ray cluster, reports the launch to the Pipes session and blocks
    until the RayJob completes.

    Args:
        context (OpOrAssetExecutionContext): Current Dagster op or asset context.
        ray_job (Dict[str, Any]): RayJob specification. `API reference <https://ray-project.github.io/kuberay/reference/api/#rayjob>`_.
        extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session.

    Returns:
        PipesClientCompletedInvocation: The completed invocation, carrying ``RayJob``
        (``<namespace>/<name>``) and ``Ray Job ID`` metadata.

    Raises:
        DagsterExecutionInterruptedError: Re-raised if the Dagster process is interrupted while
            waiting. When ``forward_termination`` is enabled, the RayJob is terminated first.
    """
    with open_pipes_session(
        context=context,
        message_reader=self._message_reader,
        context_injector=self._context_injector,
        extras=extras,
    ) as session:
        # Let `_enrich_ray_job` adjust the manifest using the context and Pipes session
        # (presumably injecting Pipes bootstrap data — confirm in that method), then create it.
        ray_job = self._enrich_ray_job(context, session, ray_job)
        start_response = self._start(context, session, ray_job)
        # The Ray job ID is read from the status of the object returned by `_start`.
        start_status = cast(RayJobStatus, start_response["status"])
        ray_job_id = start_status["jobId"]  # pyright: ignore[reportTypedDictNotRequiredAccess]

        # Name and namespace come from the enriched manifest, not from `start_response`.
        name = ray_job["metadata"]["name"]
        namespace = ray_job["metadata"]["namespace"]

        # Resolve the RayCluster backing this RayJob (bounded by `self.timeout` /
        # `self.poll_interval`) and connect to it, optionally through port-forwarding.
        with self.client.ray_cluster_client.job_submission_client(
            name=self.client.get_ray_cluster_name(
                name=name, namespace=namespace, timeout=self.timeout, poll_interval=self.poll_interval
            ),
            namespace=namespace,
            port_forward=self.port_forward,
        ) as job_submission_client:
            # NOTE(review): stored on the instance, presumably so other methods can reach the
            # Ray job submission API while this context is open — confirm.
            self._job_submission_client = job_submission_client

            # Publish the Ray job ID and cluster address to the Pipes session; presumably the
            # message reader uses them to locate the job's output.
            session.report_launched(
                {
                    "extras": {
                        PIPES_LAUNCHED_EXTRAS_RAY_JOB_ID_KEY: ray_job_id,
                        PIPES_LAUNCHED_EXTRAS_RAY_ADDRESS_KEY: job_submission_client.get_address(),
                    }
                }
            )

            try:
                self._wait_for_completion(context, start_response)

                if isinstance(self._message_reader, PipesRayJobMessageReader) and self.port_forward:
                    # in this case the message reader will fail once port forwarding is finished
                    # TODO: merge https://github.com/danielgafni/dagster-ray/pull/123
                    # to avoid this work-around
                    # Wait for the reader thread to start, signal that the session is closing,
                    # then block until the reader has finished before the port-forward is torn down.
                    self._message_reader.thread_ready.wait()
                    context.log.debug(
                        "[pipes] waiting for PipesRayJobMessageReader to complete before stopping port-forwarding"
                    )
                    self._message_reader.session_closed.set()
                    self._message_reader.completed.wait()

                return PipesClientCompletedInvocation(
                    session, metadata={"RayJob": f"{namespace}/{name}", "Ray Job ID": ray_job_id}
                )

            except DagsterExecutionInterruptedError:
                # Optionally propagate the interruption to the RayJob; the error is always re-raised.
                if self.forward_termination:
                    context.log.warning(
                        f"[pipes] Dagster process interrupted! Will terminate RayJob {namespace}/{name}."
                    )
                    self._terminate(context, start_response)
                raise

Configuration and Types

dagster_ray.kuberay.configs.RayJobConfig pydantic-model

Bases: Config

Show JSON schema:
{
  "$defs": {
    "RayClusterSpec": {
      "additionalProperties": true,
      "description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
      "properties": {
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "autoscaler_options": {
          "default": {
            "upscalingMode": "Default",
            "idleTimeoutSeconds": 60,
            "env": [],
            "envFrom": [],
            "resources": {
              "limits": {
                "cpu": "50m",
                "memory": "0.1Gi"
              },
              "requests": {
                "cpu": "50m",
                "memory": "0.1Gi"
              }
            }
          },
          "title": "Autoscaler Options",
          "type": "object"
        },
        "head_service_annotations": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Head Service Annotations"
        },
        "enable_in_tree_autoscaling": {
          "default": false,
          "title": "Enable In Tree Autoscaling",
          "type": "boolean"
        },
        "gcs_fault_tolerance_options": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gcs Fault Tolerance Options"
        },
        "head_group_spec": {
          "default": {
            "serviceType": "ClusterIP",
            "rayStartParams": {},
            "metadata": {
              "annotations": {},
              "labels": {}
            },
            "template": {
              "spec": {
                "affinity": {},
                "containers": [
                  {
                    "imagePullPolicy": "Always",
                    "name": "head",
                    "volumeMounts": [
                      {
                        "mountPath": "/tmp/ray",
                        "name": "ray-logs"
                      }
                    ]
                  }
                ],
                "imagePullSecrets": [],
                "nodeSelector": {},
                "tolerations": [],
                "volumes": [
                  {
                    "emptyDir": {},
                    "name": "ray-logs"
                  }
                ]
              }
            }
          },
          "title": "Head Group Spec",
          "type": "object"
        },
        "ray_version": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Ray Version"
        },
        "worker_group_specs": {
          "default": [
            {
              "groupName": "workers",
              "replicas": 0,
              "minReplicas": 0,
              "maxReplicas": 1,
              "rayStartParams": {},
              "template": {
                "metadata": {
                  "annotations": {},
                  "labels": {}
                },
                "spec": {
                  "affinity": {},
                  "containers": [
                    {
                      "imagePullPolicy": "Always",
                      "name": "worker",
                      "volumeMounts": [
                        {
                          "mountPath": "/tmp/ray",
                          "name": "ray-logs"
                        }
                      ]
                    }
                  ],
                  "imagePullSecrets": [],
                  "nodeSelector": {},
                  "tolerations": [],
                  "volumes": [
                    {
                      "emptyDir": {},
                      "name": "ray-logs"
                    }
                  ]
                }
              }
            }
          ],
          "items": {
            "type": "object"
          },
          "title": "Worker Group Specs",
          "type": "array"
        }
      },
      "title": "RayClusterSpec",
      "type": "object"
    },
    "RayJobSpec": {
      "additionalProperties": true,
      "description": "[RayJob spec](https://ray-project.github.io/kuberay/reference/api/#rayjobspec) configuration options. A few sensible defaults are provided for convenience.",
      "properties": {
        "active_deadline_seconds": {
          "default": 86400,
          "title": "Active Deadline Seconds",
          "type": "integer"
        },
        "backoff_limit": {
          "default": 0,
          "title": "Backoff Limit",
          "type": "integer"
        },
        "ray_cluster_spec": {
          "anyOf": [
            {
              "$ref": "#/$defs/RayClusterSpec"
            },
            {
              "type": "null"
            }
          ]
        },
        "submitter_pod_template": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Submitter Pod Template"
        },
        "metadata": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Metadata"
        },
        "cluster_selector": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cluster Selector"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "deletion_strategy": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "title": "Deletion Strategy"
        },
        "runtime_env_yaml": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Runtime Env Yaml"
        },
        "job_id": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Job Id"
        },
        "submission_mode": {
          "default": "K8sJobMode",
          "enum": [
            "K8sJobMode",
            "HTTPMode",
            "InteractiveMode"
          ],
          "title": "Submission Mode",
          "type": "string"
        },
        "entrypoint_resources": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Resources"
        },
        "entrypoint_num_cpus": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Num Cpus"
        },
        "entrypoint_memory": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Memory"
        },
        "entrypoint_num_gpus": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Num Gpus"
        },
        "ttl_seconds_after_finished": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": 300,
          "title": "Ttl Seconds After Finished"
        },
        "shutdown_after_job_finishes": {
          "default": true,
          "title": "Shutdown After Job Finishes",
          "type": "boolean"
        },
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        }
      },
      "title": "RayJobSpec",
      "type": "object"
    }
  },
  "properties": {
    "kind": {
      "default": "RayJob",
      "title": "Kind",
      "type": "string"
    },
    "api_version": {
      "default": "ray.io/v1",
      "title": "Api Version",
      "type": "string"
    },
    "metadata": {
      "description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
      "title": "Metadata",
      "type": "object"
    },
    "spec": {
      "$ref": "#/$defs/RayJobSpec"
    }
  },
  "title": "RayJobConfig",
  "type": "object"
}

Fields:

Attributes

metadata pydantic-field
metadata: dict[str, Any]

Kubernetes metadata, except the name field can be omitted. In this case it will be generated by dagster-ray.

spec pydantic-field
spec: RayJobSpec

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into a Kubernetes manifest in camelCase format and inject additional information.

    Args:
        context: Dagster context, forwarded to the spec conversion.
        image: Image forwarded to the spec conversion; it is injected into the head group and
            worker groups unless they already specify one.
        labels: Extra labels merged over ``metadata["labels"]``. These win on key conflicts.
        annotations: Extra annotations merged over ``metadata["annotations"]``. These win on
            key conflicts.
        env_vars: Environment variables forwarded to the spec conversion.

    Returns:
        A ``RayJob`` manifest with ``apiVersion``, ``kind``, ``metadata`` and ``spec`` keys.
        Only ``name``, ``labels`` and ``annotations`` are taken from ``self.metadata``. A
        missing ``name`` is passed as None to ``remove_none_from_dict``, presumably so it can
        be generated later.
    """
    # User-supplied metadata may set `labels`/`annotations` explicitly to None; unpacking
    # None with `**` raises TypeError, so normalize both sides to empty mappings first.
    metadata_labels = self.metadata.get("labels") or {}
    metadata_annotations = self.metadata.get("annotations") or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**metadata_labels, **(labels or {})},
                "annotations": {**metadata_annotations, **(annotations or {})},
            }
        ),
        "spec": self.spec.to_k8s(
            context=context,
            image=image,
            env_vars=env_vars,
        ),
    }

dagster_ray.kuberay.configs.RayJobSpec

Bases: PermissiveConfig

RayJob spec configuration options. A few sensible defaults are provided for convenience.

Attributes

ray_cluster_spec class-attribute instance-attribute
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
entrypoint_num_cpus class-attribute instance-attribute
entrypoint_num_cpus: float | None = None
entrypoint_num_gpus class-attribute instance-attribute
entrypoint_num_gpus: float | None = None
entrypoint_memory class-attribute instance-attribute
entrypoint_memory: float | None = None
entrypoint_resources class-attribute instance-attribute
entrypoint_resources: str | None = None

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Render this spec as the camelCase ``spec`` section of a Kubernetes ``RayJob``.

    Each scalar field is copied under its camelCase KubeRay key. ``rayClusterSpec`` is produced
    by delegating to the nested cluster spec's ``to_k8s`` with ``context``, ``image`` and
    ``env_vars``. The assembled mapping is passed through ``remove_none_from_dict``, presumably
    dropping entries whose value is None.

    Args:
        context: Dagster context, forwarded to the nested cluster spec.
        image: Image forwarded to the nested cluster spec.
        env_vars: Environment variables forwarded to the nested cluster spec.

    Returns:
        The ``RayJob.spec`` mapping.
    """
    # camelCase manifest key -> attribute holding its value (order matches the emitted manifest).
    key_to_attr = {
        "activeDeadlineSeconds": "active_deadline_seconds",
        "backoffLimit": "backoff_limit",
        "submitterPodTemplate": "submitter_pod_template",
        "metadata": "metadata",
        "clusterSelector": "cluster_selector",
        "managedBy": "managed_by",
        "deletionStrategy": "deletion_strategy",
        "runtimeEnvYAML": "runtime_env_yaml",
        "jobId": "job_id",
        "submissionMode": "submission_mode",
        "entrypointResources": "entrypoint_resources",
        "entrypointNumCpus": "entrypoint_num_cpus",
        "entrypointMemory": "entrypoint_memory",
        "entrypointNumGpus": "entrypoint_num_gpus",
        "ttlSecondsAfterFinished": "ttl_seconds_after_finished",
        "shutdownAfterJobFinishes": "shutdown_after_job_finishes",
        "suspend": "suspend",
    }
    manifest_spec = {key: getattr(self, attr_name) for key, attr_name in key_to_attr.items()}

    # The nested cluster spec is optional; an absent one is emitted as None.
    cluster = self.ray_cluster_spec
    if cluster is None:
        manifest_spec["rayClusterSpec"] = None
    else:
        manifest_spec["rayClusterSpec"] = cluster.to_k8s(context=context, image=image, env_vars=env_vars)

    return remove_none_from_dict(manifest_spec)

dagster_ray.kuberay.resources.rayjob.InteractiveRayJobConfig pydantic-model

Bases: RayJobConfig

Same as RayJobConfig, but spec.submission_mode has to be InteractiveMode.

Show JSON schema:
{
  "$defs": {
    "InteractiveRayJobSpec": {
      "additionalProperties": true,
      "description": "Same as [`RayJobSpec`][dagster_ray.kuberay.configs.RayJobSpec], but `submission_mode` has to be `InteractiveMode`",
      "properties": {
        "active_deadline_seconds": {
          "default": 86400,
          "title": "Active Deadline Seconds",
          "type": "integer"
        },
        "backoff_limit": {
          "default": 0,
          "title": "Backoff Limit",
          "type": "integer"
        },
        "ray_cluster_spec": {
          "anyOf": [
            {
              "$ref": "#/$defs/RayClusterSpec"
            },
            {
              "type": "null"
            }
          ]
        },
        "submitter_pod_template": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Submitter Pod Template"
        },
        "metadata": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Metadata"
        },
        "cluster_selector": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cluster Selector"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "deletion_strategy": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "title": "Deletion Strategy"
        },
        "runtime_env_yaml": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Runtime Env Yaml"
        },
        "job_id": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Job Id"
        },
        "submission_mode": {
          "const": "InteractiveMode",
          "default": "InteractiveMode",
          "title": "Submission Mode",
          "type": "string"
        },
        "entrypoint_resources": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Resources"
        },
        "entrypoint_num_cpus": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Num Cpus"
        },
        "entrypoint_memory": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Memory"
        },
        "entrypoint_num_gpus": {
          "anyOf": [
            {
              "type": "number"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Entrypoint Num Gpus"
        },
        "ttl_seconds_after_finished": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": 300,
          "title": "Ttl Seconds After Finished"
        },
        "shutdown_after_job_finishes": {
          "default": true,
          "title": "Shutdown After Job Finishes",
          "type": "boolean"
        },
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        }
      },
      "title": "InteractiveRayJobSpec",
      "type": "object"
    },
    "RayClusterSpec": {
      "additionalProperties": true,
      "description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
      "properties": {
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "autoscaler_options": {
          "default": {
            "upscalingMode": "Default",
            "idleTimeoutSeconds": 60,
            "env": [],
            "envFrom": [],
            "resources": {
              "limits": {
                "cpu": "50m",
                "memory": "0.1Gi"
              },
              "requests": {
                "cpu": "50m",
                "memory": "0.1Gi"
              }
            }
          },
          "title": "Autoscaler Options",
          "type": "object"
        },
        "head_service_annotations": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Head Service Annotations"
        },
        "enable_in_tree_autoscaling": {
          "default": false,
          "title": "Enable In Tree Autoscaling",
          "type": "boolean"
        },
        "gcs_fault_tolerance_options": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gcs Fault Tolerance Options"
        },
        "head_group_spec": {
          "default": {
            "serviceType": "ClusterIP",
            "rayStartParams": {},
            "metadata": {
              "annotations": {},
              "labels": {}
            },
            "template": {
              "spec": {
                "affinity": {},
                "containers": [
                  {
                    "imagePullPolicy": "Always",
                    "name": "head",
                    "volumeMounts": [
                      {
                        "mountPath": "/tmp/ray",
                        "name": "ray-logs"
                      }
                    ]
                  }
                ],
                "imagePullSecrets": [],
                "nodeSelector": {},
                "tolerations": [],
                "volumes": [
                  {
                    "emptyDir": {},
                    "name": "ray-logs"
                  }
                ]
              }
            }
          },
          "title": "Head Group Spec",
          "type": "object"
        },
        "ray_version": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Ray Version"
        },
        "worker_group_specs": {
          "default": [
            {
              "groupName": "workers",
              "replicas": 0,
              "minReplicas": 0,
              "maxReplicas": 1,
              "rayStartParams": {},
              "template": {
                "metadata": {
                  "annotations": {},
                  "labels": {}
                },
                "spec": {
                  "affinity": {},
                  "containers": [
                    {
                      "imagePullPolicy": "Always",
                      "name": "worker",
                      "volumeMounts": [
                        {
                          "mountPath": "/tmp/ray",
                          "name": "ray-logs"
                        }
                      ]
                    }
                  ],
                  "imagePullSecrets": [],
                  "nodeSelector": {},
                  "tolerations": [],
                  "volumes": [
                    {
                      "emptyDir": {},
                      "name": "ray-logs"
                    }
                  ]
                }
              }
            }
          ],
          "items": {
            "type": "object"
          },
          "title": "Worker Group Specs",
          "type": "array"
        }
      },
      "title": "RayClusterSpec",
      "type": "object"
    }
  },
  "description": "Same as [`RayJobConfig`][dagster_ray.kuberay.configs.RayJobConfig], but `spec.submission_mode` has to be `InteractiveMode`",
  "properties": {
    "kind": {
      "default": "RayJob",
      "title": "Kind",
      "type": "string"
    },
    "api_version": {
      "default": "ray.io/v1",
      "title": "Api Version",
      "type": "string"
    },
    "metadata": {
      "description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
      "title": "Metadata",
      "type": "object"
    },
    "spec": {
      "$ref": "#/$defs/InteractiveRayJobSpec"
    }
  },
  "title": "InteractiveRayJobConfig",
  "type": "object"
}

Fields:

Attributes

metadata pydantic-field
metadata: dict[str, Any]

Kubernetes metadata, except the name field can be omitted. In this case it will be generated by dagster-ray.

spec pydantic-field

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information.

    Args:
        context: Dagster context, forwarded to ``self.spec.to_k8s``.
        image: Forwarded to ``self.spec.to_k8s``; injected into the head group and worker
            groups unless a container there already specifies an image.
        labels: Extra labels merged over ``self.metadata["labels"]``; on key collisions
            these values win.
        annotations: Extra annotations merged over ``self.metadata["annotations"]``; on key
            collisions these values win.
        env_vars: Forwarded to ``self.spec.to_k8s``.

    Returns:
        A manifest dict with ``apiVersion``, ``kind``, ``metadata`` and ``spec`` keys, where
        ``metadata`` is passed through ``remove_none_from_dict`` (so a missing ``name`` is
        presumably dropped and left for the caller to fill in).
    """
    labels = labels or {}
    annotations = annotations or {}

    # Treat an explicit ``None`` in the user-provided metadata exactly like a missing key,
    # for both labels and annotations, so ``{**None}`` can never raise ``TypeError``.
    metadata_labels = self.metadata.get("labels") or {}
    metadata_annotations = self.metadata.get("annotations") or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**metadata_labels, **labels},
                "annotations": {**metadata_annotations, **annotations},
            }
        ),
        "spec": self.spec.to_k8s(
            context=context,
            image=image,
            env_vars=env_vars,
        ),
    }

dagster_ray.kuberay.resources.rayjob.InteractiveRayJobSpec

Bases: RayJobSpec

Same as RayJobSpec, but submission_mode has to be InteractiveMode.

Attributes

submission_mode class-attribute instance-attribute
submission_mode: Literal['InteractiveMode'] = 'InteractiveMode'
active_deadline_seconds class-attribute instance-attribute
active_deadline_seconds: int = 60 * 60 * 24
backoff_limit class-attribute instance-attribute
backoff_limit: int = 0
ray_cluster_spec class-attribute instance-attribute
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
submitter_pod_template class-attribute instance-attribute
submitter_pod_template: dict[str, Any] | None = None
metadata class-attribute instance-attribute
metadata: dict[str, Any] | None = None
cluster_selector class-attribute instance-attribute
cluster_selector: dict[str, str] | None = None
managed_by class-attribute instance-attribute
managed_by: str | None = None
deletion_strategy class-attribute instance-attribute
deletion_strategy: dict[str, Any] | None = Field(
    default_factory=lambda: {"onFailure": {"policy": "DeleteCluster"}, "onSuccess": {"policy": "DeleteCluster"}}
)
runtime_env_yaml class-attribute instance-attribute
runtime_env_yaml: str | None = None
job_id class-attribute instance-attribute
job_id: str | None = None
entrypoint_resources class-attribute instance-attribute
entrypoint_resources: str | None = None
entrypoint_num_cpus class-attribute instance-attribute
entrypoint_num_cpus: float | None = None
entrypoint_memory class-attribute instance-attribute
entrypoint_memory: float | None = None
entrypoint_num_gpus class-attribute instance-attribute
entrypoint_num_gpus: float | None = None
ttl_seconds_after_finished class-attribute instance-attribute
ttl_seconds_after_finished: int | None = 5 * 60
shutdown_after_job_finishes class-attribute instance-attribute
shutdown_after_job_finishes: bool = True
suspend class-attribute instance-attribute
suspend: bool | None = None

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Render this job spec as a camelCase Kubernetes ``RayJob`` spec dict.

    Each snake_case field is emitted under its camelCase KubeRay key. When
    ``ray_cluster_spec`` is set, it is rendered through its own ``to_k8s`` and receives
    ``context``, ``image`` and ``env_vars``. The assembled mapping is passed through
    ``remove_none_from_dict`` (presumably dropping unset, ``None``-valued fields).

    Args:
        context: Dagster context, forwarded to the nested cluster spec.
        image: Forwarded to the nested cluster spec for injection into its groups.
        env_vars: Forwarded to the nested cluster spec.

    Returns:
        The camelCase spec dict.
    """
    cluster_spec = self.ray_cluster_spec
    if cluster_spec is None:
        rendered_cluster_spec = None
    else:
        rendered_cluster_spec = cluster_spec.to_k8s(context=context, image=image, env_vars=env_vars)

    # Key order mirrors the field order expected in the emitted manifest.
    manifest: dict[str, Any] = {
        "activeDeadlineSeconds": self.active_deadline_seconds,
        "backoffLimit": self.backoff_limit,
        "submitterPodTemplate": self.submitter_pod_template,
        "metadata": self.metadata,
        "clusterSelector": self.cluster_selector,
        "managedBy": self.managed_by,
        "deletionStrategy": self.deletion_strategy,
        "runtimeEnvYAML": self.runtime_env_yaml,
        "jobId": self.job_id,
        "submissionMode": self.submission_mode,
        "entrypointResources": self.entrypoint_resources,
        "entrypointNumCpus": self.entrypoint_num_cpus,
        "entrypointMemory": self.entrypoint_memory,
        "entrypointNumGpus": self.entrypoint_num_gpus,
        "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
        "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
        "suspend": self.suspend,
        "rayClusterSpec": rendered_cluster_spec,
    }
    return remove_none_from_dict(manifest)

dagster_ray.kuberay.configs.RayClusterConfig pydantic-model

Bases: Config

Show JSON schema:
{
  "$defs": {
    "RayClusterSpec": {
      "additionalProperties": true,
      "description": "[RayCluster spec](https://ray-project.github.io/kuberay/reference/api/#rayclusterspec) configuration options. A few sensible defaults are provided for convenience.",
      "properties": {
        "suspend": {
          "anyOf": [
            {
              "type": "boolean"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Suspend"
        },
        "managed_by": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Managed By"
        },
        "autoscaler_options": {
          "default": {
            "upscalingMode": "Default",
            "idleTimeoutSeconds": 60,
            "env": [],
            "envFrom": [],
            "resources": {
              "limits": {
                "cpu": "50m",
                "memory": "0.1Gi"
              },
              "requests": {
                "cpu": "50m",
                "memory": "0.1Gi"
              }
            }
          },
          "title": "Autoscaler Options",
          "type": "object"
        },
        "head_service_annotations": {
          "anyOf": [
            {
              "additionalProperties": {
                "type": "string"
              },
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Head Service Annotations"
        },
        "enable_in_tree_autoscaling": {
          "default": false,
          "title": "Enable In Tree Autoscaling",
          "type": "boolean"
        },
        "gcs_fault_tolerance_options": {
          "anyOf": [
            {
              "type": "object"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gcs Fault Tolerance Options"
        },
        "head_group_spec": {
          "default": {
            "serviceType": "ClusterIP",
            "rayStartParams": {},
            "metadata": {
              "annotations": {},
              "labels": {}
            },
            "template": {
              "spec": {
                "affinity": {},
                "containers": [
                  {
                    "imagePullPolicy": "Always",
                    "name": "head",
                    "volumeMounts": [
                      {
                        "mountPath": "/tmp/ray",
                        "name": "ray-logs"
                      }
                    ]
                  }
                ],
                "imagePullSecrets": [],
                "nodeSelector": {},
                "tolerations": [],
                "volumes": [
                  {
                    "emptyDir": {},
                    "name": "ray-logs"
                  }
                ]
              }
            }
          },
          "title": "Head Group Spec",
          "type": "object"
        },
        "ray_version": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Ray Version"
        },
        "worker_group_specs": {
          "default": [
            {
              "groupName": "workers",
              "replicas": 0,
              "minReplicas": 0,
              "maxReplicas": 1,
              "rayStartParams": {},
              "template": {
                "metadata": {
                  "annotations": {},
                  "labels": {}
                },
                "spec": {
                  "affinity": {},
                  "containers": [
                    {
                      "imagePullPolicy": "Always",
                      "name": "worker",
                      "volumeMounts": [
                        {
                          "mountPath": "/tmp/ray",
                          "name": "ray-logs"
                        }
                      ]
                    }
                  ],
                  "imagePullSecrets": [],
                  "nodeSelector": {},
                  "tolerations": [],
                  "volumes": [
                    {
                      "emptyDir": {},
                      "name": "ray-logs"
                    }
                  ]
                }
              }
            }
          ],
          "items": {
            "type": "object"
          },
          "title": "Worker Group Specs",
          "type": "array"
        }
      },
      "title": "RayClusterSpec",
      "type": "object"
    }
  },
  "properties": {
    "kind": {
      "default": "RayCluster",
      "title": "Kind",
      "type": "string"
    },
    "api_version": {
      "default": "ray.io/v1",
      "title": "Api Version",
      "type": "string"
    },
    "metadata": {
      "description": "Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
      "title": "Metadata",
      "type": "object"
    },
    "spec": {
      "$ref": "#/$defs/RayClusterSpec"
    }
  },
  "title": "RayClusterConfig",
  "type": "object"
}

Fields:

Attributes

metadata pydantic-field
metadata: dict[str, Any]

Kubernetes metadata, except the name field can be omitted. In this case it will be generated by dagster-ray.

spec pydantic-field

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information.

    Args:
        context: Dagster context; ``context.log`` must be set. Forwarded to ``self.spec.to_k8s``.
        image: Forwarded to ``self.spec.to_k8s``; injected into the head group and worker
            groups unless a container there already specifies an image.
        labels: Extra labels merged over ``self.metadata["labels"]``; on key collisions
            these values win.
        annotations: Extra annotations merged over ``self.metadata["annotations"]``; on key
            collisions these values win.
        env_vars: Forwarded to ``self.spec.to_k8s``.

    Returns:
        A manifest dict with ``apiVersion``, ``kind``, ``metadata`` and ``spec`` keys.

    Raises:
        AssertionError: If ``context.log`` is ``None`` (when assertions are enabled).
        Any exception raised by ``self.spec.to_k8s`` propagates unchanged.
    """
    # The docstring must stay the first statement so it is bound to ``__doc__``;
    # this sanity check therefore comes after it.
    assert context.log is not None

    labels = labels or {}
    annotations = annotations or {}

    # Treat an explicit ``None`` in the user-provided metadata exactly like a missing key,
    # for both labels and annotations, so ``{**None}`` can never raise ``TypeError``.
    metadata_labels = self.metadata.get("labels") or {}
    metadata_annotations = self.metadata.get("annotations") or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**metadata_labels, **labels},
                "annotations": {**metadata_annotations, **annotations},
            }
        ),
        "spec": self.spec.to_k8s(context=context, image=image, env_vars=env_vars),
    }

dagster_ray.kuberay.configs.RayClusterSpec

Bases: PermissiveConfig

RayCluster spec configuration options. A few sensible defaults are provided for convenience.

Attributes

suspend class-attribute instance-attribute
suspend: bool | None = None
managed_by class-attribute instance-attribute
managed_by: str | None = None
autoscaler_options class-attribute instance-attribute
autoscaler_options: dict[str, Any] = DEFAULT_AUTOSCALER_OPTIONS
head_service_annotations class-attribute instance-attribute
head_service_annotations: dict[str, str] | None = None
enable_in_tree_autoscaling class-attribute instance-attribute
enable_in_tree_autoscaling: bool = False
gcs_fault_tolerance_options class-attribute instance-attribute
gcs_fault_tolerance_options: dict[str, Any] | None = None
head_group_spec class-attribute instance-attribute
head_group_spec: dict[str, Any] = DEFAULT_HEAD_GROUP_SPEC
ray_version class-attribute instance-attribute
ray_version: str | None = None
worker_group_specs class-attribute instance-attribute
worker_group_specs: list[dict[str, Any]] = DEFAULT_WORKER_GROUP_SPECS

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information.

    The head group spec and every worker group spec are deep-copied before anything is
    injected. The configured specs on ``self`` are therefore never mutated, and repeated
    calls produce the same output.

    Args:
        context: Dagster context; ``context.log`` must be set.
        image: Set on the first container of the head group and of each worker group,
            unless that container already defines ``image``.
        env_vars: Appended as ``{"name": key, "value": value}`` entries to the ``env``
            list of every container in every group.

    Returns:
        The camelCase ``RayCluster`` spec dict, passed through ``remove_none_from_dict``.

    Raises:
        AssertionError: If ``context.log`` is ``None`` (when assertions are enabled).
        ValueError: If a group's first container has no image and ``image`` is ``None``.
    """
    import copy

    assert context.log is not None

    # TODO: inject self.redis_port and self.dashboard_port into the RayCluster configuration
    # TODO: auto-apply some tags from dagster-k8s/config

    # ``dict.copy()`` / ``list.copy()`` are shallow: the injection below writes into the
    # nested container dicts. Without a deep copy it would mutate this config object, so
    # an image would stick across calls and env vars would be appended again on every call.
    head_group_spec = copy.deepcopy(self.head_group_spec)
    worker_group_specs = copy.deepcopy(self.worker_group_specs)

    k8s_env_vars: list[dict[str, Any]] = [{"name": key, "value": value} for key, value in (env_vars or {}).items()]

    def update_group_spec(group_spec: dict[str, Any]) -> None:
        """Inject ``image`` and ``k8s_env_vars`` into ``group_spec`` in place."""
        containers = group_spec["template"]["spec"]["containers"]

        # TODO: only inject if the container has a `dagster.io/inject-image` annotation or smth
        if containers[0].get("image") is None:
            if image is None:
                raise ValueError(MISSING_IMAGE_MESSAGE)
            containers[0]["image"] = image

        for container in containers:
            # ``or []`` also tolerates an explicit ``env: None`` on the container.
            container["env"] = (container.get("env") or []) + k8s_env_vars

    update_group_spec(head_group_spec)
    for worker_group_spec in worker_group_specs:
        update_group_spec(worker_group_spec)

    return remove_none_from_dict(
        {
            "enableInTreeAutoscaling": self.enable_in_tree_autoscaling,
            "autoscalerOptions": self.autoscaler_options,
            "headGroupSpec": head_group_spec,
            "workerGroupSpecs": worker_group_specs,
            "suspend": self.suspend,
            "managedBy": self.managed_by,
            "headServiceAnnotations": self.head_service_annotations,
            "gcsFaultToleranceOptions": self.gcs_fault_tolerance_options,
            "rayVersion": self.ray_version,
        }
    )

dagster_ray.kuberay.configs.MatchDagsterLabels pydantic-model

Bases: Config

Show JSON schema:
{
  "properties": {
    "cluster_sharing": {
      "default": true,
      "description": "Whether to match on `dagster/cluster-sharing=true` label.",
      "title": "Cluster Sharing",
      "type": "boolean"
    },
    "code_location": {
      "default": true,
      "description": "Whether to match on `dagster/code-location` label. The value will be taken from the current Dagster code location.",
      "title": "Code Location",
      "type": "boolean"
    },
    "resource_key": {
      "default": true,
      "description": "Whether to match on `dagster/resource-key` label. The value will be taken from the current Dagster resource key.",
      "title": "Resource Key",
      "type": "boolean"
    },
    "git_sha": {
      "default": true,
      "description": "Whether to match on `dagster/git-sha` label. The value will be taken from `DAGSTER_CLOUD_GIT_SHA` environment variable.",
      "title": "Git Sha",
      "type": "boolean"
    },
    "run_id": {
      "default": false,
      "description": "Whether to match on `dagster/run-id` label. The value will be taken from the current Dagster run ID.",
      "title": "Run Id",
      "type": "boolean"
    }
  },
  "title": "MatchDagsterLabels",
  "type": "object"
}

Fields:

Attributes

cluster_sharing pydantic-field
cluster_sharing: bool = True

Whether to match on dagster/cluster-sharing=true label.

code_location pydantic-field
code_location: bool = True

Whether to match on dagster/code-location label. The value will be taken from the current Dagster code location.

resource_key pydantic-field
resource_key: bool = True

Whether to match on dagster/resource-key label. The value will be taken from the current Dagster resource key.

git_sha pydantic-field
git_sha: bool = True

Whether to match on dagster/git-sha label. The value will be taken from DAGSTER_CLOUD_GIT_SHA environment variable.

run_id pydantic-field
run_id: bool = False

Whether to match on dagster/run-id label. The value will be taken from the current Dagster run ID.

dagster_ray.kuberay.configs.ClusterSharing pydantic-model

Bases: Config

Defines the strategy for sharing RayCluster resources with other Dagster steps.

By default, the cluster is expected to be created by Dagster during one of the previously executed steps.

Show JSON schema:
{
  "$defs": {
    "MatchDagsterLabels": {
      "properties": {
        "cluster_sharing": {
          "default": true,
          "description": "Whether to match on `dagster/cluster-sharing=true` label.",
          "title": "Cluster Sharing",
          "type": "boolean"
        },
        "code_location": {
          "default": true,
          "description": "Whether to match on `dagster/code-location` label. The value will be taken from the current Dagster code location.",
          "title": "Code Location",
          "type": "boolean"
        },
        "resource_key": {
          "default": true,
          "description": "Whether to match on `dagster/resource-key` label. The value will be taken from the current Dagster resource key.",
          "title": "Resource Key",
          "type": "boolean"
        },
        "git_sha": {
          "default": true,
          "description": "Whether to match on `dagster/git-sha` label. The value will be taken from `DAGSTER_CLOUD_GIT_SHA` environment variable.",
          "title": "Git Sha",
          "type": "boolean"
        },
        "run_id": {
          "default": false,
          "description": "Whether to match on `dagster/run-id` label. The value will be taken from the current Dagster run ID.",
          "title": "Run Id",
          "type": "boolean"
        }
      },
      "title": "MatchDagsterLabels",
      "type": "object"
    }
  },
  "description": "Defines the strategy for sharing `RayCluster` resources with other Dagster steps.\n\nBy default, the cluster is expected to be created by Dagster during one of the previously executed steps.",
  "properties": {
    "enabled": {
      "default": false,
      "description": "Whether to enable sharing of RayClusters.",
      "title": "Enabled",
      "type": "boolean"
    },
    "match_dagster_labels": {
      "$ref": "#/$defs/MatchDagsterLabels",
      "description": "Configuration for matching on Dagster-generated labels."
    },
    "match_labels": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Additional user-provided labels to match on.",
      "title": "Match Labels"
    },
    "ttl_seconds": {
      "default": 1800.0,
      "description": "Time to live for the lock placed on the `RayCluster` resource, marking it as in use by the current Dagster step.",
      "title": "Ttl Seconds",
      "type": "number"
    }
  },
  "title": "ClusterSharing",
  "type": "object"
}

Fields:

Attributes

enabled pydantic-field
enabled: bool = False

Whether to enable sharing of RayClusters.

match_dagster_labels pydantic-field
match_dagster_labels: MatchDagsterLabels

Configuration for matching on Dagster-generated labels.

match_labels pydantic-field
match_labels: dict[str, str] | None = None

Additional user-provided labels to match on.

ttl_seconds pydantic-field
ttl_seconds: float = DEFAULT_CLUSTER_SHARING_TTL_SECONDS

Time to live for the lock placed on the RayCluster resource, marking it as in use by the current Dagster step.

--

dagster_ray.kuberay.resources.base.BaseKubeRayResource pydantic-model

Bases: RayResource, ABC

Show JSON schema:
{
  "$defs": {
    "ExecutionOptionsConfig": {
      "properties": {
        "cpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cpu"
        },
        "gpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gpu"
        },
        "object_store_memory": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Object Store Memory"
        }
      },
      "title": "ExecutionOptionsConfig",
      "type": "object"
    },
    "Lifecycle": {
      "properties": {
        "create": {
          "default": true,
          "description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
          "title": "Create",
          "type": "boolean"
        },
        "wait": {
          "default": true,
          "description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
          "title": "Wait",
          "type": "boolean"
        },
        "connect": {
          "default": true,
          "description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
          "title": "Connect",
          "type": "boolean"
        },
        "cleanup": {
          "default": "always",
          "description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
          "enum": [
            "never",
            "always",
            "on_exception"
          ],
          "title": "Cleanup",
          "type": "string"
        }
      },
      "title": "Lifecycle",
      "type": "object"
    },
    "RayDataExecutionOptions": {
      "properties": {
        "execution_options": {
          "$ref": "#/$defs/ExecutionOptionsConfig"
        },
        "cpu_limit": {
          "default": 5000,
          "title": "Cpu Limit",
          "type": "integer"
        },
        "gpu_limit": {
          "default": 0,
          "title": "Gpu Limit",
          "type": "integer"
        },
        "verbose_progress": {
          "default": true,
          "title": "Verbose Progress",
          "type": "boolean"
        },
        "use_polars": {
          "default": true,
          "title": "Use Polars",
          "type": "boolean"
        }
      },
      "title": "RayDataExecutionOptions",
      "type": "object"
    }
  },
  "properties": {
    "lifecycle": {
      "$ref": "#/$defs/Lifecycle",
      "description": "Actions to perform during resource setup."
    },
    "timeout": {
      "default": 600.0,
      "description": "Timeout for Ray readiness in seconds",
      "title": "Timeout",
      "type": "number"
    },
    "ray_init_options": {
      "description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
      "title": "Ray Init Options",
      "type": "object"
    },
    "data_execution_options": {
      "$ref": "#/$defs/RayDataExecutionOptions"
    },
    "redis_port": {
      "default": 10001,
      "description": "Redis port for connection. Make sure to match with the actual available port.",
      "title": "Redis Port",
      "type": "integer"
    },
    "dashboard_port": {
      "default": 8265,
      "description": "Dashboard port for connection. Make sure to match with the actual available port.",
      "title": "Dashboard Port",
      "type": "integer"
    },
    "env_vars": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "description": "Environment variables to pass to the Ray cluster.",
      "title": "Env Vars"
    },
    "enable_tracing": {
      "default": false,
      "description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
      "title": "Enable Tracing",
      "type": "boolean"
    },
    "enable_actor_task_logging": {
      "default": false,
      "description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
      "title": "Enable Actor Task Logging",
      "type": "boolean"
    },
    "enable_debug_post_mortem": {
      "default": false,
      "description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
      "title": "Enable Debug Post Mortem",
      "type": "boolean"
    },
    "enable_legacy_debugger": {
      "default": false,
      "description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [Ray docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
      "title": "Enable Legacy Debugger",
      "type": "boolean"
    },
    "image": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
      "title": "Image"
    },
    "deployment_name": {
      "default": "dev",
      "description": "Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
      "title": "Deployment Name",
      "type": "string"
    },
    "failure_tolerance_timeout": {
      "default": 0.0,
      "description": "The period in seconds to wait for the cluster to transition out of `failed` state if it reaches it. This state can be transient under certain conditions. With the default value of 0, the first `failed` state appearance will raise an exception immediately.",
      "title": "Failure Tolerance Timeout",
      "type": "number"
    },
    "poll_interval": {
      "default": 1.0,
      "description": "Poll interval for various API requests",
      "title": "Poll Interval",
      "type": "number"
    }
  },
  "title": "BaseKubeRayResource",
  "type": "object"
}

Fields:

Attributes

image pydantic-field
image: str | None = None

Image to inject into the RayCluster spec. Defaults to dagster/image run tag. Images already provided in the RayCluster spec won't be overridden.

deployment_name pydantic-field
deployment_name: str = DEFAULT_DEPLOYMENT_NAME

Dagster deployment name. Used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.

poll_interval pydantic-field
poll_interval: float = 1.0

Poll interval for various API requests


Resources

dagster_ray.kuberay.KubeRayJobClientResource pydantic-model

Bases: ConfigurableResource[RayJobClient]

This configurable resource provides a dagster_ray.kuberay.client.RayJobClient.

Show JSON schema:
{
  "description": "This configurable resource provides a [dagster_ray.kuberay.client.RayJobClient][].",
  "properties": {
    "kube_context": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Kube Context"
    },
    "kube_config": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Kube Config"
    }
  },
  "title": "KubeRayJobClientResource",
  "type": "object"
}

Fields:

  • kube_context (str | None)
  • kube_config (str | None)

dagster_ray.kuberay.KubeRayClusterClientResource pydantic-model

Bases: ConfigurableResource[RayClusterClient]

This configurable resource provides a dagster_ray.kuberay.client.RayClusterClient.

Show JSON schema:
{
  "description": "This configurable resource provides a [dagster_ray.kuberay.client.RayClusterClient][].",
  "properties": {
    "kube_context": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Kube Context"
    },
    "kube_config": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Kube Config"
    }
  },
  "title": "KubeRayClusterClientResource",
  "type": "object"
}

Fields:

  • kube_context (str | None)
  • kube_config (str | None)

Sensors

dagster_ray.kuberay.sensors.cleanup_expired_kuberay_clusters

cleanup_expired_kuberay_clusters(
    context: SensorEvaluationContext, raycluster_client: ResourceParam[RayClusterClient]
) -> Generator[RunRequest | SkipReason, None, None]
Source code in src/dagster_ray/kuberay/sensors.py
@dg.sensor(job=delete_kuberay_clusters, minimum_interval_seconds=5 * 60)
def cleanup_expired_kuberay_clusters(
    context: dg.SensorEvaluationContext,
    raycluster_client: dg.ResourceParam[RayClusterClient],
) -> Generator[dg.RunRequest | dg.SkipReason, None, None]:
    """A Dagster sensor that monitors `RayCluster` resources created by the current code location and submits jobs to delete clusters that either:
        - use [Cluster Sharing](../tutorial/#cluster-sharing) (`dagster/cluster-sharing=true`) and have expired (no alive sharing locks remain)
        - do not use cluster sharing and are older than `DAGSTER_RAY_CLUSTER_EXPIRATION_SECONDS` (defaults to 4 hours)

    Clusters are discovered with the `dagster/code-location=<current-code-location>` label selector. One delete run
    is requested per namespace that contains expired clusters; if none are found at all, a `SkipReason` is yielded.

    By default it monitors the `ray` namespace. This can be configured by setting `DAGSTER_RAY_NAMESPACES` (accepts a
    comma-separated list of namespaces; surrounding whitespace and empty entries are ignored).
    """
    # NOTE: the docstring above must stay a plain string literal. An f-string in that position is an
    # ordinary expression statement rather than a docstring, which left the sensor with `__doc__ = None`.
    assert context.code_location_origin is not None
    label_selector = f"dagster/code-location={context.code_location_origin.location_name}"

    found_any = False
    # Tolerate values like "ray, other" or a trailing comma instead of querying " other" / "" as namespaces.
    namespaces = [
        namespace.strip()
        for namespace in os.environ.get(DAGSTER_RAY_NAMESPACES_ENV_VAR, "ray").split(",")
        if namespace.strip()
    ]
    expiration_seconds = int(
        os.environ.get(
            DAGSTER_RAY_CLUSTER_EXPIRATION_SECONDS_ENV_VAR, DAGSTER_RAY_CLUSTER_EXPIRATION_SECONDS_DEFAULT_VALUE
        )
    )

    for namespace in namespaces:
        cluster_names: list[str] = []
        for cluster in raycluster_client.list(namespace=namespace, label_selector=label_selector).get("items", []):
            # `or {}` also covers keys that are present but null.
            metadata = cluster.get("metadata") or {}
            labels = metadata.get("labels") or {}
            if labels.get("dagster/cluster-sharing") == "true":
                # Shared clusters expire once none of the cluster-sharing locks stored in the annotations is alive.
                locks = ClusterSharingLock.parse_all_locks(cast(dict[str, str], metadata.get("annotations") or {}))
                if not ClusterSharingLock.get_alive_locks(locks):
                    context.log.info(
                        f"Found expired RayCluster with cluster sharing enabled: {metadata['namespace']}/{metadata['name']}"
                    )
                    cluster_names.append(metadata["name"])
            else:
                # Non-shared clusters expire purely by age since creation. The timestamp is parsed
                # as a UTC `YYYY-MM-DDTHH:MM:SSZ` string.
                created_at = metadata.get("creationTimestamp")
                if created_at:
                    cluster_age = datetime.now(timezone.utc) - datetime.strptime(
                        created_at, "%Y-%m-%dT%H:%M:%SZ"
                    ).replace(tzinfo=timezone.utc)
                    if cluster_age.total_seconds() >= expiration_seconds:
                        context.log.info(
                            f"Found expired RayCluster (time since creation exceeds {expiration_seconds} seconds): {metadata['namespace']}/{metadata['name']}"
                        )
                        cluster_names.append(metadata["name"])

        if cluster_names:
            found_any = True
            yield dg.RunRequest(
                run_config=dg.RunConfig(
                    ops={
                        "delete_kuberay_clusters_op": DeleteKubeRayClustersConfig(
                            namespace=namespace,
                            clusters=[RayClusterRef(name=name) for name in cluster_names],
                        )
                    }
                )
            )

    if not found_any:
        yield dg.SkipReason(f"No expired RayClusters found in namespaces: {namespaces}")

A Dagster sensor that monitors RayCluster resources created by the current Dagster code location (selected with a dagster/code-location=<current-code-location> label selector) and submits jobs to delete clusters that either: (1) use Cluster Sharing (dagster/cluster-sharing=true) and have expired, i.e. no alive cluster-sharing locks remain; or (2) do not use Cluster Sharing and are older than DAGSTER_RAY_CLUSTER_EXPIRATION_SECONDS (defaults to 4 hours).

By default it monitors the ray namespace. This can be configured by setting DAGSTER_RAY_NAMESPACES (accepts a comma-separated list of namespaces).


Kubernetes API Clients

dagster_ray.kuberay.client.RayClusterClient

RayClusterClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)

Bases: BaseKubeRayClient[RayClusterStatus]

Source code in src/dagster_ray/kuberay/client/raycluster/client.py
def __init__(
    self,
    kube_config: str | None = None,
    kube_context: str | None = None,
    api_client: ApiClient | None = None,
) -> None:
    """Build a client for KubeRay `RayCluster` custom objects.

    Args:
        kube_config: Passed to `load_kubeconfig` as `config_file`; stored on the instance.
        kube_context: Passed to `load_kubeconfig` as `context`; stored on the instance.
        api_client: Pre-configured Kubernetes API client. When given, kubeconfig loading is skipped.
    """
    self.kube_context = kube_context
    self.kube_config = kube_config

    # Configuration has to be loaded before the base class builds its API objects.
    # It is skipped entirely when the caller supplies its own `api_client`.
    if api_client is None:
        load_kubeconfig(config_file=kube_config, context=kube_context)

    super().__init__(
        group=GROUP,
        version=VERSION,
        kind=KIND,
        plural=PLURAL,
        api_client=api_client,
    )

Functions

create
create(body: dict[str, Any], namespace: str) -> Any
Source code in src/dagster_ray/kuberay/client/base.py
def create(self, body: dict[str, Any], namespace: str) -> Any:
    """Create a namespaced custom object from `body` and return the API response.

    The API version sent to Kubernetes is the segment after the first `/` of
    `body["apiVersion"]`. When `body` has no `apiVersion`, it is derived from
    `self.group`/`self.version` instead.
    """
    api_version = body.get("apiVersion", f"{self.group}/{self.version}")
    version = api_version.split("/")[1]
    return self._api.create_namespaced_custom_object(
        namespace=namespace,
        group=self.group,
        version=version,
        plural=self.plural,
        body=body,
    )
delete
delete(name: str, namespace: str)
Source code in src/dagster_ray/kuberay/client/base.py
def delete(self, name: str, namespace: str):
    """Delete the namespaced custom object `name` and return the API response."""
    api = self._api
    return api.delete_namespaced_custom_object(
        namespace=namespace,
        name=name,
        plural=self.plural,
        version=self.version,
        group=self.group,
    )
get
get(name: str, namespace: str) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def get(self, name: str, namespace: str) -> dict[str, Any]:
    """Fetch a single namespaced custom object by name.

    Returns an empty dict when the API answers 404; every other
    `ApiException` propagates to the caller.
    """
    from kubernetes.client import ApiException

    try:
        return self._api.get_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            name=name,
            namespace=namespace,
        )
    except ApiException as err:
        if err.status != 404:
            raise
        return {}
list
list(namespace: str, label_selector: str = '', async_req: bool = False) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def list(self, namespace: str, label_selector: str = "", async_req: bool = False) -> dict[str, Any]:
    """List the custom objects of this client's kind in `namespace`.

    Args:
        namespace: Namespace to list objects in.
        label_selector: Label selector forwarded to the Kubernetes API.
        async_req: Forwarded to the Kubernetes client. When `True`, the client returns an
            asynchronous result handle instead of the response body. That handle is returned
            unchanged so the caller can resolve it (e.g. with `.get()`).

    Returns:
        The list response when it contains an `"items"` key, otherwise an empty dict. An empty
        dict is also returned when the API answers 404.

    Raises:
        kubernetes.client.ApiException: for any API error other than 404.
    """
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.list_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            namespace=namespace,
            label_selector=label_selector,
            async_req=async_req,
        )
    except ApiException as e:
        if e.status == 404:
            return {}
        raise

    if async_req:
        # The result is an async handle, not the response dict. Membership-testing it
        # (`"items" in resource`) used to raise TypeError, so hand it back as-is.
        return resource

    return resource if "items" in resource else {}
update
update(name: str, namespace: str, body: Any)
Source code in src/dagster_ray/kuberay/client/base.py
def update(self, name: str, namespace: str, body: Any):
    """Patch (not replace) the namespaced custom object `name` with `body`.

    Returns the API response.
    """
    api = self._api
    return api.patch_namespaced_custom_object(
        namespace=namespace,
        name=name,
        body=body,
        plural=self.plural,
        version=self.version,
        group=self.group,
    )

dagster_ray.kuberay.client.RayJobClient

RayJobClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)

Bases: BaseKubeRayClient[RayJobStatus]

Source code in src/dagster_ray/kuberay/client/rayjob/client.py
def __init__(
    self,
    kube_config: str | None = None,
    kube_context: str | None = None,
    api_client: ApiClient | None = None,
) -> None:
    """Build a client for KubeRay `RayJob` custom objects.

    Args:
        kube_config: Passed to `load_kubeconfig` as `config_file`; stored on the instance.
        kube_context: Passed to `load_kubeconfig` as `context`; stored on the instance.
        api_client: Pre-configured Kubernetes API client. When given, kubeconfig loading is skipped.
    """
    self.kube_context = kube_context
    self.kube_config = kube_config

    # Configuration has to be loaded before the base class builds its Kubernetes API objects.
    # It is skipped when the caller supplies its own `api_client`.
    if api_client is None:
        load_kubeconfig(context=self.kube_context, config_file=self.kube_config)

    super().__init__(
        group=GROUP,
        version=VERSION,
        kind=KIND,
        plural=PLURAL,
        api_client=api_client,
    )

Functions

create
create(body: dict[str, Any], namespace: str) -> Any
Source code in src/dagster_ray/kuberay/client/base.py
def create(self, body: dict[str, Any], namespace: str) -> Any:
    """Create a namespaced custom object from `body` and return the API response.

    The API version sent to Kubernetes is the segment after the first `/` of
    `body["apiVersion"]`. When `body` has no `apiVersion`, it is derived from
    `self.group`/`self.version` instead.
    """
    api_version = body.get("apiVersion", f"{self.group}/{self.version}")
    version = api_version.split("/")[1]
    return self._api.create_namespaced_custom_object(
        namespace=namespace,
        group=self.group,
        version=version,
        plural=self.plural,
        body=body,
    )
delete
delete(name: str, namespace: str)
Source code in src/dagster_ray/kuberay/client/base.py
def delete(self, name: str, namespace: str):
    """Delete the namespaced custom object `name` and return the API response."""
    api = self._api
    return api.delete_namespaced_custom_object(
        namespace=namespace,
        name=name,
        plural=self.plural,
        version=self.version,
        group=self.group,
    )
get
get(name: str, namespace: str) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def get(self, name: str, namespace: str) -> dict[str, Any]:
    """Fetch a single namespaced custom object by name.

    Returns an empty dict when the API answers 404; every other
    `ApiException` propagates to the caller.
    """
    from kubernetes.client import ApiException

    try:
        return self._api.get_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            name=name,
            namespace=namespace,
        )
    except ApiException as err:
        if err.status != 404:
            raise
        return {}
list
list(namespace: str, label_selector: str = '', async_req: bool = False) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def list(self, namespace: str, label_selector: str = "", async_req: bool = False) -> dict[str, Any]:
    """List the custom objects of this client's kind in `namespace`.

    Args:
        namespace: Namespace to list objects in.
        label_selector: Label selector forwarded to the Kubernetes API.
        async_req: Forwarded to the Kubernetes client. When `True`, the client returns an
            asynchronous result handle instead of the response body. That handle is returned
            unchanged so the caller can resolve it (e.g. with `.get()`).

    Returns:
        The list response when it contains an `"items"` key, otherwise an empty dict. An empty
        dict is also returned when the API answers 404.

    Raises:
        kubernetes.client.ApiException: for any API error other than 404.
    """
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.list_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            namespace=namespace,
            label_selector=label_selector,
            async_req=async_req,
        )
    except ApiException as e:
        if e.status == 404:
            return {}
        raise

    if async_req:
        # The result is an async handle, not the response dict. Membership-testing it
        # (`"items" in resource`) used to raise TypeError, so hand it back as-is.
        return resource

    return resource if "items" in resource else {}
update
update(name: str, namespace: str, body: Any)
Source code in src/dagster_ray/kuberay/client/base.py
def update(self, name: str, namespace: str, body: Any):
    """Patch (not replace) the namespaced custom object `name` with `body`.

    Returns the API response.
    """
    api = self._api
    return api.patch_namespaced_custom_object(
        namespace=namespace,
        name=name,
        body=body,
        plural=self.plural,
        version=self.version,
        group=self.group,
    )