KubeRay API Reference

KubeRay integration components for running Ray on Kubernetes. Learn how to use it here.


Client Mode Resources

These resources initialize a Ray client connection to a remote cluster.

dagster_ray.kuberay.KubeRayInteractiveJob pydantic-model

Bases: RayResource, BaseKubeRayResourceConfig

Provides a Ray Job for Dagster steps.

This is the recommended way to run Ray workloads with automatic cluster management. It creates a RayJob, connects to it in client mode, and sets the jobId field. Cleanup is handled either by the KubeRay controller or by the resource lifecycle logic.

Info

The image defaults to the dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Attributes

lifecycle class-attribute instance-attribute
lifecycle: Lifecycle = Field(
    default_factory=lambda: Lifecycle(cleanup="on_exception"), description="Actions to perform during resource setup."
)
ray_job class-attribute instance-attribute
ray_job: InteractiveRayJobConfig = Field(
    default_factory=InteractiveRayJobConfig, description="Configuration for the Kubernetes `RayJob` CR"
)
client class-attribute instance-attribute
client: ResourceDependency[RayJobClient] = Field(
    default_factory=KubeRayJobClientResource, description="Kubernetes `RayJob` client"
)
log_cluster_conditions class-attribute instance-attribute
log_cluster_conditions: bool = Field(
    default=True,
    description="Whether to log `RayCluster` conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
)

dagster_ray.kuberay.KubeRayCluster pydantic-model

Bases: BaseKubeRayResourceConfig, RayResource

Provides a Ray Cluster for Dagster steps.

It is advised to use KubeRayInteractiveJob with KubeRay >= 1.3.0 instead.

Info

The image defaults to the dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Attributes

lifecycle class-attribute instance-attribute
lifecycle: Lifecycle = Field(
    default_factory=lambda: Lifecycle(cleanup="always"), description="Actions to perform during resource setup."
)
ray_cluster class-attribute instance-attribute
ray_cluster: RayClusterConfig = Field(
    default_factory=RayClusterConfig, description="Kubernetes `RayCluster` CR configuration."
)
client class-attribute instance-attribute
client: ResourceDependency[RayClusterClient] = Field(
    default_factory=KubeRayClusterClientResource, description="Kubernetes `RayCluster` client"
)
log_cluster_conditions class-attribute instance-attribute
log_cluster_conditions: bool = Field(
    default=True,
    description="Whether to log RayCluster conditions while waiting for the RayCluster to become ready. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions).",
)

Pipes

dagster_ray.kuberay.PipesKubeRayJobClient

PipesKubeRayJobClient(
    client: RayJobClient | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
    port_forward: bool = False,
)

Bases: PipesClient, TreatAsResourceParam

A pipes client for running RayJob on Kubernetes.

Parameters:

  • context_injector (Optional[PipesContextInjector], default: None ) –

    A context injector to use to inject context into the RayJob. Defaults to PipesEnvContextInjector.

  • message_reader (Optional[PipesMessageReader], default: None ) –

    A message reader to use to read messages from the RayJob. Defaults to PipesRayJobMessageReader.

  • client (Optional[RayJobClient], default: None ) –

    The Kubernetes RayJob client. Defaults to RayJobClient().

  • forward_termination (bool, default: True ) –

    Whether to terminate the Ray job when the Dagster process receives a termination signal, or if the startup timeout is reached. Defaults to True.

  • timeout (int, default: 600 ) –

    Timeout for various internal interactions with the Kubernetes RayJob.

  • poll_interval (int, default: 1 ) –

    Interval at which to poll Kubernetes for status updates.

  • port_forward (bool, default: False ) –

    Whether to use Kubernetes port-forwarding to connect to the KubeRay cluster. This is useful when running in a local environment.
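
The interplay of timeout and poll_interval can be pictured with a generic polling loop. This is only a minimal sketch of the general pattern, not the library's implementation: the `wait_until` helper and the fake status source are hypothetical names introduced for illustration.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool], timeout: float = 600, poll_interval: float = 1) -> bool:
    """Poll `condition` every `poll_interval` seconds until it is true or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Hypothetical status source: reports "ready" on the third poll.
polls = iter([False, False, True])
ready = wait_until(lambda: next(polls), timeout=5, poll_interval=0.01)
```

A shorter poll_interval reacts faster to status changes at the cost of more API requests; the timeout bounds how long a single wait can block.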

Info

The image defaults to the dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Source code in src/dagster_ray/kuberay/pipes.py
def __init__(
    self,
    client: RayJobClient | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
    port_forward: bool = False,
):
    self.client: RayJobClient = client or RayJobClient()

    self._context_injector = context_injector or PipesEnvContextInjector()
    self._message_reader = message_reader or PipesRayJobMessageReader()

    self.forward_termination = check.bool_param(forward_termination, "forward_termination")
    self.timeout = check.int_param(timeout, "timeout")
    self.poll_interval = check.int_param(poll_interval, "poll_interval")
    self.port_forward = check.bool_param(port_forward, "port_forward")

    self._job_submission_client: JobSubmissionClient | None = None

Functions

run
run(
    *, context: OpOrAssetExecutionContext, ray_job: dict[str, Any], extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation

Execute a RayJob, enriched with the Pipes protocol.

Parameters:

  • context (OpOrAssetExecutionContext) –

    Current Dagster op or asset context.

  • ray_job (Dict[str, Any]) –

    RayJob specification. See the API reference: https://ray-project.github.io/kuberay/reference/api/#rayjob

  • extras (Optional[Dict[str, Any]], default: None ) –

    Additional information to pass to the Pipes session.
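
As a rough illustration of the ray_job argument, the manifest is a plain dictionary in the Kubernetes camelCase layout. The sketch below assumes the `ray.io/v1` API version; the name, namespace, image, and entrypoint are placeholders rather than values taken from this reference. Whether enrichment fills in a missing name or namespace is not shown on this page, so both are set explicitly here.

```python
from typing import Any

# Illustrative values only: name, namespace, image and entrypoint are placeholders.
ray_job: dict[str, Any] = {
    "apiVersion": "ray.io/v1",
    "kind": "RayJob",
    "metadata": {"name": "example-rayjob", "namespace": "ray"},
    "spec": {
        "entrypoint": "python /app/script.py",
        "shutdownAfterJobFinishes": True,
        "rayClusterSpec": {
            "headGroupSpec": {
                "rayStartParams": {},
                "template": {
                    "spec": {
                        "containers": [{"name": "ray-head", "image": "my-registry/my-image:latest"}]
                    }
                },
            },
        },
    },
}

# `run` reads these two fields from the manifest to locate the RayJob and its cluster.
name = ray_job["metadata"]["name"]
namespace = ray_job["metadata"]["namespace"]
```

Such a dictionary would be passed as `ray_job=ray_job` to run, together with the current Dagster context.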

Source code in src/dagster_ray/kuberay/pipes.py
def run(  # type: ignore
    self,
    *,
    context: OpOrAssetExecutionContext,
    ray_job: dict[str, Any],
    extras: PipesExtras | None = None,
) -> PipesClientCompletedInvocation:
    """
    Execute a RayJob, enriched with the Pipes protocol.

    Args:
        context (OpExecutionContext): Current Dagster op or asset context.
        ray_job (Dict[str, Any]): RayJob specification. `API reference <https://ray-project.github.io/kuberay/reference/api/#rayjob>`_.
        extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session.
    """
    with open_pipes_session(
        context=context,
        message_reader=self._message_reader,
        context_injector=self._context_injector,
        extras=extras,
    ) as session:
        ray_job = self._enrich_ray_job(context, session, ray_job)
        start_response = self._start(context, session, ray_job)
        start_status = cast(RayJobStatus, start_response["status"])
        ray_job_id = start_status["jobId"]  # pyright: ignore[reportTypedDictNotRequiredAccess]

        name = ray_job["metadata"]["name"]
        namespace = ray_job["metadata"]["namespace"]

        with self.client.ray_cluster_client.job_submission_client(
            name=self.client.get_ray_cluster_name(
                name=name, namespace=namespace, timeout=self.timeout, poll_interval=self.poll_interval
            ),
            namespace=namespace,
            port_forward=self.port_forward,
        ) as job_submission_client:
            self._job_submission_client = job_submission_client

            session.report_launched(
                {
                    "extras": {
                        PIPES_LAUNCHED_EXTRAS_RAY_JOB_ID_KEY: ray_job_id,
                        PIPES_LAUNCHED_EXTRAS_RAY_ADDRESS_KEY: job_submission_client.get_address(),
                    }
                }
            )

            try:
                self._wait_for_completion(context, start_response)

                if isinstance(self._message_reader, PipesRayJobMessageReader) and self.port_forward:
                    # in this case the message reader will fail once port forwarding is finished
                    # TODO: merge https://github.com/danielgafni/dagster-ray/pull/123
                    # to avoid this work-around
                    self._message_reader.thread_ready.wait()
                    context.log.debug(
                        "[pipes] waiting for PipesRayJobMessageReader to complete before stopping port-forwarding"
                    )
                    self._message_reader.session_closed.set()
                    self._message_reader.completed.wait()

                return PipesClientCompletedInvocation(
                    session, metadata={"RayJob": f"{namespace}/{name}", "Ray Job ID": ray_job_id}
                )

            except DagsterExecutionInterruptedError:
                if self.forward_termination:
                    context.log.warning(
                        f"[pipes] Dagster process interrupted! Will terminate RayJob {namespace}/{name}."
                    )
                    self._terminate(context, start_response)
                raise

Configuration and Types

dagster_ray.kuberay.configs.RayJobConfig pydantic-model

Bases: Config

Attributes

metadata class-attribute instance-attribute
metadata: dict[str, Any] = Field(
    default_factory=dict,
    description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)
spec class-attribute instance-attribute
spec: RayJobSpec = Field(default_factory=RayJobSpec)

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    labels = labels or {}
    annotations = annotations or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**(self.metadata.get("labels", {}) or {}), **labels},
                "annotations": {**self.metadata.get("annotations", {}), **annotations},
            }
        ),
        "spec": self.spec.to_k8s(
            context=context,
            image=image,
            env_vars=env_vars,
        ),
    }
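
The metadata merge above can be reproduced with plain dictionaries to see which side wins on a key collision. This is a standalone sketch, not the library code: `merge_metadata` is a hypothetical helper, the dict comprehension stands in for remove_none_from_dict, and the sketch guards annotations with `or {}` the same way the source guards labels.

```python
from typing import Any, Mapping, Optional


def merge_metadata(
    metadata: dict[str, Any],
    labels: Optional[Mapping[str, str]] = None,
    annotations: Optional[Mapping[str, str]] = None,
) -> dict[str, Any]:
    labels = labels or {}
    annotations = annotations or {}
    merged = {
        "name": metadata.get("name"),
        # Later entries win, so injected labels override user-provided ones.
        "labels": {**(metadata.get("labels", {}) or {}), **labels},
        "annotations": {**(metadata.get("annotations", {}) or {}), **annotations},
    }
    # Stand-in for remove_none_from_dict: an unset name is left out of the result.
    return {k: v for k, v in merged.items() if v is not None}


result = merge_metadata({"labels": {"team": "ml", "env": "dev"}}, labels={"env": "prod"})
```

Here the injected `env` label replaces the user-provided one, `team` is preserved, and the missing name is dropped so that it can be generated later.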

dagster_ray.kuberay.configs.RayJobSpec

Bases: PermissiveConfig

RayJob spec configuration options. A few sensible defaults are provided for convenience.

Attributes

ray_cluster_spec class-attribute instance-attribute
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
entrypoint_num_cpus class-attribute instance-attribute
entrypoint_num_cpus: float | None = None
entrypoint_num_gpus class-attribute instance-attribute
entrypoint_num_gpus: float | None = None
entrypoint_memory class-attribute instance-attribute
entrypoint_memory: float | None = None
entrypoint_resources class-attribute instance-attribute
entrypoint_resources: str | None = None

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""
    return remove_none_from_dict(
        {
            "activeDeadlineSeconds": self.active_deadline_seconds,
            "backoffLimit": self.backoff_limit,
            "submitterPodTemplate": self.submitter_pod_template,
            "metadata": self.metadata,
            "clusterSelector": self.cluster_selector,
            "managedBy": self.managed_by,
            "deletionStrategy": self.deletion_strategy,
            "runtimeEnvYAML": self.runtime_env_yaml,
            "jobId": self.job_id,
            "submissionMode": self.submission_mode,
            "entrypointResources": self.entrypoint_resources,
            "entrypointNumCpus": self.entrypoint_num_cpus,
            "entrypointMemory": self.entrypoint_memory,
            "entrypointNumGpus": self.entrypoint_num_gpus,
            "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
            "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
            "suspend": self.suspend,
            "rayClusterSpec": self.ray_cluster_spec.to_k8s(context=context, image=image, env_vars=env_vars)
            if self.ray_cluster_spec is not None
            else None,
        }
    )
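
To see why unset options never reach the manifest, consider the following sketch. It assumes that remove_none_from_dict drops keys whose value is None; its real implementation is not shown on this page, so `drop_none` below is a hypothetical top-level-only stand-in, and `spec_to_manifest` covers just a handful of the fields.

```python
from typing import Any, Optional


def drop_none(d: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for remove_none_from_dict: drops top-level keys whose value is None."""
    return {k: v for k, v in d.items() if v is not None}


def spec_to_manifest(
    submission_mode: Optional[str] = None,
    backoff_limit: Optional[int] = None,
    entrypoint_num_cpus: Optional[float] = None,
    suspend: Optional[bool] = None,
) -> dict[str, Any]:
    # Map snake_case options to the camelCase keys of the RayJob spec.
    return drop_none(
        {
            "submissionMode": submission_mode,
            "backoffLimit": backoff_limit,
            "entrypointNumCpus": entrypoint_num_cpus,
            "suspend": suspend,
        }
    )


manifest = spec_to_manifest(submission_mode="InteractiveMode", backoff_limit=0, suspend=False)
```

Note that falsy values such as `0` and `False` are kept; only `None` is treated as "not set".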

dagster_ray.kuberay.resources.rayjob.InteractiveRayJobConfig pydantic-model

Bases: RayJobConfig

Same as RayJobConfig, but spec.submission_mode must be InteractiveMode.

Attributes

metadata class-attribute instance-attribute
metadata: dict[str, Any] = Field(
    default_factory=dict,
    description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)
spec class-attribute instance-attribute

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    labels = labels or {}
    annotations = annotations or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**(self.metadata.get("labels", {}) or {}), **labels},
                "annotations": {**self.metadata.get("annotations", {}), **annotations},
            }
        ),
        "spec": self.spec.to_k8s(
            context=context,
            image=image,
            env_vars=env_vars,
        ),
    }

dagster_ray.kuberay.resources.rayjob.InteractiveRayJobSpec

Bases: RayJobSpec

Same as RayJobSpec, but submission_mode must be InteractiveMode.

Attributes

submission_mode class-attribute instance-attribute
submission_mode: Literal['InteractiveMode'] = 'InteractiveMode'
active_deadline_seconds class-attribute instance-attribute
active_deadline_seconds: int = 60 * 60 * 24
backoff_limit class-attribute instance-attribute
backoff_limit: int = 0
ray_cluster_spec class-attribute instance-attribute
ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)
submitter_pod_template class-attribute instance-attribute
submitter_pod_template: dict[str, Any] | None = None
metadata class-attribute instance-attribute
metadata: dict[str, Any] | None = None
cluster_selector class-attribute instance-attribute
cluster_selector: dict[str, str] | None = None
managed_by class-attribute instance-attribute
managed_by: str | None = None
deletion_strategy class-attribute instance-attribute
deletion_strategy: dict[str, Any] | None = Field(
    default_factory=lambda: {"onFailure": {"policy": "DeleteCluster"}, "onSuccess": {"policy": "DeleteCluster"}}
)
runtime_env_yaml class-attribute instance-attribute
runtime_env_yaml: str | None = None
job_id class-attribute instance-attribute
job_id: str | None = None
entrypoint_resources class-attribute instance-attribute
entrypoint_resources: str | None = None
entrypoint_num_cpus class-attribute instance-attribute
entrypoint_num_cpus: float | None = None
entrypoint_memory class-attribute instance-attribute
entrypoint_memory: float | None = None
entrypoint_num_gpus class-attribute instance-attribute
entrypoint_num_gpus: float | None = None
ttl_seconds_after_finished class-attribute instance-attribute
ttl_seconds_after_finished: int | None = 5 * 60
shutdown_after_job_finishes class-attribute instance-attribute
shutdown_after_job_finishes: bool = True
suspend class-attribute instance-attribute
suspend: bool | None = None

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""
    return remove_none_from_dict(
        {
            "activeDeadlineSeconds": self.active_deadline_seconds,
            "backoffLimit": self.backoff_limit,
            "submitterPodTemplate": self.submitter_pod_template,
            "metadata": self.metadata,
            "clusterSelector": self.cluster_selector,
            "managedBy": self.managed_by,
            "deletionStrategy": self.deletion_strategy,
            "runtimeEnvYAML": self.runtime_env_yaml,
            "jobId": self.job_id,
            "submissionMode": self.submission_mode,
            "entrypointResources": self.entrypoint_resources,
            "entrypointNumCpus": self.entrypoint_num_cpus,
            "entrypointMemory": self.entrypoint_memory,
            "entrypointNumGpus": self.entrypoint_num_gpus,
            "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
            "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
            "suspend": self.suspend,
            "rayClusterSpec": self.ray_cluster_spec.to_k8s(context=context, image=image, env_vars=env_vars)
            if self.ray_cluster_spec is not None
            else None,
        }
    )

dagster_ray.kuberay.configs.RayClusterConfig pydantic-model

Bases: Config

Attributes

metadata class-attribute instance-attribute
metadata: dict[str, Any] = Field(
    default_factory=dict,
    description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)
spec class-attribute instance-attribute
spec: RayClusterSpec = Field(default_factory=RayClusterSpec)

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    assert context.log is not None

    labels = labels or {}
    annotations = annotations or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**(self.metadata.get("labels", {}) or {}), **labels},
                "annotations": {**self.metadata.get("annotations", {}), **annotations},
            }
        ),
        "spec": self.spec.to_k8s(context=context, image=image, env_vars=env_vars),
    }

dagster_ray.kuberay.configs.RayClusterSpec

Bases: PermissiveConfig

RayCluster spec configuration options. A few sensible defaults are provided for convenience.

Attributes

suspend class-attribute instance-attribute
suspend: bool | None = None
managed_by class-attribute instance-attribute
managed_by: str | None = None
autoscaler_options class-attribute instance-attribute
autoscaler_options: dict[str, Any] = DEFAULT_AUTOSCALER_OPTIONS
head_service_annotations class-attribute instance-attribute
head_service_annotations: dict[str, str] | None = None
enable_in_tree_autoscaling class-attribute instance-attribute
enable_in_tree_autoscaling: bool = False
gcs_fault_tolerance_options class-attribute instance-attribute
gcs_fault_tolerance_options: dict[str, Any] | None = None
head_group_spec class-attribute instance-attribute
head_group_spec: dict[str, Any] = DEFAULT_HEAD_GROUP_SPEC
ray_version class-attribute instance-attribute
ray_version: str | None = None
worker_group_specs class-attribute instance-attribute
worker_group_specs: list[dict[str, Any]] = DEFAULT_WORKER_GROUP_SPECS

Functions

to_k8s
to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    assert context.log is not None

    # TODO: inject self.redis_port and self.dashboard_port into the RayCluster configuration
    # TODO: auto-apply some tags from dagster-k8s/config

    head_group_spec = self.head_group_spec.copy()
    worker_group_specs = self.worker_group_specs.copy()

    k8s_env_vars: list[dict[str, Any]] = []

    if env_vars:
        for key, value in env_vars.items():
            k8s_env_vars.append({"name": key, "value": value})

    def update_group_spec(group_spec: dict[str, Any]):
        # TODO: only inject if the container has a `dagster.io/inject-image` annotation or smth
        if group_spec["template"]["spec"]["containers"][0].get("image") is None:
            if image is None:
                raise ValueError(MISSING_IMAGE_MESSAGE)
            else:
                group_spec["template"]["spec"]["containers"][0]["image"] = image

        for container in group_spec["template"]["spec"]["containers"]:
            container["env"] = container.get("env", []) + k8s_env_vars

    update_group_spec(head_group_spec)
    for worker_group_spec in worker_group_specs:
        update_group_spec(worker_group_spec)

    return remove_none_from_dict(
        {
            "enableInTreeAutoscaling": self.enable_in_tree_autoscaling,
            "autoscalerOptions": self.autoscaler_options,
            "headGroupSpec": head_group_spec,
            "workerGroupSpecs": worker_group_specs,
            "suspend": self.suspend,
            "managedBy": self.managed_by,
            "headServiceAnnotations": self.head_service_annotations,
            "gcsFaultToleranceOptions": self.gcs_fault_tolerance_options,
            "rayVersion": self.ray_version,
        }
    )
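
The injection performed by update_group_spec can be tried out on a bare dictionary. The sketch below follows the same rules as the source above (fallback image for the first container only, environment variables appended to every container) but is a standalone approximation: `inject_into_group_spec` is a hypothetical name, the error message is made up, and the deep copy is a choice made for this sketch so that the input is left untouched.

```python
import copy
from typing import Any, Mapping, Optional


def inject_into_group_spec(
    group_spec: dict[str, Any],
    image: Optional[str] = None,
    env_vars: Optional[Mapping[str, str]] = None,
) -> dict[str, Any]:
    # Deep copy so the caller's nested dicts are not modified (sketch-only choice).
    spec = copy.deepcopy(group_spec)
    containers = spec["template"]["spec"]["containers"]

    # Only the first container receives the fallback image.
    if containers[0].get("image") is None:
        if image is None:
            raise ValueError("no image in the group spec and no fallback image provided")
        containers[0]["image"] = image

    # Every container gets the extra environment variables appended.
    k8s_env_vars = [{"name": k, "value": v} for k, v in (env_vars or {}).items()]
    for container in containers:
        container["env"] = container.get("env", []) + k8s_env_vars
    return spec


head = {"template": {"spec": {"containers": [{"name": "ray-head"}, {"name": "sidecar", "image": "busybox"}]}}}
result = inject_into_group_spec(head, image="my-image:1.0", env_vars={"FOO": "bar"})
```

The head container picks up the fallback image, the sidecar keeps its own, and both receive the `FOO` variable.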

dagster_ray.kuberay.resources.base.BaseKubeRayResourceConfig pydantic-model

Bases: Config

Attributes

image class-attribute instance-attribute
image: str | None = Field(
    default=None,
    description="Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
)
deployment_name class-attribute instance-attribute
deployment_name: str = Field(
    default=DEFAULT_DEPLOYMENT_NAME,
    description="Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
)
poll_interval class-attribute instance-attribute
poll_interval: float = Field(default=1.0, description='Poll interval for various API requests')
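
The image description above implies an order of precedence: an image already present in the RayCluster spec, then the resource's `image` field, then the `dagster/image` run tag. A minimal sketch of that order follows; `resolve_image` is a hypothetical helper, and reading the tag from a plain mapping of run tags is an assumption about how the fallback is obtained.

```python
from typing import Mapping, Optional


def resolve_image(
    spec_image: Optional[str],
    resource_image: Optional[str],
    run_tags: Mapping[str, str],
) -> Optional[str]:
    # 1. An image already set in the RayCluster spec is never overridden.
    if spec_image is not None:
        return spec_image
    # 2. Otherwise the resource-level `image` field is used.
    if resource_image is not None:
        return resource_image
    # 3. Finally fall back to the `dagster/image` run tag, if present.
    return run_tags.get("dagster/image")


tags = {"dagster/image": "registry/dagster-code:abc123"}
from_tag = resolve_image(None, None, tags)
from_resource = resolve_image(None, "registry/custom:1", tags)
from_spec = resolve_image("rayproject/ray:latest", "registry/custom:1", tags)
```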

Resources

dagster_ray.kuberay.KubeRayJobClientResource pydantic-model

Bases: ConfigurableResource[RayJobClient]

This configurable resource provides a dagster_ray.kuberay.client.RayJobClient.

dagster_ray.kuberay.KubeRayClusterClientResource pydantic-model

Bases: ConfigurableResource[RayClusterClient]

This configurable resource provides a dagster_ray.kuberay.client.RayClusterClient.


Kubernetes API Clients

dagster_ray.kuberay.client.RayClusterClient

RayClusterClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)

Bases: BaseKubeRayClient[RayClusterStatus]

Source code in src/dagster_ray/kuberay/client/raycluster/client.py
def __init__(
    self,
    kube_config: str | None = None,
    kube_context: str | None = None,
    api_client: ApiClient | None = None,
) -> None:
    super().__init__(group=GROUP, version=VERSION, kind=KIND, plural=PLURAL, api_client=api_client)

    # these are only used because of kubectl port-forward CLI command
    # TODO: remove kubectl usage and remove these attributes
    self.config_file = kube_config
    self.context = kube_context

Functions

create
create(body: dict[str, Any], namespace: str) -> Any
Source code in src/dagster_ray/kuberay/client/base.py
def create(self, body: dict[str, Any], namespace: str) -> Any:
    return self._api.create_namespaced_custom_object(
        group=self.group,
        version=body.get("apiVersion", f"{self.group}/{self.version}").split("/")[1],
        plural=self.plural,
        body=body,
        namespace=namespace,
    )
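
The version passed to the Kubernetes API in create is taken from the body's apiVersion when present, and from the client's own group and version otherwise. The expression can be isolated as below; `resolve_version` is a hypothetical helper, and the `ray.io` and `v1` defaults are illustrative assumptions, since the GROUP and VERSION constants are not shown on this page.

```python
from typing import Any


def resolve_version(body: dict[str, Any], group: str = "ray.io", default_version: str = "v1") -> str:
    # "group/version" -> "version"; falls back to the client's defaults when apiVersion is absent.
    api_version = body.get("apiVersion", f"{group}/{default_version}")
    return api_version.split("/")[1]


explicit = resolve_version({"apiVersion": "ray.io/v1alpha1"})
fallback = resolve_version({})
```

This lets a manifest that pins a specific apiVersion be created against that version, even if it differs from the client's default.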
delete
delete(name: str, namespace: str)
Source code in src/dagster_ray/kuberay/client/base.py
def delete(self, name: str, namespace: str):
    return self._api.delete_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        namespace=namespace,
    )
get
get(name: str, namespace: str) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def get(self, name: str, namespace: str) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.get_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            name=name,
            namespace=namespace,
        )
        return resource
    except ApiException as e:
        if e.status == 404:
            return {}
        raise
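
Because get returns an empty dictionary for a missing resource instead of raising, callers can test for existence with a simple truthiness check. The sketch below mimics that behavior without a cluster: `FakeApiException` is a stand-in for kubernetes.client.ApiException that carries only a status code, and `get_or_empty` is a hypothetical helper.

```python
from typing import Any, Callable


class FakeApiException(Exception):
    """Stand-in for kubernetes.client.ApiException, carrying only an HTTP status."""

    def __init__(self, status: int) -> None:
        super().__init__(f"status {status}")
        self.status = status


def get_or_empty(fetch: Callable[[], dict[str, Any]]) -> dict[str, Any]:
    try:
        return fetch()
    except FakeApiException as e:
        # A 404 means "not found" and maps to an empty dict; anything else propagates.
        if e.status == 404:
            return {}
        raise


def not_found() -> dict[str, Any]:
    raise FakeApiException(404)


resource = get_or_empty(not_found)
exists = bool(resource)
```

Other API errors, such as a 403 or 500, are re-raised unchanged, so an empty result only ever means "not found".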
list
list(namespace: str, label_selector: str = '', async_req: bool = False) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def list(self, namespace: str, label_selector: str = "", async_req: bool = False) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.list_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            namespace=namespace,
            label_selector=label_selector,
            async_req=async_req,
        )
        if "items" in resource:
            return resource
        else:
            return {}
    except ApiException as e:
        if e.status == 404:
            return {}

        raise
update
update(name: str, namespace: str, body: Any)
Source code in src/dagster_ray/kuberay/client/base.py
def update(self, name: str, namespace: str, body: Any):
    return self._api.patch_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        body=body,
        namespace=namespace,
    )

dagster_ray.kuberay.client.RayJobClient

RayJobClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)

Bases: BaseKubeRayClient[RayJobStatus]

Source code in src/dagster_ray/kuberay/client/rayjob/client.py
def __init__(
    self,
    kube_config: str | None = None,
    kube_context: str | None = None,
    api_client: ApiClient | None = None,
) -> None:
    # this call must happen BEFORE creating K8s apis
    load_kubeconfig(config_file=kube_config, context=kube_context)

    self.config_file = kube_config
    self.context = kube_context

    super().__init__(group=GROUP, version=VERSION, kind=KIND, plural=PLURAL, api_client=api_client)

Functions

create
create(body: dict[str, Any], namespace: str) -> Any
Source code in src/dagster_ray/kuberay/client/base.py
def create(self, body: dict[str, Any], namespace: str) -> Any:
    return self._api.create_namespaced_custom_object(
        group=self.group,
        version=body.get("apiVersion", f"{self.group}/{self.version}").split("/")[1],
        plural=self.plural,
        body=body,
        namespace=namespace,
    )
delete
delete(name: str, namespace: str)
Source code in src/dagster_ray/kuberay/client/base.py
def delete(self, name: str, namespace: str):
    return self._api.delete_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        namespace=namespace,
    )
get
get(name: str, namespace: str) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def get(self, name: str, namespace: str) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.get_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            name=name,
            namespace=namespace,
        )
        return resource
    except ApiException as e:
        if e.status == 404:
            return {}
        raise
list
list(namespace: str, label_selector: str = '', async_req: bool = False) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def list(self, namespace: str, label_selector: str = "", async_req: bool = False) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.list_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            namespace=namespace,
            label_selector=label_selector,
            async_req=async_req,
        )
        if "items" in resource:
            return resource
        else:
            return {}
    except ApiException as e:
        if e.status == 404:
            return {}

        raise
update
update(name: str, namespace: str, body: Any)
Source code in src/dagster_ray/kuberay/client/base.py
def update(self, name: str, namespace: str, body: Any):
    return self._api.patch_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        body=body,
        namespace=namespace,
    )