Skip to content

KubeRay API Reference

KubeRay integration components for running Ray on Kubernetes. Learn how to use it here.

kuberay

Client Mode Resources

These resources initialize Ray client connection with a remote cluster.

KubeRayInteractiveJob

Bases: BaseRayResource, BaseKubeRayResourceConfig

Provides a RayJob for Dagster steps.

Is the recommended way to run Ray workloads with automatic cluster management. It creates a RayJob, connects to it in client mode and sets the jobId field. Cleanup is handled by the KubeRay controller or by the resource lifecycle logic.

Info

Image defaults to dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Attributes

lifecycle class-attribute instance-attribute

lifecycle: Lifecycle = Field(
    default_factory=lambda: Lifecycle(cleanup="on_exception"), description="Actions to perform during resource setup."
)

ray_job class-attribute instance-attribute

ray_job: InteractiveRayJobConfig = Field(
    default_factory=InteractiveRayJobConfig, description="Configuration for the Kubernetes `RayJob` CR"
)

client class-attribute instance-attribute

client: ResourceDependency[RayJobClient] = Field(
    default_factory=KubeRayJobClientResource, description="Kubernetes `RayJob` client"
)

log_cluster_conditions class-attribute instance-attribute

log_cluster_conditions: bool = Field(
    default=True,
    description="Whether to log `RayCluster` conditions while waiting for the RayCluster to become ready. For more information, see https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions.",
)

KubeRayCluster

Bases: BaseKubeRayResourceConfig, BaseRayResource

Provides a RayCluster for Dagster steps.

It is advised to use dagster_ray.kuberay.KubeRayInteractiveJob with KubeRay >= 1.3.0 instead.

Info

Image defaults to dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Attributes

lifecycle class-attribute instance-attribute

lifecycle: Lifecycle = Field(
    default_factory=lambda: Lifecycle(cleanup="always"), description="Actions to perform during resource setup."
)

ray_cluster class-attribute instance-attribute

ray_cluster: RayClusterConfig = Field(
    default_factory=RayClusterConfig, description="Kubernetes `RayCluster` CR configuration."
)

client class-attribute instance-attribute

client: ResourceDependency[RayClusterClient] = Field(
    default_factory=KubeRayClusterClientResource, description="Kubernetes `RayCluster` client"
)

log_cluster_conditions class-attribute instance-attribute

log_cluster_conditions: bool = Field(
    default=True,
    description="Whether to log RayCluster conditions while waiting for the RayCluster to become ready. For more information, see https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/observability.html#raycluster-status-conditions.",
)

Job Submission Resources

These resources submit Ray jobs to a remote cluster.

PipesKubeRayJobClient

PipesKubeRayJobClient(
    client: RayJobClient | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
    port_forward: bool = False,
)

Bases: PipesClient, TreatAsResourceParam

A pipes client for running RayJob on Kubernetes.

Parameters:

Name Type Description Default
context_injector Optional[PipesContextInjector]

A context injector to use to inject context into the RayJob. Defaults to PipesEnvContextInjector.

None
message_reader Optional[PipesMessageReader]

A message reader to use to read messages from the glue job run. Defaults to PipesRayJobMessageReader.

None
client Optional[client]

The Kubernetes API client.

None
forward_termination bool

Whether to terminate the Ray job when the Dagster process receives a termination signal, or if the startup timeout is reached. Defaults to True.

True
timeout int

Timeout for various internal interactions with the Kubernetes RayJob.

600
poll_interval int

Interval at which to poll the Kubernetes for status updates.

1
port_forward bool

Whether to use Kubernetes port-forwarding to connect to the KubeRay cluster.

False
Info

Image defaults to dagster/image run tag.

Tip

Make sure ray[full] is available in the image.

Source code in src/dagster_ray/kuberay/pipes.py
def __init__(
    self,
    client: RayJobClient | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
    port_forward: bool = False,
):
    self.client: RayJobClient = client or RayJobClient()

    self._context_injector = context_injector or PipesEnvContextInjector()
    self._message_reader = message_reader or PipesRayJobMessageReader()

    self.forward_termination = check.bool_param(forward_termination, "forward_termination")
    self.timeout = check.int_param(timeout, "timeout")
    self.poll_interval = check.int_param(poll_interval, "poll_interval")
    self.port_forward = check.bool_param(port_forward, "port_forward")

    self._job_submission_client: JobSubmissionClient | None = None

Functions

run

run(
    *, context: OpOrAssetExecutionContext, ray_job: dict[str, Any], extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation

Execute a RayJob, enriched with the Pipes protocol.

Parameters:

Name Type Description Default
context OpExecutionContext

Current Dagster op or asset context.

required
ray_job Dict[str, Any]

RayJob specification. API reference <https://ray-project.github.io/kuberay/reference/api/#rayjob>_.

required
extras Optional[Dict[str, Any]]

Additional information to pass to the Pipes session.

None
Source code in src/dagster_ray/kuberay/pipes.py
def run(  # type: ignore
    self,
    *,
    context: OpOrAssetExecutionContext,
    ray_job: dict[str, Any],
    extras: PipesExtras | None = None,
) -> PipesClientCompletedInvocation:
    """
    Execute a RayJob, enriched with the Pipes protocol.

    Args:
        context (OpExecutionContext): Current Dagster op or asset context.
        ray_job (Dict[str, Any]): RayJob specification. `API reference <https://ray-project.github.io/kuberay/reference/api/#rayjob>`_.
        extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session.
    """
    with open_pipes_session(
        context=context,
        message_reader=self._message_reader,
        context_injector=self._context_injector,
        extras=extras,
    ) as session:
        ray_job = self._enrich_ray_job(context, session, ray_job)
        start_response = self._start(context, session, ray_job)
        start_status = cast(RayJobStatus, start_response["status"])
        ray_job_id = start_status["jobId"]  # pyright: ignore[reportTypedDictNotRequiredAccess]

        name = ray_job["metadata"]["name"]
        namespace = ray_job["metadata"]["namespace"]

        with self.client.ray_cluster_client.job_submission_client(
            name=self.client.get_ray_cluster_name(
                name=name, namespace=namespace, timeout=self.timeout, poll_interval=self.poll_interval
            ),
            namespace=namespace,
            port_forward=self.port_forward,
        ) as job_submission_client:
            self._job_submission_client = job_submission_client

            session.report_launched(
                {
                    "extras": {
                        PIPES_LAUNCHED_EXTRAS_RAY_JOB_ID_KEY: ray_job_id,
                        PIPES_LAUNCHED_EXTRAS_RAY_ADDRESS_KEY: job_submission_client.get_address(),
                    }
                }
            )

            try:
                self._wait_for_completion(context, start_response)

                if isinstance(self._message_reader, PipesRayJobMessageReader) and self.port_forward:
                    # in this case the message reader will fail once port forwarding is finished
                    # TODO: merge https://github.com/danielgafni/dagster-ray/pull/123
                    # to avoid this work-around
                    self._message_reader.thread_ready.wait()
                    context.log.debug(
                        "[pipes] waiting for PipesRayJobMessageReader to complete before stopping port-forwarding"
                    )
                    self._message_reader.session_closed.set()
                    self._message_reader.completed.wait()

                return PipesClientCompletedInvocation(
                    session, metadata={"RayJob": f"{namespace}/{name}", "Ray Job ID": ray_job_id}
                )

            except DagsterExecutionInterruptedError:
                if self.forward_termination:
                    context.log.warning(
                        f"[pipes] Dagster process interrupted! Will terminate RayJob {namespace}/{name}."
                    )
                    self._terminate(context, start_response)
                raise

Configuration and Types

RayJobConfig

Bases: Config

Attributes

metadata class-attribute instance-attribute

metadata: dict[str, Any] = Field(
    default_factory=dict,
    description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)

spec class-attribute instance-attribute

spec: RayJobSpec = Field(default_factory=RayJobSpec)

Functions

to_k8s

to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    labels = labels or {}
    annotations = annotations or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**(self.metadata.get("labels", {}) or {}), **labels},
                "annotations": {**self.metadata.get("annotations", {}), **annotations},
            }
        ),
        "spec": self.spec.to_k8s(
            context=context,
            image=image,
            env_vars=env_vars,
        ),
    }

RayJobSpec

Bases: PermissiveConfig

RayJob spec configuration options. A few sensible defaults are provided for convenience.

Attributes

ray_cluster_spec class-attribute instance-attribute

ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)

entrypoint_num_cpus class-attribute instance-attribute

entrypoint_num_cpus: float | None = None

entrypoint_num_gpus class-attribute instance-attribute

entrypoint_num_gpus: float | None = None

entrypoint_memory class-attribute instance-attribute

entrypoint_memory: float | None = None

entrypoint_resources class-attribute instance-attribute

entrypoint_resources: str | None = None

Functions

to_k8s

to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""
    return remove_none_from_dict(
        {
            "activeDeadlineSeconds": self.active_deadline_seconds,
            "backoffLimit": self.backoff_limit,
            "submitterPodTemplate": self.submitter_pod_template,
            "metadata": self.metadata,
            "clusterSelector": self.cluster_selector,
            "managedBy": self.managed_by,
            "deletionStrategy": self.deletion_strategy,
            "runtimeEnvYAML": self.runtime_env_yaml,
            "jobId": self.job_id,
            "submissionMode": self.submission_mode,
            "entrypointResources": self.entrypoint_resources,
            "entrypointNumCpus": self.entrypoint_num_cpus,
            "entrypointMemory": self.entrypoint_memory,
            "entrypointNumGpus": self.entrypoint_num_gpus,
            "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
            "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
            "suspend": self.suspend,
            "rayClusterSpec": self.ray_cluster_spec.to_k8s(context=context, image=image, env_vars=env_vars)
            if self.ray_cluster_spec is not None
            else None,
        }
    )

InteractiveRayJobConfig

Bases: RayJobConfig

Same as dagster_ray.kuberay.resources.rayjob.RayJobConfig, but spec.submission_mode mode has to be InteractiveMode

Attributes

metadata class-attribute instance-attribute

metadata: dict[str, Any] = Field(
    default_factory=dict,
    description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)

spec class-attribute instance-attribute

spec: InteractiveRayJobSpec = Field(default_factory=InteractiveRayJobSpec)

Functions

to_k8s

to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    labels = labels or {}
    annotations = annotations or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**(self.metadata.get("labels", {}) or {}), **labels},
                "annotations": {**self.metadata.get("annotations", {}), **annotations},
            }
        ),
        "spec": self.spec.to_k8s(
            context=context,
            image=image,
            env_vars=env_vars,
        ),
    }

InteractiveRayJobSpec

Bases: RayJobSpec

Same as dagster_ray.kuberay.resources.rayjob.RayJobSpec, but submission mode has to be InteractiveMode

Attributes

submission_mode class-attribute instance-attribute

submission_mode: Literal['InteractiveMode'] = 'InteractiveMode'

active_deadline_seconds class-attribute instance-attribute

active_deadline_seconds: int = 60 * 60 * 24

backoff_limit class-attribute instance-attribute

backoff_limit: int = 0

ray_cluster_spec class-attribute instance-attribute

ray_cluster_spec: RayClusterSpec | None = Field(default_factory=RayClusterSpec)

submitter_pod_template class-attribute instance-attribute

submitter_pod_template: dict[str, Any] | None = None

metadata class-attribute instance-attribute

metadata: dict[str, Any] | None = None

cluster_selector class-attribute instance-attribute

cluster_selector: dict[str, str] | None = None

managed_by class-attribute instance-attribute

managed_by: str | None = None

deletion_strategy class-attribute instance-attribute

deletion_strategy: dict[str, Any] | None = Field(
    default_factory=lambda: {"onFailure": {"policy": "DeleteCluster"}, "onSuccess": {"policy": "DeleteCluster"}}
)

runtime_env_yaml class-attribute instance-attribute

runtime_env_yaml: str | None = None

job_id class-attribute instance-attribute

job_id: str | None = None

entrypoint_resources class-attribute instance-attribute

entrypoint_resources: str | None = None

entrypoint_num_cpus class-attribute instance-attribute

entrypoint_num_cpus: float | None = None

entrypoint_memory class-attribute instance-attribute

entrypoint_memory: float | None = None

entrypoint_num_gpus class-attribute instance-attribute

entrypoint_num_gpus: float | None = None

ttl_seconds_after_finished class-attribute instance-attribute

ttl_seconds_after_finished: int | None = 5 * 60

shutdown_after_job_finishes class-attribute instance-attribute

shutdown_after_job_finishes: bool = True

suspend class-attribute instance-attribute

suspend: bool | None = None

Functions

to_k8s

to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""
    return remove_none_from_dict(
        {
            "activeDeadlineSeconds": self.active_deadline_seconds,
            "backoffLimit": self.backoff_limit,
            "submitterPodTemplate": self.submitter_pod_template,
            "metadata": self.metadata,
            "clusterSelector": self.cluster_selector,
            "managedBy": self.managed_by,
            "deletionStrategy": self.deletion_strategy,
            "runtimeEnvYAML": self.runtime_env_yaml,
            "jobId": self.job_id,
            "submissionMode": self.submission_mode,
            "entrypointResources": self.entrypoint_resources,
            "entrypointNumCpus": self.entrypoint_num_cpus,
            "entrypointMemory": self.entrypoint_memory,
            "entrypointNumGpus": self.entrypoint_num_gpus,
            "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
            "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
            "suspend": self.suspend,
            "rayClusterSpec": self.ray_cluster_spec.to_k8s(context=context, image=image, env_vars=env_vars)
            if self.ray_cluster_spec is not None
            else None,
        }
    )

RayClusterConfig

Bases: Config

Attributes

metadata class-attribute instance-attribute

metadata: dict[str, Any] = Field(
    default_factory=dict,
    description="Kubernetes metadata, except the name field can be omitted. In this case it will be generated by `dagster-ray`.",
)

spec class-attribute instance-attribute

spec: RayClusterSpec = Field(default_factory=RayClusterSpec)

Functions

to_k8s

to_k8s(
    context: AnyDagsterContext,
    image: str | None = None,
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    labels: Mapping[str, str] | None = None,
    annotations: Mapping[str, str] | None = None,
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    assert context.log is not None
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    labels = labels or {}
    annotations = annotations or {}

    return {
        "apiVersion": self.api_version,
        "kind": self.kind,
        "metadata": remove_none_from_dict(
            {
                "name": self.metadata.get("name"),
                "labels": {**(self.metadata.get("labels", {}) or {}), **labels},
                "annotations": {**self.metadata.get("annotations", {}), **annotations},
            }
        ),
        "spec": self.spec.to_k8s(context=context, image=image, env_vars=env_vars),
    }

RayClusterSpec

Bases: PermissiveConfig

RayCluster spec configuration options. A few sensible defaults are provided for convenience.

Attributes

suspend class-attribute instance-attribute

suspend: bool | None = None

managed_by class-attribute instance-attribute

managed_by: str | None = None

autoscaler_options class-attribute instance-attribute

autoscaler_options: dict[str, Any] = DEFAULT_AUTOSCALER_OPTIONS

head_service_annotations class-attribute instance-attribute

head_service_annotations: dict[str, str] | None = None

enable_in_tree_autoscaling class-attribute instance-attribute

enable_in_tree_autoscaling: bool = False

gcs_fault_tolerance_options class-attribute instance-attribute

gcs_fault_tolerance_options: dict[str, Any] | None = None

head_group_spec class-attribute instance-attribute

head_group_spec: dict[str, Any] = DEFAULT_HEAD_GROUP_SPEC

ray_version class-attribute instance-attribute

ray_version: str | None = None

worker_group_specs class-attribute instance-attribute

worker_group_specs: list[dict[str, Any]] = DEFAULT_WORKER_GROUP_SPECS

Functions

to_k8s

to_k8s(
    context: AnyDagsterContext, image: str | None = None, env_vars: Mapping[str, str] | None = None
) -> dict[str, Any]

Convert into Kubernetes manifests in camelCase format and inject additional information

Source code in src/dagster_ray/kuberay/configs.py
def to_k8s(
    self,
    context: AnyDagsterContext,
    image: str | None = None,  # is injected into headgroup and workergroups, unless already specified there
    env_vars: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Convert into Kubernetes manifests in camelCase format and inject additional information"""

    assert context.log is not None

    # TODO: inject self.redis_port and self.dashboard_port into the RayCluster configuration
    # TODO: auto-apply some tags from dagster-k8s/config

    head_group_spec = self.head_group_spec.copy()
    worker_group_specs = self.worker_group_specs.copy()

    k8s_env_vars: list[dict[str, Any]] = []

    if env_vars:
        for key, value in env_vars.items():
            k8s_env_vars.append({"name": key, "value": value})

    def update_group_spec(group_spec: dict[str, Any]):
        # TODO: only inject if the container has a `dagster.io/inject-image` annotation or smth
        if group_spec["template"]["spec"]["containers"][0].get("image") is None:
            if image is None:
                raise ValueError(MISSING_IMAGE_MESSAGE)
            else:
                group_spec["template"]["spec"]["containers"][0]["image"] = image

        for container in group_spec["template"]["spec"]["containers"]:
            container["env"] = container.get("env", []) + k8s_env_vars

    update_group_spec(head_group_spec)
    for worker_group_spec in worker_group_specs:
        update_group_spec(worker_group_spec)

    return remove_none_from_dict(
        {
            "enableInTreeAutoscaling": self.enable_in_tree_autoscaling,
            "autoscalerOptions": self.autoscaler_options,
            "headGroupSpec": head_group_spec,
            "workerGroupSpecs": worker_group_specs,
            "suspend": self.suspend,
            "managedBy": self.managed_by,
            "headServiceAnnotations": self.head_service_annotations,
            "gcsFaultToleranceOptions": self.gcs_fault_tolerance_options,
            "rayVersion": self.ray_version,
        }
    )

--

BaseKubeRayResourceConfig

Bases: Config

Attributes

image class-attribute instance-attribute

image: str | None = Field(
    default=None,
    description="Image to inject into the `RayCluster` spec. Defaults to `dagster/image` run tag. Images already provided in the `RayCluster` spec won't be overridden.",
)

deployment_name class-attribute instance-attribute

deployment_name: str = Field(
    default=DEFAULT_DEPLOYMENT_NAME,
    description="Dagster deployment name. Is used as a prefix for the Kubernetes resource name. Dagster Cloud variables are used to determine the default value.",
)

poll_interval class-attribute instance-attribute

poll_interval: float = Field(default=1.0, description='Poll interval for various API requests')

Resources

KubeRayJobClientResource

Bases: ConfigurableResource[RayJobClient]

This configurable resource provides a dagster_ray.kuberay.client.RayJobClient.

KubeRayClusterClientResource

Bases: ConfigurableResource[RayClusterClient]

This configurable resource provides a dagster_ray.kuberay.client.RayClusterClient.

Kubernetes API Clients

RayClusterClient

RayClusterClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)

Bases: BaseKubeRayClient[RayClusterStatus]

Source code in src/dagster_ray/kuberay/client/raycluster/client.py
def __init__(
    self,
    kube_config: str | None = None,
    kube_context: str | None = None,
    api_client: ApiClient | None = None,
) -> None:
    super().__init__(group=GROUP, version=VERSION, kind=KIND, plural=PLURAL, api_client=api_client)

    # these are only used because of kubectl port-forward CLI command
    # TODO: remove kubectl usage and remove these attributes
    self.config_file = kube_config
    self.context = kube_context

Functions

create

create(body: dict[str, Any], namespace: str) -> Any
Source code in src/dagster_ray/kuberay/client/base.py
def create(self, body: dict[str, Any], namespace: str) -> Any:
    return self._api.create_namespaced_custom_object(
        group=self.group,
        version=body.get("apiVersion", f"{self.group}/{self.version}").split("/")[1],
        plural=self.plural,
        body=body,
        namespace=namespace,
    )

delete

delete(name: str, namespace: str)
Source code in src/dagster_ray/kuberay/client/base.py
def delete(self, name: str, namespace: str):
    return self._api.delete_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        namespace=namespace,
    )

get

get(name: str, namespace: str) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def get(self, name: str, namespace: str) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.get_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            name=name,
            namespace=namespace,
        )
        return resource
    except ApiException as e:
        if e.status == 404:
            return {}
        raise

list

list(namespace: str, label_selector: str = '', async_req: bool = False) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def list(self, namespace: str, label_selector: str = "", async_req: bool = False) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.list_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            namespace=namespace,
            label_selector=label_selector,
            async_req=async_req,
        )
        if "items" in resource:
            return resource
        else:
            return {}
    except ApiException as e:
        if e.status == 404:
            return {}

        raise

update

update(name: str, namespace: str, body: Any)
Source code in src/dagster_ray/kuberay/client/base.py
def update(self, name: str, namespace: str, body: Any):
    return self._api.patch_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        body=body,
        namespace=namespace,
    )

RayJobClient

RayJobClient(kube_config: str | None = None, kube_context: str | None = None, api_client: ApiClient | None = None)

Bases: BaseKubeRayClient[RayJobStatus]

Source code in src/dagster_ray/kuberay/client/rayjob/client.py
def __init__(
    self,
    kube_config: str | None = None,
    kube_context: str | None = None,
    api_client: ApiClient | None = None,
) -> None:
    # this call must happen BEFORE creating K8s apis
    load_kubeconfig(config_file=kube_config, context=kube_context)

    self.config_file = kube_config
    self.context = kube_context

    super().__init__(group=GROUP, version=VERSION, kind=KIND, plural=PLURAL, api_client=api_client)

Functions

create

create(body: dict[str, Any], namespace: str) -> Any
Source code in src/dagster_ray/kuberay/client/base.py
def create(self, body: dict[str, Any], namespace: str) -> Any:
    return self._api.create_namespaced_custom_object(
        group=self.group,
        version=body.get("apiVersion", f"{self.group}/{self.version}").split("/")[1],
        plural=self.plural,
        body=body,
        namespace=namespace,
    )

delete

delete(name: str, namespace: str)
Source code in src/dagster_ray/kuberay/client/base.py
def delete(self, name: str, namespace: str):
    return self._api.delete_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        namespace=namespace,
    )

get

get(name: str, namespace: str) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def get(self, name: str, namespace: str) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.get_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            name=name,
            namespace=namespace,
        )
        return resource
    except ApiException as e:
        if e.status == 404:
            return {}
        raise

list

list(namespace: str, label_selector: str = '', async_req: bool = False) -> dict[str, Any]
Source code in src/dagster_ray/kuberay/client/base.py
def list(self, namespace: str, label_selector: str = "", async_req: bool = False) -> dict[str, Any]:
    from kubernetes.client import ApiException

    try:
        resource: Any = self._api.list_namespaced_custom_object(
            group=self.group,
            version=self.version,
            plural=self.plural,
            namespace=namespace,
            label_selector=label_selector,
            async_req=async_req,
        )
        if "items" in resource:
            return resource
        else:
            return {}
    except ApiException as e:
        if e.status == 404:
            return {}

        raise

update

update(name: str, namespace: str, body: Any)
Source code in src/dagster_ray/kuberay/client/base.py
def update(self, name: str, namespace: str, body: Any):
    return self._api.patch_namespaced_custom_object(
        group=self.group,
        version=self.version,
        plural=self.plural,
        name=name,
        body=body,
        namespace=namespace,
    )