
Core API Reference

Core dagster-ray APIs for using external Ray clusters.


Ray Resources

RayResource can be provided as a Dagster resource to connect to an external Ray cluster, or used as a type annotation, since all other Ray resources in dagster-ray inherit from RayResource.

dagster_ray.RayResource pydantic-model

Bases: ConfigurableResource, ABC

Base class for Ray Resources providing a common interface for Ray cluster management.

This abstract base class defines the interface that all Ray resources must implement, providing a backend-agnostic way to interact with Ray clusters. Concrete implementations include LocalRay for local development and KubeRay resources for Kubernetes deployments.

The RayResource handles the lifecycle of Ray clusters including creation, connection, and cleanup, with configurable policies for each stage.

Example

Use as a type annotation for backend-agnostic code

import dagster as dg
from dagster_ray import RayResource

@dg.asset
def my_asset(ray_cluster: RayResource):
    # Works with any Ray backend
    import ray
    return ray.get(ray.put("hello"))

Example

Manual lifecycle management

from dagster_ray import Lifecycle

# SomeRayResource stands for any concrete RayResource subclass, such as LocalRay or KubeRayCluster
ray_resource = SomeRayResource(
    lifecycle=Lifecycle(
        create=False,  # Don't auto-create
        connect=False  # Don't auto-connect
    )
)

Note

This is an abstract class and cannot be instantiated directly. Use concrete implementations like LocalRay or KubeRayCluster instead.

JSON schema:
{
  "$defs": {
    "ExecutionOptionsConfig": {
      "properties": {
        "cpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Cpu"
        },
        "gpu": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Gpu"
        },
        "object_store_memory": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "title": "Object Store Memory"
        }
      },
      "title": "ExecutionOptionsConfig",
      "type": "object"
    },
    "Lifecycle": {
      "properties": {
        "create": {
          "default": true,
          "description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
          "title": "Create",
          "type": "boolean"
        },
        "wait": {
          "default": true,
          "description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
          "title": "Wait",
          "type": "boolean"
        },
        "connect": {
          "default": true,
          "description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
          "title": "Connect",
          "type": "boolean"
        },
        "cleanup": {
          "default": "always",
          "description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
          "enum": [
            "never",
            "always",
            "on_exception"
          ],
          "title": "Cleanup",
          "type": "string"
        }
      },
      "title": "Lifecycle",
      "type": "object"
    },
    "RayDataExecutionOptions": {
      "properties": {
        "execution_options": {
          "$ref": "#/$defs/ExecutionOptionsConfig"
        },
        "cpu_limit": {
          "default": 5000,
          "title": "Cpu Limit",
          "type": "integer"
        },
        "gpu_limit": {
          "default": 0,
          "title": "Gpu Limit",
          "type": "integer"
        },
        "verbose_progress": {
          "default": true,
          "title": "Verbose Progress",
          "type": "boolean"
        },
        "use_polars": {
          "default": true,
          "title": "Use Polars",
          "type": "boolean"
        }
      },
      "title": "RayDataExecutionOptions",
      "type": "object"
    }
  },
  "description": "Base class for Ray Resources providing a common interface for Ray cluster management.\n\nThis abstract base class defines the interface that all Ray resources must implement,\nproviding a backend-agnostic way to interact with Ray clusters. Concrete implementations\ninclude LocalRay for local development and KubeRay resources for Kubernetes deployments.\n\nThe RayResource handles the lifecycle of Ray clusters including creation, connection,\nand cleanup, with configurable policies for each stage.\n\nExample:\n    Use as a type annotation for backend-agnostic code\n    ```python\n    import dagster as dg\n    from dagster_ray import RayResource\n\n    @dg.asset\n    def my_asset(ray_cluster: RayResource):\n        # Works with any Ray backend\n        import ray\n        return ray.get(ray.put(\"hello\"))\n    ```\n\nExample:\n    Manual lifecycle management\n    ```python\n    from dagster_ray import Lifecycle\n\n    ray_resource = SomeRayResource(\n        lifecycle=Lifecycle(\n            create=False,  # Don't auto-create\n            connect=False  # Don't auto-connect\n        )\n    )\n    ```\n\nNote:\n    This is an abstract class and cannot be instantiated directly. Use concrete\n    implementations like LocalRay or KubeRayCluster instead.",
  "properties": {
    "lifecycle": {
      "$ref": "#/$defs/Lifecycle",
      "description": "Actions to perform during resource setup."
    },
    "timeout": {
      "default": 600.0,
      "description": "Timeout for Ray readiness in seconds",
      "title": "Timeout",
      "type": "number"
    },
    "ray_init_options": {
      "description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
      "title": "Ray Init Options",
      "type": "object"
    },
    "data_execution_options": {
      "$ref": "#/$defs/RayDataExecutionOptions"
    },
    "redis_port": {
      "default": 10001,
      "description": "Redis port for connection. Make sure to match with the actual available port.",
      "title": "Redis Port",
      "type": "integer"
    },
    "dashboard_port": {
      "default": 8265,
      "description": "Dashboard port for connection. Make sure to match with the actual available port.",
      "title": "Dashboard Port",
      "type": "integer"
    },
    "env_vars": {
      "anyOf": [
        {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        {
          "type": "null"
        }
      ],
      "description": "Environment variables to pass to the Ray cluster.",
      "title": "Env Vars"
    },
    "enable_tracing": {
      "default": false,
      "description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
      "title": "Enable Tracing",
      "type": "boolean"
    },
    "enable_actor_task_logging": {
      "default": false,
      "description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
      "title": "Enable Actor Task Logging",
      "type": "boolean"
    },
    "enable_debug_post_mortem": {
      "default": false,
      "description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
      "title": "Enable Debug Post Mortem",
      "type": "boolean"
    },
    "enable_legacy_debugger": {
      "default": false,
      "description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
      "title": "Enable Legacy Debugger",
      "type": "boolean"
    }
  },
  "title": "RayResource",
  "type": "object"
}


Attributes

lifecycle pydantic-field
lifecycle: Lifecycle

Actions to perform during resource setup.

timeout pydantic-field
timeout: float = 600.0

Timeout, in seconds, for the Ray cluster to become ready.

ray_init_options pydantic-field
ray_init_options: dict[str, Any]

Additional keyword arguments to pass to the ray.init() call, such as runtime_env or num_cpus. Dagster's EnvVar is supported. More details in the Ray documentation for ray.init().

data_execution_options pydantic-field
data_execution_options: RayDataExecutionOptions

redis_port pydantic-field
redis_port: int = 10001

Redis port for connection. Make sure it matches the port actually available on the cluster.

dashboard_port pydantic-field
dashboard_port: int = 8265

Dashboard port for connection. Make sure it matches the port actually available on the cluster.

env_vars pydantic-field
env_vars: dict[str, str] | None

Environment variables to pass to the Ray cluster.

enable_tracing pydantic-field
enable_tracing: bool = False

Enable tracing: inject RAY_PROFILING=1 and RAY_task_events_report_interval_ms=0 into the Ray cluster configuration. This allows using ray.timeline() to fetch recorded task events. Learn more: Ray docs

enable_actor_task_logging pydantic-field
enable_actor_task_logging: bool = False

Enable actor task logging: inject RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 into the Ray cluster configuration.

enable_debug_post_mortem pydantic-field
enable_debug_post_mortem: bool = False

Enable post-mortem debugging: inject RAY_DEBUG_POST_MORTEM=1 into the Ray cluster configuration. Learn more: Ray docs

enable_legacy_debugger pydantic-field
enable_legacy_debugger: bool = False

Enable legacy debugger: inject RAY_DEBUG=legacy into the Ray cluster configuration. Learn more: Ray docs

_context pydantic-field
_context: BaseContext | None
context property
context: BaseContext
host abstractmethod property
host: str
name abstractmethod property
name: str
display_name property
display_name: str
ray_address property
ray_address: str
dashboard_url property
dashboard_url: str
runtime_job_id property
runtime_job_id: str

Returns the Ray Job ID of the current job, which was created by ray.init().

created property
created: bool
ready property
ready: bool
connected property
connected: bool

Functions

yield_for_execution
yield_for_execution(context: InitResourceContext) -> Generator[Self, None, None]
Source code in src/dagster_ray/_base/resources.py
@contextlib.contextmanager
def yield_for_execution(self, context: dg.InitResourceContext) -> Generator[Self, None, None]:
    exception_occurred = None
    try:
        if self.lifecycle.create:
            self._create(context)
            if self.lifecycle.wait:
                self._wait(context)
                if self.lifecycle.connect:
                    self._connect(context)
        yield self
    except BaseException as e:
        exception_occurred = e
        raise
    finally:
        self.cleanup(context, exception_occurred)
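Because the checks in yield_for_execution are nested, each Lifecycle flag only matters when the previous stage ran: wait is only consulted when create is enabled, and connect only when wait is enabled. The standalone sketch below mirrors that control flow so the combinations are easy to check; planned_steps is a hypothetical helper, not part of dagster-ray.

```python
def planned_steps(create: bool, wait: bool, connect: bool) -> list:
    """Return the setup stages yield_for_execution would run for a Lifecycle.

    Mirrors the nested if-statements in the listing: `wait` is only checked
    when `create` is enabled, and `connect` only when `wait` is enabled.
    """
    steps = []
    if create:
        steps.append("create")
        if wait:
            steps.append("wait")
            if connect:
                steps.append("connect")
    return steps


# Default Lifecycle: every stage runs.
print(planned_steps(create=True, wait=True, connect=True))  # ['create', 'wait', 'connect']
# With wait disabled, connect=True has no effect.
print(planned_steps(create=True, wait=False, connect=True))  # ['create']
# With create disabled, nothing runs automatically.
print(planned_steps(create=False, wait=True, connect=True))  # []
```

This is why setting create=False, as in the manual lifecycle example above, leaves creation, waiting, and connecting entirely to user code.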
_create
_create(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def _create(self, context: AnyDagsterContext):
    assert context.log is not None
    if not self.created:
        try:
            self.create(context)
            context.log.info(f"Created {self.display_name}.")
        except BaseException:
            context.log.exception(f"Failed to create {self.display_name}")
            raise
_wait
_wait(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def _wait(self, context: AnyDagsterContext):
    assert context.log is not None
    self._create(context)
    if not self.ready:
        context.log.info(f"Waiting for {self.display_name} to become ready (timeout={self.timeout:.0f}s)...")
        try:
            self.wait(context)
        except BaseException:
            context.log.exception(f"Failed to wait for {self.display_name} readiness")
            raise
_connect
_connect(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def _connect(self, context: AnyDagsterContext):
    assert context.log is not None
    self._wait(context)
    if not self.connected:
        try:
            self.connect(context)
        except BaseException:
            context.log.exception(f"Failed to connect to {self.display_name}")
            raise
        context.log.info(f"Initialized Ray Client with {self.display_name}")
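_wait calls _create and _connect calls _wait, and each stage is guarded by its status property (created, ready, connected). As a result, invoking a later stage runs any missing earlier stages, and each stage runs at most once. The toy class below is hypothetical: it sets plain booleans where the real resource inspects the cluster, and it only illustrates the call order.

```python
class StagedResource:
    """Toy model of the _create/_wait/_connect chaining (hypothetical class)."""

    def __init__(self) -> None:
        self.created = False
        self.ready = False
        self.connected = False
        self.calls = []  # records which underlying stage actually ran

    def create(self) -> None:
        if not self.created:
            self.calls.append("create")
            self.created = True

    def wait(self) -> None:
        self.create()  # waiting implies the cluster has been created
        if not self.ready:
            self.calls.append("wait")
            self.ready = True

    def connect(self) -> None:
        self.wait()  # connecting implies the cluster is ready
        if not self.connected:
            self.calls.append("connect")
            self.connected = True


resource = StagedResource()
resource.connect()
resource.connect()  # second call is a no-op
print(resource.calls)  # ['create', 'wait', 'connect']
```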
create
create(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def create(self, context: AnyDagsterContext):
    pass
wait
wait(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def wait(self, context: AnyDagsterContext):
    pass
connect
connect(context: AnyDagsterContext) -> BaseContext
Source code in src/dagster_ray/_base/resources.py
@retry(stop=stop_after_delay(120), retry=retry_if_exception_type(ConnectionError), reraise=True)
def connect(self, context: AnyDagsterContext) -> RayBaseContext:
    assert context.log is not None

    import ray

    init_options = _process_dagster_env_vars(self.ray_init_options.copy())

    # cleanup None values from runtime_env.env_vars since Ray doesn't like them

    if "runtime_env" in init_options and "env_vars" in init_options["runtime_env"]:
        init_options["runtime_env"]["env_vars"] = {
            k: v for k, v in init_options["runtime_env"]["env_vars"].items() if v is not None
        }

    init_options["runtime_env"] = init_options.get("runtime_env", {})
    init_options["runtime_env"]["env_vars"] = init_options["runtime_env"].get("env_vars", {})

    for var, value in self.get_env_vars_to_inject().items():
        init_options["runtime_env"]["env_vars"][var] = value

    self.data_execution_options.apply()

    self._context = ray.init(
        address=self.ray_address,
        **init_options,
    )
    self.data_execution_options.apply()
    self.data_execution_options.apply_remote()
    context.log.info("Initialized Ray in client mode!")
    return cast("RayBaseContext", self._context)
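Before calling ray.init(), connect() normalizes runtime_env.env_vars: it drops None values (the source comment notes Ray does not handle them), makes sure the nested dictionaries exist, and then writes the variables from get_env_vars_to_inject() on top, so injected values win over user-supplied ones. The sketch below reproduces those steps in isolation. build_init_options is a hypothetical name, Dagster EnvVar resolution is omitted, and unlike the listing, which uses a shallow copy(), it deep-copies the input so nested dictionaries in the resource configuration are not modified.

```python
import copy
from typing import Any, Dict


def build_init_options(
    ray_init_options: Dict[str, Any], injected_env_vars: Dict[str, str]
) -> Dict[str, Any]:
    """Assemble ray.init() keyword arguments the way connect() does (sketch)."""
    # Deep copy so nested dicts in the resource configuration are never mutated.
    options = copy.deepcopy(ray_init_options)
    runtime_env = options.get("runtime_env") or {}
    # Drop None values from runtime_env.env_vars.
    env_vars = {
        k: v for k, v in (runtime_env.get("env_vars") or {}).items() if v is not None
    }
    # Variables injected from the resource's flags override user-supplied ones.
    env_vars.update(injected_env_vars)
    runtime_env["env_vars"] = env_vars
    options["runtime_env"] = runtime_env
    return options


user_options = {"num_cpus": 2, "runtime_env": {"env_vars": {"A": "1", "B": None}}}
print(build_init_options(user_options, {"RAY_PROFILING": "1"}))
# {'num_cpus': 2, 'runtime_env': {'env_vars': {'A': '1', 'RAY_PROFILING': '1'}}}
```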
delete
delete(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def delete(self, context: AnyDagsterContext):
    pass
cleanup
cleanup(context: AnyDagsterContext, exception: BaseException | None)
Source code in src/dagster_ray/_base/resources.py
def cleanup(self, context: AnyDagsterContext, exception: BaseException | None):  # noqa: UP007
    assert context.log is not None

    if self.lifecycle.cleanup == "never":
        to_delete = False
    elif not self.created:
        to_delete = False
    elif self.lifecycle.cleanup == "always":
        to_delete = True
    elif self.lifecycle.cleanup == "on_exception":
        to_delete = exception is not None
    else:
        to_delete = False

    if to_delete:
        self.delete(context)
        context.log.info(f'Deleted {self.display_name} according to cleanup policy "{self.lifecycle.cleanup}"')

    if self.connected and hasattr(self, "_context") and self._context is not None:
        self._context.disconnect()
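cleanup() first rules out the never policy and resources that were never created, and only then applies the configured policy. The standalone function below reproduces that branch order so the outcome for each combination can be checked; should_delete is a hypothetical name, not a dagster-ray function.

```python
from typing import Literal, Optional

CleanupPolicy = Literal["never", "always", "on_exception"]


def should_delete(
    policy: CleanupPolicy, created: bool, exception: Optional[BaseException]
) -> bool:
    """Mirror the branch order of cleanup(): decide whether delete() would run."""
    if policy == "never" or not created:
        return False
    if policy == "always":
        return True
    if policy == "on_exception":
        return exception is not None
    return False


print(should_delete("on_exception", created=True, exception=None))  # False
print(should_delete("on_exception", created=True, exception=RuntimeError("boom")))  # True
```

Independently of this decision, cleanup() still disconnects the Ray client whenever the resource is connected.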
get_dagster_tags
get_dagster_tags(context: AnyDagsterContext) -> dict[str, str]
Source code in src/dagster_ray/_base/resources.py
def get_dagster_tags(self, context: AnyDagsterContext) -> dict[str, str]:
    tags = get_dagster_tags(context)
    return tags
get_env_vars_to_inject
get_env_vars_to_inject() -> dict[str, str]
Source code in src/dagster_ray/_base/resources.py
def get_env_vars_to_inject(self) -> dict[str, str]:
    vars: dict[str, str] = self.env_vars or {}
    if self.enable_debug_post_mortem:
        vars["RAY_DEBUG_POST_MORTEM"] = "1"
    if self.enable_tracing:
        vars["RAY_PROFILING"] = "1"
        vars["RAY_task_events_report_interval_ms"] = "0"
    if self.enable_actor_task_logging:
        vars["RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING"] = "1"
    if self.enable_legacy_debugger:
        vars["RAY_DEBUG"] = "legacy"
    return vars
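The debugging and tracing flags map to environment variables as shown above. The standalone function below reproduces that mapping. It is a sketch with a hypothetical name and keyword arguments standing in for the resource fields. Unlike the listing, which writes into self.env_vars directly when that field is set, it copies the user-provided mapping first, so repeated calls leave the caller's dictionary unchanged.

```python
from typing import Dict, Optional


def env_vars_to_inject(
    env_vars: Optional[Dict[str, str]] = None,
    *,
    enable_debug_post_mortem: bool = False,
    enable_tracing: bool = False,
    enable_actor_task_logging: bool = False,
    enable_legacy_debugger: bool = False,
) -> Dict[str, str]:
    # Start from a copy so the caller's env_vars mapping is left untouched.
    result = dict(env_vars or {})
    if enable_debug_post_mortem:
        result["RAY_DEBUG_POST_MORTEM"] = "1"
    if enable_tracing:
        result["RAY_PROFILING"] = "1"
        result["RAY_task_events_report_interval_ms"] = "0"
    if enable_actor_task_logging:
        result["RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING"] = "1"
    if enable_legacy_debugger:
        result["RAY_DEBUG"] = "legacy"
    return result


print(env_vars_to_inject({"FOO": "bar"}, enable_tracing=True))
# {'FOO': 'bar', 'RAY_PROFILING': '1', 'RAY_task_events_report_interval_ms': '0'}
```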

LocalRay can be used to connect to a local Ray cluster.

dagster_ray.core.resources.LocalRay pydantic-model

Bases: RayResource

A dummy resource for testing and local development. It provides the same interface as the other Ray resources.


Attributes

host property
host: str
ray_address property
ray_address: None

Run Launcher

dagster_ray.core.run_launcher.RayRunLauncher

RayRunLauncher(
    address: str,
    metadata: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    cookies: dict[str, Any] | None = None,
    env_vars: list[str] | None = None,
    runtime_env: dict[str, Any] | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    memory: int | None = None,
    resources: dict[str, float] | None = None,
    inst_data: ConfigurableClassData | None = None,
)

Bases: RunLauncher, ConfigurableClass

RunLauncher that submits Dagster runs as isolated Ray jobs to a Ray cluster.

Configuration is provided via dagster.yaml, and individual runs can override settings with the dagster-ray/config tag.

Example

Configure via dagster.yaml

run_launcher:
  module: dagster_ray
  class: RayRunLauncher
  config:
    address: "ray://head-node:10001"
    num_cpus: 2
    num_gpus: 0

Example

Override settings per job

import dagster as dg

@dg.op
def my_op():
    ...

@dg.job(
    tags={
        "dagster-ray/config": {
            "num_cpus": 16,
            "num_gpus": 1,
            "runtime_env": {"pip": {"packages": ["torch"]}},
        }
    }
)
def my_job():
    my_op()

Source code in src/dagster_ray/core/run_launcher.py
def __init__(
    self,
    address: str,
    metadata: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    cookies: dict[str, Any] | None = None,
    env_vars: list[str] | None = None,
    runtime_env: dict[str, Any] | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    memory: int | None = None,
    resources: dict[str, float] | None = None,
    inst_data: ConfigurableClassData | None = None,
):
    self._inst_data = dg._check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

    self.address = address
    self.metadata = metadata
    self.headers = headers
    self.cookies = cookies
    self.env_vars = env_vars
    self.runtime_env = runtime_env
    self.num_cpus = num_cpus
    self.num_gpus = num_gpus
    self.memory = memory
    self.resources = resources

    super().__init__()
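Conceptually, a per-run override overlays the dagster-ray/config tag on the launcher's dagster.yaml settings. The helper below is hypothetical and assumes a shallow, key-by-key merge in which tag values win. The launcher's actual merge logic is not shown on this page, so nested values such as runtime_env may be combined differently. It also accepts the tag as a JSON string, since Dagster JSON-encodes non-string tag values.

```python
import json
from typing import Any, Dict, Mapping, Union

TAG_KEY = "dagster-ray/config"


def resolve_run_config(
    launcher_defaults: Mapping[str, Any],
    run_tags: Mapping[str, Union[str, Dict[str, Any]]],
) -> Dict[str, Any]:
    """Hypothetical helper: overlay a run's dagster-ray/config tag on launcher defaults."""
    resolved = dict(launcher_defaults)
    raw = run_tags.get(TAG_KEY)
    if raw is None:
        return resolved
    # Non-string tag values are stored JSON-encoded, so decode strings.
    overrides = json.loads(raw) if isinstance(raw, str) else dict(raw)
    resolved.update(overrides)  # assumption: tag keys take precedence, key by key
    return resolved


defaults = {"address": "ray://head-node:10001", "num_cpus": 2, "num_gpus": 0}
tags = {TAG_KEY: json.dumps({"num_cpus": 16, "num_gpus": 1})}
print(resolve_run_config(defaults, tags))
# {'address': 'ray://head-node:10001', 'num_cpus': 16, 'num_gpus': 1}
```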



Executor

dagster_ray.core.executor.ray_executor

ray_executor(init_context: InitExecutorContext) -> Executor

Executes steps by submitting them as Ray jobs.

The steps are started inside the Ray cluster directly. When used together with RayRunLauncher, the executor can inherit the job submission client configuration. This behavior can be disabled by setting inherit_job_submission_client_from_ray_run_launcher to False.

Example

Use ray_executor for the entire code location

import dagster as dg
from dagster_ray import ray_executor

# The executor reads its Ray settings from the "ray" config key
configured_ray_executor = ray_executor.configured(
    {"ray": {"address": dg.EnvVar("RAY_ADDRESS"), "runtime_env": {"pip": ["polars"]}}}
)

defs = dg.Definitions(..., executor=configured_ray_executor)

Example

Override configuration for a specific asset

import dagster as dg

@dg.asset(
    op_tags={"dagster-ray/config": {"num_cpus": 2}}
)
def my_asset(): ...

Source code in src/dagster_ray/core/executor.py
@dg.executor(
    name="ray",
    config_schema=_RAY_EXECUTOR_CONFIG_SCHEMA,
    requirements=multiple_process_executor_requirements(),
)
def ray_executor(init_context: InitExecutorContext) -> Executor:
    """Executes steps by submitting them as Ray jobs.

    The steps are started inside the Ray cluster directly.
    When used together with [`RayRunLauncher`][dagster_ray.core.run_launcher.RayRunLauncher], the executor can inherit the job submission client configuration.
    This behavior can be disabled by setting `inherit_job_submission_client_from_ray_run_launcher` to `False`.

    Example:
        Use `ray_executor` for the entire code location
        ```python
        import dagster as dg
        from dagster_ray import ray_executor

        ray_executor = ray_executor.configured(
            {"address": EnvVar("RAY_ADDRESS"), "runtime_env": {"pip": ["polars"]}}
        )

        defs = dg.Definitions(..., executor=ray_executor])
        ```

    Example:
        Override configuration for a specific asset
        ```python
        import dagster as dg

        @dg.asset(
            op_tags={"dagster-ray/config": {"num_cpus": 2}}
        )
        def my_asset(): ...
        ```
    """
    from ray.job_submission import JobSubmissionClient

    exc_cfg = init_context.executor_config
    ray_cfg = RayExecutorConfig(**exc_cfg["ray"])  # type: ignore

    if ray_cfg.inherit_job_submission_client_from_ray_run_launcher and isinstance(
        init_context.instance.run_launcher, RayRunLauncher
    ):
        # TODO: some RunLauncher config values can be automatically passed to the executor
        client = init_context.instance.run_launcher.client
    else:
        client = JobSubmissionClient(
            ray_cfg.address, metadata=ray_cfg.metadata, headers=ray_cfg.headers, cookies=ray_cfg.cookies
        )

    return StepDelegatingExecutor(
        RayStepHandler(
            client=client,
            env_vars=ray_cfg.env_vars,
            runtime_env=ray_cfg.runtime_env,
            num_cpus=ray_cfg.num_cpus,
            num_gpus=ray_cfg.num_gpus,
            memory=ray_cfg.memory,
            resources=ray_cfg.resources,
        ),
        retries=RetryMode.from_config(exc_cfg["retries"]),  # type: ignore
        max_concurrent=dg._check.opt_int_elem(exc_cfg, "max_concurrent"),
        tag_concurrency_limits=dg._check.opt_list_elem(exc_cfg, "tag_concurrency_limits"),
        should_verify_step=True,
    )

Pipes

Run external Ray scripts as Ray jobs while streaming back logs and metadata into Dagster with Dagster Pipes.

dagster_ray.core.pipes.PipesRayJobClient

PipesRayJobClient(
    client: JobSubmissionClient,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
)

Bases: PipesClient, TreatAsResourceParam

A Pipes client for running Ray jobs on remote clusters.

Starts the job directly on the Ray cluster and reads the logs from the job.

Parameters:

  • client (JobSubmissionClient) –

    The Ray job submission client

  • context_injector (Optional[PipesContextInjector], default: None ) –

    A context injector to use to inject context into the Ray job. Defaults to PipesEnvContextInjector.

  • message_reader (Optional[PipesMessageReader], default: None ) –

    A message reader to use to read messages from the Ray job. Defaults to PipesRayJobMessageReader.

  • forward_termination (bool, default: True ) –

    Whether to stop the Ray job when the Dagster process receives a termination signal.

  • timeout (int, default: 600 ) –

    Timeout for various internal interactions with the Ray job.

  • poll_interval (int, default: 1 ) –

    Interval at which to poll the Ray cluster for job status updates. This is useful to adjust when running in a local environment.

Source code in src/dagster_ray/core/pipes.py
def __init__(
    self,
    client: JobSubmissionClient,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
):
    self.client = client
    self._context_injector = context_injector or PipesEnvContextInjector()
    self._message_reader = message_reader or PipesRayJobMessageReader()

    self.forward_termination = check.bool_param(forward_termination, "forward_termination")
    self.timeout = check.int_param(timeout, "timeout")
    self.poll_interval = check.int_param(poll_interval, "poll_interval")

    self._job_submission_client: JobSubmissionClient | None = None

Functions

run
run(
    *, context: OpOrAssetExecutionContext, submit_job_params: SubmitJobParams, extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation

Execute a Ray job, enriched with the Pipes protocol.

Parameters:

  • context (OpOrAssetExecutionContext) –

    Current Dagster op or asset context.

  • submit_job_params (SubmitJobParams) –

    Keyword arguments for JobSubmissionClient.submit_job, such as entrypoint and runtime_env.

  • extras (Optional[Dict[str, Any]], default: None ) –

    Additional information to pass to the Pipes session.

Source code in src/dagster_ray/core/pipes.py
def run(  # type: ignore
    self,
    *,
    context: OpOrAssetExecutionContext,
    submit_job_params: SubmitJobParams,
    extras: PipesExtras | None = None,
) -> PipesClientCompletedInvocation:
    """
    Execute a RayJob, enriched with the Pipes protocol.

    Args:
        context (OpExecutionContext): Current Dagster op or asset context.
        submit_job_params (Dict[str, Any]): RayJob specification. `API reference <https://ray-project.github.io/kuberay/reference/api/#rayjob>`_.
        extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session.
    """

    with open_pipes_session(
        context=context,
        message_reader=self._message_reader,
        context_injector=self._context_injector,
        extras=extras,
    ) as session:
        enriched_submit_job_params = self._enrich_submit_job_params(context, session, submit_job_params)

        job_id = self._start(context, session, enriched_submit_job_params)

        try:
            # self._read_messages(context, job_id)
            self._wait_for_completion(context, job_id)
            return PipesClientCompletedInvocation(session, metadata={"Ray Job ID": job_id})

        except DagsterExecutionInterruptedError:
            if self.forward_termination:
                context.log.warning(f"[pipes] Dagster process interrupted! Will terminate RayJob {job_id}.")
                self._terminate(context, job_id)
            raise
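A client like this has to poll the Ray job until it reaches a terminal state. The loop below is a generic sketch, not the client's _wait_for_completion implementation, which is not shown on this page. wait_for_job is a hypothetical helper, get_status is any callable returning a status string, and applying the timeout to the whole wait is an assumption. The status names match the values of Ray's JobStatus enum. With a real JobSubmissionClient, get_status could wrap client.get_job_status(job_id). The sleep and clock parameters exist only so the loop can be tested without waiting.

```python
import time
from typing import Callable

# Terminal values of Ray's JobStatus (PENDING and RUNNING are non-terminal).
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(
    get_status: Callable[[], str],
    timeout: float = 600,
    poll_interval: float = 1,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status() until a terminal status is seen or timeout expires."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout}s")
        sleep(poll_interval)


statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
print(wait_for_job(lambda: next(statuses), sleep=lambda s: None))  # SUCCEEDED
```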

dagster_ray.core.pipes.PipesRayJobMessageReader

PipesRayJobMessageReader(job_submission_client_kwargs: dict[str, Any] | None = None)

Bases: PipesMessageReader

Source code in src/dagster_ray/core/pipes.py
def __init__(self, job_submission_client_kwargs: dict[str, Any] | None = None):
    self._job_submission_client_kwargs = job_submission_client_kwargs
    self._thread: threading.Thread | None = None
    self.session_closed = threading.Event()
    self._job_id = None
    self._client = None
    self.thread_ready = threading.Event()

    self.completed = threading.Event()



IO Manager

Send data between Dagster steps while they are running inside a Ray cluster.

dagster_ray.core.io_manager.RayIOManager

Bases: ConfigurableIOManager

IO Manager that stores intermediate values in Ray's object store.

The RayIOManager allows storing and retrieving intermediate values in Ray's distributed object store, making it ideal for use with RayRunLauncher and ray_executor. It works by storing Dagster step keys in a global Ray actor that maintains a mapping between step keys and Ray ObjectRefs.

Attributes:

  • address (str | None) –

    Ray cluster address. If provided, the IO manager initializes a Ray connection to it. If None, Ray is assumed to be already initialized.

Example

Basic usage

import dagster as dg
from dagster_ray import RayIOManager

@dg.asset(io_manager_key="ray_io_manager")
def upstream() -> int:
    return 42

@dg.asset
def downstream(upstream: int):
    return upstream * 2

definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={"ray_io_manager": RayIOManager()}
)

Example

With Ray cluster address

ray_io_manager = RayIOManager(address="ray://head-node:10001")

Info
  • Works with picklable Python objects
  • Supports partitioned assets and partition mappings
  • Uses Ray's automatic object movement for fault tolerance
  • Objects are stored with the Ray actor as owner for lifecycle management
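According to the description above, the manager keeps a mapping from step keys to ObjectRefs inside a global Ray actor. The class below is a plain-Python stand-in for that registry and is only meant to picture the bookkeeping. InMemoryRefRegistry is hypothetical, it stores values directly instead of ObjectRefs, and keying by (step key, output name, partition key) is an assumption about how partitioned outputs could be kept apart, not the manager's documented key format.

```python
from typing import Any, Dict, Optional, Tuple

# (step_key, output_name, partition_key); the key shape is an assumption.
Key = Tuple[str, str, Optional[str]]


class InMemoryRefRegistry:
    """Stand-in for a global registry actor (hypothetical; no Ray involved)."""

    def __init__(self) -> None:
        self._store: Dict[Key, Any] = {}

    def put(self, step_key: str, output_name: str, value: Any,
            partition_key: Optional[str] = None) -> None:
        # With Ray, value would be an ObjectRef from ray.put(); here it is the object itself.
        self._store[(step_key, output_name, partition_key)] = value

    def get(self, step_key: str, output_name: str,
            partition_key: Optional[str] = None) -> Any:
        try:
            return self._store[(step_key, output_name, partition_key)]
        except KeyError:
            raise KeyError(
                f"no stored output for {step_key}.{output_name} (partition={partition_key})"
            ) from None


registry = InMemoryRefRegistry()
registry.put("upstream", "result", 42)  # "result" is Dagster's default output name
print(registry.get("upstream", "result") * 2)  # 84
```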