Core API Reference

Core dagster-ray APIs for using external Ray clusters. Learn how to use it here.


Ray Resources

RayResource can be used to connect to external Ray clusters when provided as a Dagster resource. It can also serve as a type annotation, since all other Ray resources in dagster-ray inherit from RayResource.

dagster_ray.RayResource pydantic-model

Bases: ConfigurableResource, ABC

Base class for Ray Resources providing a common interface for Ray cluster management.

This abstract base class defines the interface that all Ray resources must implement, providing a backend-agnostic way to interact with Ray clusters. Concrete implementations include LocalRay for local development and KubeRay resources for Kubernetes deployments.

The RayResource handles the lifecycle of Ray clusters including creation, connection, and cleanup, with configurable policies for each stage.

Example

Use as a type annotation for backend-agnostic code

import dagster as dg
from dagster_ray import RayResource

@dg.asset
def my_asset(ray_cluster: RayResource):
    # Works with any Ray backend
    import ray
    return ray.get(ray.put("hello"))

Example

Manual lifecycle management

from dagster_ray import Lifecycle

ray_resource = SomeRayResource(
    lifecycle=Lifecycle(
        create=False,  # Don't auto-create
        connect=False  # Don't auto-connect
    )
)

Note

This is an abstract class and cannot be instantiated directly. Use concrete implementations like LocalRay or KubeRayCluster instead.

Show JSON schema:
{
  "$defs": {
    "ExecutionOptionsConfig": {
      "properties": {
        "cpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cpu"
        },
        "gpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gpu"
        },
        "object_store_memory": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Object Store Memory"
        }
      },
      "title": "ExecutionOptionsConfig",
      "type": "object"
    },
    "Lifecycle": {
      "properties": {
        "create": {
          "default": true,
          "description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
          "title": "Create",
          "type": "boolean"
        },
        "wait": {
          "default": true,
          "description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
          "title": "Wait",
          "type": "boolean"
        },
        "connect": {
          "default": true,
          "description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
          "title": "Connect",
          "type": "boolean"
        },
        "cleanup": {
          "default": "always",
          "description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
          "enum": [
            "never",
            "always",
            "on_exception"
          ],
          "title": "Cleanup",
          "type": "string"
        }
      },
      "title": "Lifecycle",
      "type": "object"
    },
    "RayDataExecutionOptions": {
      "properties": {
        "execution_options": {
          "$ref": "#/$defs/ExecutionOptionsConfig"
        },
        "cpu_limit": {
          "default": 5000,
          "title": "Cpu Limit",
          "type": "integer"
        },
        "gpu_limit": {
          "default": 0,
          "title": "Gpu Limit",
          "type": "integer"
        },
        "verbose_progress": {
          "default": true,
          "title": "Verbose Progress",
          "type": "boolean"
        },
        "use_polars": {
          "default": true,
          "title": "Use Polars",
          "type": "boolean"
        }
      },
      "title": "RayDataExecutionOptions",
      "type": "object"
    }
  },
  "description": "Base class for Ray Resources providing a common interface for Ray cluster management.\n\nThis abstract base class defines the interface that all Ray resources must implement,\nproviding a backend-agnostic way to interact with Ray clusters. Concrete implementations\ninclude LocalRay for local development and KubeRay resources for Kubernetes deployments.\n\nThe RayResource handles the lifecycle of Ray clusters including creation, connection,\nand cleanup, with configurable policies for each stage.\n\nExample:\n    Use as a type annotation for backend-agnostic code\n    ```python\n    import dagster as dg\n    from dagster_ray import RayResource\n\n    @dg.asset\n    def my_asset(ray_cluster: RayResource):\n        # Works with any Ray backend\n        import ray\n        return ray.get(ray.put(\"hello\"))\n    ```\n\nExample:\n    Manual lifecycle management\n    ```python\n    from dagster_ray import Lifecycle\n\n    ray_resource = SomeRayResource(\n        lifecycle=Lifecycle(\n            create=False,  # Don't auto-create\n            connect=False  # Don't auto-connect\n        )\n    )\n    ```\n\nNote:\n    This is an abstract class and cannot be instantiated directly. Use concrete\n    implementations like LocalRay or KubeRayCluster instead.",
  "properties": {
    "lifecycle": {
      "$ref": "#/$defs/Lifecycle",
      "description": "Actions to perform during resource setup."
    },
    "timeout": {
      "default": 600.0,
      "description": "Timeout for Ray readiness in seconds",
      "title": "Timeout",
      "type": "number"
    },
    "ray_init_options": {
      "additionalProperties": true,
      "description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
      "title": "Ray Init Options",
      "type": "object"
    },
    "data_execution_options": {
      "$ref": "#/$defs/RayDataExecutionOptions"
    },
    "redis_port": {
      "default": 10001,
      "description": "Redis port for connection. Make sure to match with the actual available port.",
      "title": "Redis Port",
      "type": "integer"
    },
    "dashboard_port": {
      "default": 8265,
      "description": "Dashboard port for connection. Make sure to match with the actual available port.",
      "title": "Dashboard Port",
      "type": "integer"
    },
    "env_vars": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "description": "Environment variables to pass to the Ray cluster.",
      "title": "Env Vars"
    },
    "enable_tracing": {
      "default": false,
      "description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
      "title": "Enable Tracing",
      "type": "boolean"
    },
    "enable_actor_task_logging": {
      "default": false,
      "description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
      "title": "Enable Actor Task Logging",
      "type": "boolean"
    },
    "enable_debug_post_mortem": {
      "default": false,
      "description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
      "title": "Enable Debug Post Mortem",
      "type": "boolean"
    },
    "enable_legacy_debugger": {
      "default": false,
      "description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
      "title": "Enable Legacy Debugger",
      "type": "boolean"
    }
  },
  "title": "RayResource",
  "type": "object"
}
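The `cleanup` policy in the schema above takes one of three values. The following is a minimal sketch of how such a policy could be evaluated after a step finishes. `should_delete` is a hypothetical helper, not part of dagster-ray, and the reading of `on_exception` (delete only when the step raised or was interrupted) is an assumption based on the field description.

```python
from typing import Literal

# The three policy names come from the `cleanup` enum in the schema.
CleanupPolicy = Literal["never", "always", "on_exception"]


def should_delete(policy: CleanupPolicy, step_failed: bool) -> bool:
    """Decide whether the cluster should be deleted once a step finishes.

    Hypothetical helper: treating "on_exception" as "delete only when the
    step raised or was interrupted" is an assumption.
    """
    if policy == "always":
        return True
    if policy == "never":
        return False
    if policy == "on_exception":
        return step_failed
    raise ValueError(f"Unknown cleanup policy: {policy!r}")


# Print the decision for each policy, for a successful and a failed step.
for policy in ("never", "always", "on_exception"):
    print(policy, should_delete(policy, step_failed=False), should_delete(policy, step_failed=True))
```

Under this reading, `on_exception` keeps a cluster alive after successful steps, while `never` leaves deletion entirely to the user.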

Attributes

lifecycle pydantic-field
lifecycle: Lifecycle

Actions to perform during resource setup.

timeout pydantic-field
timeout: float = 600.0

Timeout for Ray readiness in seconds

ray_init_options pydantic-field
ray_init_options: dict[str, Any]

Additional keyword arguments to pass to ray.init() call, such as runtime_env, num_cpus, etc. Dagster's EnvVar is supported. More details in Ray docs.

redis_port pydantic-field
redis_port: int = 10001

Redis port for connection. Make sure to match with the actual available port.

dashboard_port pydantic-field
dashboard_port: int = 8265

Dashboard port for connection. Make sure to match with the actual available port.

env_vars pydantic-field
env_vars: dict[str, str] | None

Environment variables to pass to the Ray cluster.

enable_tracing pydantic-field
enable_tracing: bool = False

Enable tracing: inject RAY_PROFILING=1 and RAY_task_events_report_interval_ms=0 into the Ray cluster configuration. This allows using ray.timeline() to fetch recorded task events. Learn more: Ray docs.

enable_actor_task_logging pydantic-field
enable_actor_task_logging: bool = False

Enable actor task logging: inject RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 into the Ray cluster configuration.

enable_debug_post_mortem pydantic-field
enable_debug_post_mortem: bool = False

Enable post-mortem debugging: inject RAY_DEBUG_POST_MORTEM=1 into the Ray cluster configuration. Learn more: Ray docs.

enable_legacy_debugger pydantic-field
enable_legacy_debugger: bool = False

Enable legacy debugger: inject RAY_DEBUG=legacy into the Ray cluster configuration. Learn more: Ray docs.
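Each of the four enable_* flags above corresponds to one or more environment variables. The mapping can be sketched as follows, using only the variable names given in the field descriptions. `debug_env_vars` is a hypothetical helper, not a dagster-ray function, and how the library combines these values with `env_vars` is not covered here.

```python
def debug_env_vars(
    enable_tracing: bool = False,
    enable_actor_task_logging: bool = False,
    enable_debug_post_mortem: bool = False,
    enable_legacy_debugger: bool = False,
) -> dict[str, str]:
    """Return the environment variables implied by the enable_* flags.

    Hypothetical helper: variable names and values are taken from the
    field descriptions; the function itself is illustrative only.
    """
    env: dict[str, str] = {}
    if enable_tracing:
        env["RAY_PROFILING"] = "1"
        env["RAY_task_events_report_interval_ms"] = "0"
    if enable_actor_task_logging:
        env["RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING"] = "1"
    if enable_debug_post_mortem:
        env["RAY_DEBUG_POST_MORTEM"] = "1"
    if enable_legacy_debugger:
        env["RAY_DEBUG"] = "legacy"
    return env


print(debug_env_vars(enable_tracing=True))
```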

runtime_job_id property
runtime_job_id: str

Returns the Ray Job ID of the current job, which was created with ray.init().

Functions

on_create
on_create(context: AnyDagsterContext)

Called after the resource is successfully created. Override to customize it.

Source code in src/dagster_ray/_base/resources.py
def on_create(self, context: AnyDagsterContext):
    """Called after the resource is successfully created. Override to customize it."""
    assert context.log is not None
    context.log.info(f"{self._creation_verb} {self.display_name}.")

on_ready
on_ready(context: AnyDagsterContext)

Called after the resource becomes ready. Override to customize it.

Source code in src/dagster_ray/_base/resources.py
def on_ready(self, context: AnyDagsterContext):
    """Called after the resource becomes ready. Override to customize it."""
    pass

on_connect
on_connect(context: AnyDagsterContext)

Called after the Ray client connects. Override to customize it.

Source code in src/dagster_ray/_base/resources.py
def on_connect(self, context: AnyDagsterContext):
    """Called after the Ray client connects. Override to customize it."""
    assert context.log is not None
    context.log.info(f"Connected to {self.display_name} via Ray Client")

on_cleanup
on_cleanup(context: AnyDagsterContext, *, deleted: bool)

Called after cleanup logic runs. Override to customize it.

Source code in src/dagster_ray/_base/resources.py
def on_cleanup(self, context: AnyDagsterContext, *, deleted: bool):
    """Called after cleanup logic runs. Override to customize it."""
    assert context.log is not None
    if deleted:
        context.log.info(f'Deleted {self.display_name} according to cleanup policy "{self.lifecycle.cleanup}"')

The LocalRay can be used to connect to a local Ray cluster.

dagster_ray.core.resources.LocalRay pydantic-model

Bases: RayResource

Dummy resource, useful for testing and local development. It provides the same interface as the actual resources.

Show JSON schema:
{
  "$defs": {
    "ExecutionOptionsConfig": {
      "properties": {
        "cpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cpu"
        },
        "gpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gpu"
        },
        "object_store_memory": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Object Store Memory"
        }
      },
      "title": "ExecutionOptionsConfig",
      "type": "object"
    },
    "Lifecycle": {
      "properties": {
        "create": {
          "default": true,
          "description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
          "title": "Create",
          "type": "boolean"
        },
        "wait": {
          "default": true,
          "description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
          "title": "Wait",
          "type": "boolean"
        },
        "connect": {
          "default": true,
          "description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
          "title": "Connect",
          "type": "boolean"
        },
        "cleanup": {
          "default": "always",
          "description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
          "enum": [
            "never",
            "always",
            "on_exception"
          ],
          "title": "Cleanup",
          "type": "string"
        }
      },
      "title": "Lifecycle",
      "type": "object"
    },
    "RayDataExecutionOptions": {
      "properties": {
        "execution_options": {
          "$ref": "#/$defs/ExecutionOptionsConfig"
        },
        "cpu_limit": {
          "default": 5000,
          "title": "Cpu Limit",
          "type": "integer"
        },
        "gpu_limit": {
          "default": 0,
          "title": "Gpu Limit",
          "type": "integer"
        },
        "verbose_progress": {
          "default": true,
          "title": "Verbose Progress",
          "type": "boolean"
        },
        "use_polars": {
          "default": true,
          "title": "Use Polars",
          "type": "boolean"
        }
      },
      "title": "RayDataExecutionOptions",
      "type": "object"
    }
  },
  "description": "Dummy Resource.\nIs useful for testing and local development.\nProvides the same interface as actual Resources.",
  "properties": {
    "lifecycle": {
      "$ref": "#/$defs/Lifecycle",
      "description": "Actions to perform during resource setup."
    },
    "timeout": {
      "default": 600.0,
      "description": "Timeout for Ray readiness in seconds",
      "title": "Timeout",
      "type": "number"
    },
    "ray_init_options": {
      "additionalProperties": true,
      "description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
      "title": "Ray Init Options",
      "type": "object"
    },
    "data_execution_options": {
      "$ref": "#/$defs/RayDataExecutionOptions"
    },
    "redis_port": {
      "default": 10001,
      "description": "Redis port for connection. Make sure to match with the actual available port.",
      "title": "Redis Port",
      "type": "integer"
    },
    "dashboard_port": {
      "default": 8265,
      "description": "Dashboard port for connection. Make sure to match with the actual available port.",
      "title": "Dashboard Port",
      "type": "integer"
    },
    "env_vars": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "description": "Environment variables to pass to the Ray cluster.",
      "title": "Env Vars"
    },
    "enable_tracing": {
      "default": false,
      "description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
      "title": "Enable Tracing",
      "type": "boolean"
    },
    "enable_actor_task_logging": {
      "default": false,
      "description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
      "title": "Enable Actor Task Logging",
      "type": "boolean"
    },
    "enable_debug_post_mortem": {
      "default": false,
      "description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
      "title": "Enable Debug Post Mortem",
      "type": "boolean"
    },
    "enable_legacy_debugger": {
      "default": false,
      "description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
      "title": "Enable Legacy Debugger",
      "type": "boolean"
    }
  },
  "title": "LocalRay",
  "type": "object"
}

Attributes

host property
host: str

ray_address property
ray_address: None

Run Launcher

dagster_ray.core.run_launcher.RayRunLauncher

RayRunLauncher(
    address: str,
    metadata: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    cookies: dict[str, Any] | None = None,
    env_vars: list[str] | None = None,
    runtime_env: dict[str, Any] | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    memory: int | None = None,
    resources: dict[str, float] | None = None,
    inst_data: ConfigurableClassData | None = None,
)

Bases: RunLauncher, ConfigurableClass

RunLauncher that submits Dagster runs as isolated Ray jobs to a Ray cluster.

Configuration can be provided via dagster.yaml and individual runs can override settings using the dagster-ray/config tag.

Example

Configure via dagster.yaml

run_launcher:
  module: dagster_ray
  class: RayRunLauncher
  config:
    address: "ray://head-node:10001"
    num_cpus: 2
    num_gpus: 0

Example

Override settings per job

import dagster as dg

@dg.job(
    tags={
        "dagster-ray/config": {
            "num_cpus": 16,
            "num_gpus": 1,
            "runtime_env": {"pip": {"packages": ["torch"]}},
        }
    }
)
def my_job():
    return my_op()
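The interaction between the launcher configuration and the per-run dagster-ray/config tag can be pictured as a merge in which the tag wins. This is a minimal sketch under two assumptions: that the tag value reaches the launcher as a JSON string, and that the merge is shallow. `effective_config` is a hypothetical helper, and the actual merge logic in dagster-ray may differ.

```python
import json

RAY_CONFIG_TAG = "dagster-ray/config"


def effective_config(launcher_config: dict, run_tags: dict[str, str]) -> dict:
    """Overlay per-run tag settings on top of the launcher configuration.

    Hypothetical helper: assumes the tag value is a JSON string and that
    keys from the tag replace launcher keys one by one (shallow merge).
    """
    raw = run_tags.get(RAY_CONFIG_TAG)
    overrides = json.loads(raw) if raw else {}
    return {**launcher_config, **overrides}


# Values mirror the dagster.yaml and @dg.job examples.
launcher_config = {"address": "ray://head-node:10001", "num_cpus": 2, "num_gpus": 0}
run_tags = {RAY_CONFIG_TAG: json.dumps({"num_cpus": 16, "num_gpus": 1})}

print(effective_config(launcher_config, run_tags))
```

With these inputs the run keeps the launcher's `address` but uses the `num_cpus` and `num_gpus` values from the tag.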

Source code in src/dagster_ray/core/run_launcher.py
def __init__(
    self,
    address: str,
    metadata: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    cookies: dict[str, Any] | None = None,
    env_vars: list[str] | None = None,
    runtime_env: dict[str, Any] | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    memory: int | None = None,
    resources: dict[str, float] | None = None,
    inst_data: ConfigurableClassData | None = None,
):
    self._inst_data = dg._check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

    self.address = address
    self.metadata = metadata
    self.headers = headers
    self.cookies = cookies
    self.env_vars = env_vars
    self.runtime_env = runtime_env
    self.num_cpus = num_cpus
    self.num_gpus = num_gpus
    self.memory = memory
    self.resources = resources

    super().__init__()


Executor

dagster_ray.core.executor.ray_executor

ray_executor(init_context: InitExecutorContext) -> Executor

Executes steps by submitting them as Ray jobs.

The steps are started inside the Ray cluster directly. When used together with RayRunLauncher, the executor can inherit the job submission client configuration. This behavior can be disabled by setting inherit_job_submission_client_from_ray_run_launcher to False.

Example

Use ray_executor for the entire code location

import dagster as dg
from dagster_ray import ray_executor

ray_executor = ray_executor.configured(
    {"address": dg.EnvVar("RAY_ADDRESS"), "runtime_env": {"pip": ["polars"]}}
)

defs = dg.Definitions(..., executor=ray_executor)

Example

Override configuration for a specific asset

import dagster as dg

@dg.asset(
    op_tags={"dagster-ray/config": {"num_cpus": 2}}
)
def my_asset(): ...

Source code in src/dagster_ray/core/executor.py
@dg.executor(
    name="ray",
    config_schema=_RAY_EXECUTOR_CONFIG_SCHEMA,
    requirements=multiple_process_executor_requirements(),
)
def ray_executor(init_context: InitExecutorContext) -> Executor:
    """Executes steps by submitting them as Ray jobs.

    The steps are started inside the Ray cluster directly.
    When used together with [`RayRunLauncher`][dagster_ray.core.run_launcher.RayRunLauncher], the executor can inherit the job submission client configuration.
    This behavior can be disabled by setting `inherit_job_submission_client_from_ray_run_launcher` to `False`.

    Example:
        Use `ray_executor` for the entire code location
        ```python
        import dagster as dg
        from dagster_ray import ray_executor

        ray_executor = ray_executor.configured(
            {"address": dg.EnvVar("RAY_ADDRESS"), "runtime_env": {"pip": ["polars"]}}
        )

        defs = dg.Definitions(..., executor=ray_executor)
        ```

    Example:
        Override configuration for a specific asset
        ```python
        import dagster as dg

        @dg.asset(
            op_tags={"dagster-ray/config": {"num_cpus": 2}}
        )
        def my_asset(): ...
        ```
    """
    from ray.job_submission import JobSubmissionClient

    exc_cfg = init_context.executor_config
    ray_cfg = RayExecutorConfig(**exc_cfg["ray"])  # type: ignore

    if ray_cfg.inherit_job_submission_client_from_ray_run_launcher and isinstance(
        init_context.instance.run_launcher, RayRunLauncher
    ):
        # TODO: some RunLauncher config values can be automatically passed to the executor
        client = init_context.instance.run_launcher.client
    else:
        client = JobSubmissionClient(
            ray_cfg.address, metadata=ray_cfg.metadata, headers=ray_cfg.headers, cookies=ray_cfg.cookies
        )

    return StepDelegatingExecutor(
        RayStepHandler(
            client=client,
            env_vars=ray_cfg.env_vars,
            runtime_env=ray_cfg.runtime_env,
            num_cpus=ray_cfg.num_cpus,
            num_gpus=ray_cfg.num_gpus,
            memory=ray_cfg.memory,
            resources=ray_cfg.resources,
        ),
        retries=RetryMode.from_config(exc_cfg["retries"]),  # type: ignore
        max_concurrent=dg._check.opt_int_elem(exc_cfg, "max_concurrent"),
        tag_concurrency_limits=dg._check.opt_list_elem(exc_cfg, "tag_concurrency_limits"),
        should_verify_step=True,
    )

Pipes

Run external Ray scripts as Ray jobs while streaming back logs and metadata into Dagster with Dagster Pipes.

dagster_ray.core.pipes.PipesRayJobClient

PipesRayJobClient(
    address: str | None = None,
    headers: dict[str, Any] | None = None,
    verify: str | bool = True,
    cookies: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
)

Bases: PipesClient, TreatAsResourceParam

A Pipes client for running Ray jobs on remote clusters.

Starts the job directly on the Ray cluster and reads the logs from the job.

Uses JobSubmissionClient to run and monitor the job. Learn more about it here.

Parameters:

  • address (str | None, default: None ) –

    Ray dashboard HTTP address. If unspecified, connects to a local Ray cluster or uses the RAY_ADDRESS environment variable.

  • headers (dict[str, Any] | None, default: None ) –

    HTTP headers for Ray Dashboard requests. Passed to JobSubmissionClient.

  • verify (str | bool, default: True ) –

    Whether to verify TLS certificate. Passed to JobSubmissionClient.

  • cookies (dict[str, Any] | None, default: None ) –

    HTTP cookies for Ray Dashboard requests. Passed to JobSubmissionClient.

  • metadata (dict[str, Any] | None, default: None ) –

    Ray Job metadata. Passed to JobSubmissionClient.

  • context_injector (PipesContextInjector | None, default: None ) –

    A context injector to use to inject context into the Ray job. Defaults to PipesEnvContextInjector.

  • message_reader (PipesMessageReader | None, default: None ) –

    A message reader to use when reading Pipes messages from the Ray job run. Defaults to PipesRayJobMessageReader.

  • forward_termination (bool, default: True ) –

    Whether to cancel the Ray job run when the Dagster process receives a termination signal.

  • timeout (float, default: 600 ) –

    Timeout for various internal interactions with the Ray job.

  • poll_interval (float, default: 1 ) –

    Interval at which to poll Ray for status updates. Adjusting it is useful when running in a local environment.
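To illustrate how `poll_interval` and a timeout typically interact, here is a minimal polling sketch. `wait_for_terminal_status` and the `get_status` callable are hypothetical and are not the client's internal implementation. The status names follow Ray's job statuses (PENDING, RUNNING, SUCCEEDED, FAILED, STOPPED). Using the timeout as a deadline for the whole wait is an assumption, since the parameter is documented only as applying to "various internal interactions".

```python
import time
from typing import Callable

# Ray job statuses after which no further change is expected.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_terminal_status(
    get_status: Callable[[], str],
    timeout: float = 600,
    poll_interval: float = 1,
) -> str:
    """Poll get_status() until a terminal status is seen or the deadline passes.

    Hypothetical helper: get_status stands in for a call that asks the
    cluster for the current job status.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still {status} after {timeout} seconds")
        time.sleep(poll_interval)


# Simulated job that is pending, then running, then succeeds.
statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
print(wait_for_terminal_status(lambda: next(statuses), timeout=5, poll_interval=0.01))
```

A shorter interval gives faster feedback at the cost of more status requests, which is usually acceptable against a local cluster.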

Source code in src/dagster_ray/core/pipes.py
def __init__(
    self,
    address: str | None = None,
    headers: dict[str, Any] | None = None,
    verify: str | bool = True,
    cookies: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
):
    from ray.job_submission import JobSubmissionClient

    self.client = JobSubmissionClient(
        address=address,
        headers=headers,
        verify=verify,
        cookies=cookies,
        metadata=metadata,
    )
    self._context_injector = context_injector or PipesEnvContextInjector()
    self._message_reader = message_reader or PipesRayJobMessageReader(
        job_submission_client_kwargs={
            "headers": headers,
            "verify": verify,
            "cookies": cookies,
            "metadata": metadata,
        }
    )

    self.forward_termination = check.bool_param(forward_termination, "forward_termination")
    self.timeout = check.int_param(timeout, "timeout")
    self.poll_interval = check.int_param(poll_interval, "poll_interval")

Functions

run
run(
    *, context: OpOrAssetExecutionContext, submit_job_params: SubmitJobParams, extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation

Execute a RayJob, enriched with the Pipes protocol.

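Parameters:

  • context (OpOrAssetExecutionContext) –

    Current Dagster op or asset context.

  • submit_job_params (SubmitJobParams) –

    Parameters for JobSubmissionClient.submit_job.

  • extras (PipesExtras | None, default: None ) –

    Additional information to pass to the Pipes session, retrievable via PipesContext.get_extras.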

Source code in src/dagster_ray/core/pipes.py
def run(  # type: ignore
    self,
    *,
    context: OpOrAssetExecutionContext,
    submit_job_params: SubmitJobParams,
    extras: PipesExtras | None = None,
) -> PipesClientCompletedInvocation:
    """
    Execute a RayJob, enriched with the Pipes protocol.

    Args:
        context: Current Dagster op or asset context.
        submit_job_params: Parameters for [`JobSubmissionClient.submit_job`][ray.job_submission.JobSubmissionClient.submit_job].
        extras: Additional information to pass to the Pipes session, retrievable via [`PipesContext.get_extras`](https://docs.dagster.io/integrations/libraries/pipes/dagster-pipes#dagster_pipes.PipesContext.get_extra).
    """

    with open_pipes_session(
        context=context,
        message_reader=self._message_reader,
        context_injector=self._context_injector,
        extras=extras,
    ) as session:
        enriched_submit_job_params = self._enrich_submit_job_params(context, session, submit_job_params)

        job_id = self._start(context, session, enriched_submit_job_params)

        try:
            # self._read_messages(context, job_id)
            self._wait_for_completion(context, job_id)
            return PipesClientCompletedInvocation(session, metadata={"Ray Job ID": job_id})

        except DagsterExecutionInterruptedError:
            if self.forward_termination:
                context.log.warning(f"[pipes] Dagster process interrupted! Will terminate RayJob {job_id}.")
                self._terminate(context, job_id)
            raise

dagster_ray.core.pipes.PipesRayJobMessageReader

PipesRayJobMessageReader(job_submission_client_kwargs: dict[str, Any] | None = None)

Bases: PipesMessageReader

Source code in src/dagster_ray/core/pipes.py
def __init__(self, job_submission_client_kwargs: dict[str, Any] | None = None):
    self._job_submission_client_kwargs = job_submission_client_kwargs
    self._thread: threading.Thread | None = None
    self.session_closed = threading.Event()
    self._job_id = None
    self._client = None
    self.thread_ready = threading.Event()

    self.completed = threading.Event()


IO Manager

Send data between Dagster steps while they are running inside a Ray cluster.

dagster_ray.core.io_manager.RayIOManager

Bases: ConfigurableIOManager

IO Manager that stores intermediate values in Ray's object store.

The RayIOManager allows storing and retrieving intermediate values in Ray's distributed object store, making it ideal for use with RayRunLauncher and ray_executor. It works by storing Dagster step keys in a global Ray actor that maintains a mapping between step keys and Ray ObjectRefs.

Attributes:

  • address (str | None) –

    Ray cluster address. If provided, will initialize Ray connection. If None, assumes Ray is already initialized.

Example

Basic usage

import dagster as dg
from dagster_ray import RayIOManager

@dg.asset(io_manager_key="ray_io_manager")
def upstream() -> int:
    return 42

@dg.asset
def downstream(upstream: int):
    return upstream * 2

definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={"ray_io_manager": RayIOManager()}
)

Example

With Ray cluster address

ray_io_manager = RayIOManager(address="ray://head-node:10001")

Info
  • Works with picklable Python objects
  • Supports partitioned assets and partition mappings
  • Uses Ray's automatic object movement for fault tolerance
  • Objects are stored with the Ray actor as owner for lifecycle management
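The storage scheme described above, a global actor that maps step keys to ObjectRefs, can be modelled without Ray to show the data flow. This is only a sketch: `InMemoryObjectStore` and `StepKeyRegistry` are hypothetical stand-ins for Ray's object store and the global actor, and the key format is an assumption. With Ray, the store operations would correspond to `ray.put` and `ray.get`.

```python
import pickle
from typing import Any


class InMemoryObjectStore:
    """Stand-in for Ray's object store: put() returns an opaque reference."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}
        self._next_id = 0

    def put(self, value: Any) -> str:
        ref = f"ref-{self._next_id}"
        self._next_id += 1
        # Values are pickled, which is why only picklable objects work.
        self._objects[ref] = pickle.dumps(value)
        return ref

    def get(self, ref: str) -> Any:
        return pickle.loads(self._objects[ref])


class StepKeyRegistry:
    """Stand-in for the global actor: maps storage keys to references."""

    def __init__(self) -> None:
        self._refs: dict[str, str] = {}

    def set(self, key: str, ref: str) -> None:
        self._refs[key] = ref

    def get(self, key: str) -> str:
        return self._refs[key]


store = InMemoryObjectStore()
registry = StepKeyRegistry()


def handle_output(key: str, value: Any) -> None:
    """Store a step output and remember its reference under the step key."""
    registry.set(key, store.put(value))


def load_input(key: str) -> Any:
    """Look up the reference for a step key and fetch the stored value."""
    return store.get(registry.get(key))


handle_output("upstream", 42)
print(load_input("upstream") * 2)  # prints 84
```

Keeping the references in a single registry means a downstream step only needs the upstream key, not the reference itself, to load its input.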