Skip to content

Core API Reference

Core dagster-ray APIs for using external Ray clusters. Learn how to use it here.


Misc

LocalRay

Bases: BaseRayResource

A dummy resource, useful for testing and local development. It provides the same interface as the actual Ray resources.

Attributes

host property

host: str

ray_address property

ray_address: None

Run Launcher

RayRunLauncher

RayRunLauncher(
    address: str,
    metadata: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    cookies: dict[str, Any] | None = None,
    env_vars: list[str] | None = None,
    runtime_env: dict[str, Any] | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    memory: int | None = None,
    resources: dict[str, float] | None = None,
    inst_data: ConfigurableClassData | None = None,
)

Bases: RunLauncher, ConfigurableClass

RunLauncher that submits Dagster runs as isolated Ray jobs to a Ray cluster.

Configuration can be provided via dagster.yaml and individual runs can override settings using the dagster-ray/config tag.

Example

Configure via dagster.yaml

# Use RayRunLauncher to launch every Dagster run as a Ray job
run_launcher:
  module: "dagster_ray"
  class: "RayRunLauncher"
  config:
    # Address of the Ray cluster
    address: "ray://head-node:10001"
    # Default resources requested for each run
    num_cpus: 2
    num_gpus: 0

Example

Override settings per job

import dagster as dg

# Per-job overrides, passed through the "dagster-ray/config" tag.
ray_overrides = {
    "num_cpus": 16,
    "num_gpus": 1,
    "runtime_env": {"pip": {"packages": ["torch"]}},
}


@dg.job(tags={"dagster-ray/config": ray_overrides})
def my_job():
    return my_op()

Source code in src/dagster_ray/run_launcher.py
def __init__(
    self,
    address: str,
    metadata: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    cookies: dict[str, Any] | None = None,
    env_vars: list[str] | None = None,
    runtime_env: dict[str, Any] | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    memory: int | None = None,
    resources: dict[str, float] | None = None,
    inst_data: ConfigurableClassData | None = None,
):
    """Store the launcher configuration.

    Args:
        address: Address of the Ray cluster.
        metadata: Optional metadata; presumably forwarded to the Ray job submission client.
        headers: Optional HTTP headers; presumably forwarded to the Ray job submission client.
        cookies: Optional cookies; presumably forwarded to the Ray job submission client.
        env_vars: Names of environment variables for launched runs.
        runtime_env: Ray runtime environment for launched runs.
        num_cpus: CPUs requested per run.
        num_gpus: GPUs requested per run.
        memory: Memory requested per run.
        resources: Custom Ray resources requested per run.
        inst_data: Configurable-class data; validated to be ``ConfigurableClassData`` or ``None``.
    """
    self._inst_data = dg._check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

    # Connection settings.
    self.address, self.metadata, self.headers, self.cookies = address, metadata, headers, cookies
    # Default job settings; individual runs may override them via the "dagster-ray/config" tag.
    self.env_vars, self.runtime_env = env_vars, runtime_env
    self.num_cpus, self.num_gpus, self.memory, self.resources = num_cpus, num_gpus, memory, resources

    super().__init__()

Functions


Executor

ray_executor

ray_executor(init_context: InitExecutorContext) -> Executor

Executes steps by submitting them as Ray jobs.

The steps are started inside the Ray cluster directly. When used together with the RayRunLauncher, the executor can inherit the job submission client configuration. This behavior can be disabled by setting inherit_job_submission_client_from_ray_run_launcher to False.

Example

Use ray_executor for the entire code location

import dagster as dg
from dagster_ray import ray_executor

# Ray-specific executor settings live under the "ray" key of the executor config.
ray_executor = ray_executor.configured(
    {"ray": {"address": dg.EnvVar("RAY_ADDRESS"), "runtime_env": {"pip": ["polars"]}}}
)

defs = dg.Definitions(..., executor=ray_executor)

Example

Override configuration for a specific asset

import dagster as dg

# Request 2 CPUs from Ray for this asset's step.
RAY_OP_TAGS = {"dagster-ray/config": {"num_cpus": 2}}


@dg.asset(op_tags=RAY_OP_TAGS)
def my_asset(): ...

Source code in src/dagster_ray/executor.py
@dg.executor(
    name="ray",
    config_schema=_RAY_EXECUTOR_CONFIG_SCHEMA,
    requirements=multiple_process_executor_requirements(),
)
def ray_executor(init_context: InitExecutorContext) -> Executor:
    """Executes steps by submitting them as Ray jobs.

    The steps are started inside the Ray cluster directly.
    When used together with the `RayRunLauncher`, the executor can inherit the job submission client configuration.
    This behavior can be disabled by setting `inherit_job_submission_client_from_ray_run_launcher` to `False`.

    Ray-specific settings are read from the `ray` section of the executor config, while
    `retries`, `max_concurrent` and `tag_concurrency_limits` are read from its top level.

    Example:
        Use `ray_executor` for the entire code location
        ```python
        import dagster as dg
        from dagster_ray import ray_executor

        ray_executor = ray_executor.configured(
            {"ray": {"address": dg.EnvVar("RAY_ADDRESS"), "runtime_env": {"pip": ["polars"]}}}
        )

        defs = dg.Definitions(..., executor=ray_executor)
        ```

    Example:
        Override configuration for a specific asset
        ```python
        import dagster as dg

        @dg.asset(
            op_tags={"dagster-ray/config": {"num_cpus": 2}}
        )
        def my_asset(): ...
        ```
    """
    from ray.job_submission import JobSubmissionClient

    exc_cfg = init_context.executor_config
    ray_cfg = RayExecutorConfig(**exc_cfg["ray"])  # type: ignore

    if ray_cfg.inherit_job_submission_client_from_ray_run_launcher and isinstance(
        init_context.instance.run_launcher, RayRunLauncher
    ):
        # Reuse the job submission client already configured on the run launcher.
        # TODO: some RunLauncher config values can be automatically passed to the executor
        client = init_context.instance.run_launcher.client
    else:
        client = JobSubmissionClient(
            ray_cfg.address, metadata=ray_cfg.metadata, headers=ray_cfg.headers, cookies=ray_cfg.cookies
        )

    return StepDelegatingExecutor(
        RayStepHandler(
            client=client,
            env_vars=ray_cfg.env_vars,
            runtime_env=ray_cfg.runtime_env,
            num_cpus=ray_cfg.num_cpus,
            num_gpus=ray_cfg.num_gpus,
            memory=ray_cfg.memory,
            resources=ray_cfg.resources,
        ),
        retries=RetryMode.from_config(exc_cfg["retries"]),  # type: ignore
        max_concurrent=dg._check.opt_int_elem(exc_cfg, "max_concurrent"),
        tag_concurrency_limits=dg._check.opt_list_elem(exc_cfg, "tag_concurrency_limits"),
        should_verify_step=True,
    )

Pipes

Run external Ray scripts as Ray jobs while streaming back logs and metadata into Dagster.

PipesRayJobClient

PipesRayJobClient(
    client: JobSubmissionClient,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
)

Bases: PipesClient, TreatAsResourceParam

A Pipes client for running Ray jobs on remote clusters.

Starts the job directly on the Ray cluster and reads the logs from the job.

Parameters:

Name Type Description Default
client JobSubmissionClient

The Ray job submission client

required
context_injector Optional[PipesContextInjector]

A context injector to use to inject context into the Ray job. Defaults to PipesEnvContextInjector.

None
message_reader Optional[PipesMessageReader]

A message reader to use to read messages from the Ray job. Defaults to PipesRayJobMessageReader.

None
forward_termination bool

Whether to terminate the Ray job when the Dagster process receives a termination signal.

True
timeout float

Timeout for various internal interactions with the Ray job.

600
poll_interval float

Interval at which to poll for Ray job status updates.

1
Source code in src/dagster_ray/pipes.py
def __init__(
    self,
    client: JobSubmissionClient,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
):
    """Create a Pipes client that runs jobs through a Ray ``JobSubmissionClient``.

    Args:
        client: The Ray job submission client.
        context_injector: Injector for the Pipes context. Defaults to ``PipesEnvContextInjector``.
        message_reader: Reader for Pipes messages. Defaults to ``PipesRayJobMessageReader``.
        forward_termination: Whether to terminate the Ray job when the Dagster process is interrupted.
        timeout: Timeout (presumably seconds) for internal interactions with the Ray job. Int or float.
        poll_interval: Interval (presumably seconds) between status polls. Int or float.
    """
    self.client = client
    self._context_injector = context_injector or PipesEnvContextInjector()
    self._message_reader = message_reader or PipesRayJobMessageReader()

    self.forward_termination = check.bool_param(forward_termination, "forward_termination")
    # Both parameters are annotated as float; int_param would reject e.g. poll_interval=0.5.
    self.timeout = check.numeric_param(timeout, "timeout")
    self.poll_interval = check.numeric_param(poll_interval, "poll_interval")

    self._job_submission_client: JobSubmissionClient | None = None

Functions

run

run(
    *, context: OpOrAssetExecutionContext, submit_job_params: SubmitJobParams, extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation

Execute a Ray job, enriched with the Pipes protocol.

Parameters:

Name Type Description Default
context OpOrAssetExecutionContext

Current Dagster op or asset context.

required
submit_job_params SubmitJobParams

Parameters for the Ray job submission. They are enriched with Pipes session information before the job is started.

required
extras Optional[Dict[str, Any]]

Additional information to pass to the Pipes session.

None
Source code in src/dagster_ray/pipes.py
def run(  # type: ignore
    self,
    *,
    context: OpOrAssetExecutionContext,
    submit_job_params: SubmitJobParams,
    extras: PipesExtras | None = None,
) -> PipesClientCompletedInvocation:
    """
    Execute a Ray job, enriched with the Pipes protocol.

    Opens a Pipes session, enriches ``submit_job_params`` with the session's information,
    submits the job, and blocks until it completes.

    Args:
        context (OpOrAssetExecutionContext): Current Dagster op or asset context.
        submit_job_params (SubmitJobParams): Parameters for the Ray job submission. They are
            enriched with Pipes session information before the job is started.
        extras (Optional[PipesExtras]): Additional information to pass to the Pipes session.

    Returns:
        PipesClientCompletedInvocation: The completed invocation, carrying the Ray job ID as
        ``"Ray Job ID"`` metadata.

    Raises:
        DagsterExecutionInterruptedError: Re-raised when the Dagster process is interrupted while
            waiting. If ``forward_termination`` is set, the Ray job is terminated first.
    """

    with open_pipes_session(
        context=context,
        message_reader=self._message_reader,
        context_injector=self._context_injector,
        extras=extras,
    ) as session:
        enriched_submit_job_params = self._enrich_submit_job_params(context, session, submit_job_params)

        job_id = self._start(context, session, enriched_submit_job_params)

        try:
            self._wait_for_completion(context, job_id)
            return PipesClientCompletedInvocation(session, metadata={"Ray Job ID": job_id})

        except DagsterExecutionInterruptedError:
            if self.forward_termination:
                context.log.warning(f"[pipes] Dagster process interrupted! Will terminate RayJob {job_id}.")
                self._terminate(context, job_id)
            raise

PipesRayJobMessageReader

PipesRayJobMessageReader(job_submission_client_kwargs: dict[str, Any] | None = None)

Bases: PipesMessageReader

Source code in src/dagster_ray/pipes.py
def __init__(self, job_submission_client_kwargs: dict[str, Any] | None = None):
    self._job_submission_client_kwargs = job_submission_client_kwargs
    self._thread: threading.Thread | None = None
    self.session_closed = threading.Event()
    self._job_id = None
    self._client = None
    self.thread_ready = threading.Event()

    self.completed = threading.Event()

Functions


IO Manager

RayIOManager

Bases: ConfigurableIOManager

IO Manager that stores intermediate values in Ray's object store.

The RayIOManager allows storing and retrieving intermediate values in Ray's distributed object store, making it ideal for use with RayRunLauncher and ray_executor. It works by storing Dagster step keys in a global Ray actor that maintains a mapping between step keys and Ray ObjectRefs.

Parameters:

Name Type Description Default
address

Ray cluster address. If provided, will initialize Ray connection. If None, assumes Ray is already initialized.

None
Example

Basic usage

import dagster as dg
from dagster_ray import RayIOManager


# Output of this asset is handled by the "ray_io_manager" resource.
@dg.asset(io_manager_key="ray_io_manager")
def upstream() -> int:
    return 42


# Receives the value produced by `upstream`.
@dg.asset
def downstream(upstream: int):
    return upstream * 2


ray_resources = {"ray_io_manager": RayIOManager()}
definitions = dg.Definitions(assets=[upstream, downstream], resources=ray_resources)

Example

With Ray cluster address

RAY_HEAD_ADDRESS = "ray://head-node:10001"
ray_io_manager = RayIOManager(address=RAY_HEAD_ADDRESS)

Info
  • Works with any picklable Python objects
  • Supports partitioned assets and partition mappings
  • Uses Ray's automatic object movement for fault tolerance
  • Objects are stored with the Ray actor as owner for lifecycle management

Types

Lifecycle

Bases: Config

Attributes

create class-attribute instance-attribute

create: bool = Field(
    default=True,
    description="Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
)

wait class-attribute instance-attribute

wait: bool = Field(
    default=True,
    description="Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
)

connect class-attribute instance-attribute

connect: bool = Field(
    default=True,
    description="Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
)

cleanup class-attribute instance-attribute

cleanup: Literal["never", "always", "on_exception"] = Field(
    default="always",
    description="Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
)

AnyDagsterContext module-attribute

AnyDagsterContext: TypeAlias = Union[OpExecutionContext, AssetExecutionContext, InitResourceContext]

BaseRayResource

Bases: ConfigurableResource, ABC

Base class for Ray Resources providing a common interface for Ray cluster management.

This abstract base class defines the interface that all Ray resources must implement, providing a backend-agnostic way to interact with Ray clusters. Concrete implementations include LocalRay for local development and KubeRay resources for Kubernetes deployments.

The BaseRayResource handles the lifecycle of Ray clusters including creation, connection, and cleanup, with configurable policies for each stage.

Examples:

Use as a type annotation for backend-agnostic code:

import dagster as dg
from dagster_ray import RayResource


@dg.asset
def my_asset(ray_cluster: RayResource):
    # Backend-agnostic: any Ray resource implementation can be supplied here.
    import ray

    ref = ray.put("hello")
    return ray.get(ref)

Manual lifecycle management:

from dagster_ray import Lifecycle

# Neither create nor connect automatically; call `.create` / `.connect` yourself.
manual_lifecycle = Lifecycle(create=False, connect=False)
ray_resource = SomeRayResource(lifecycle=manual_lifecycle)

Note

This is an abstract class and cannot be instantiated directly. Use concrete implementations like LocalRay or KubeRayCluster instead.

Attributes

lifecycle class-attribute instance-attribute

lifecycle: Lifecycle = Field(default_factory=Lifecycle, description='Actions to perform during resource setup.')

timeout class-attribute instance-attribute

timeout: float = Field(default=600.0, description='Timeout for Ray readiness in seconds')

ray_init_options class-attribute instance-attribute

ray_init_options: dict[str, Any] = Field(
    default_factory=dict,
    description="Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
)

data_execution_options class-attribute instance-attribute

data_execution_options: RayDataExecutionOptions = Field(default_factory=RayDataExecutionOptions)

redis_port class-attribute instance-attribute

redis_port: int = Field(
    default=10001, description="Redis port for connection. Make sure to match with the actual available port."
)

dashboard_port class-attribute instance-attribute

dashboard_port: int = Field(
    default=8265, description="Dashboard port for connection. Make sure to match with the actual available port."
)

env_vars class-attribute instance-attribute

env_vars: dict[str, str] | None = Field(
    default_factory=dict, description="Environment variables to pass to the Ray cluster."
)

enable_tracing class-attribute instance-attribute

enable_tracing: bool = Field(
    default=False,
    description="Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline",
)

enable_actor_task_logging class-attribute instance-attribute

enable_actor_task_logging: bool = Field(
    default=False,
    description="Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
)

enable_debug_post_mortem class-attribute instance-attribute

enable_debug_post_mortem: bool = Field(
    default=False,
    description="Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html",
)

enable_legacy_debugger class-attribute instance-attribute

enable_legacy_debugger: bool = Field(
    default=False,
    description="Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger",
)

_context class-attribute instance-attribute

_context: BaseContext | None = PrivateAttr()

context property

context: BaseContext

host abstractmethod property

host: str

name abstractmethod property

name: str

display_name property

display_name: str

ray_address property

ray_address: str

dashboard_url property

dashboard_url: str

runtime_job_id property

runtime_job_id: str

Returns the Ray job ID of the current job, which was created by ray.init().

created property

created: bool

ready property

ready: bool

connected property

connected: bool

Functions

yield_for_execution

yield_for_execution(context: InitResourceContext) -> Generator[Self, None, None]
Source code in src/dagster_ray/_base/resources.py
@contextlib.contextmanager
def yield_for_execution(self, context: dg.InitResourceContext) -> Generator[Self, None, None]:
    """Run the enabled lifecycle steps, yield the resource, and always run cleanup.

    Steps are gated cumulatively: waiting happens only if creation is enabled, and
    connecting only if both creation and waiting are enabled. Any exception raised during
    setup or inside the ``with`` body is passed to ``cleanup`` and then re-raised.
    """
    caught = None
    try:
        lifecycle = self.lifecycle
        do_create = lifecycle.create
        do_wait = do_create and lifecycle.wait
        do_connect = do_wait and lifecycle.connect
        if do_create:
            self._create(context)
        if do_wait:
            self._wait(context)
        if do_connect:
            self._connect(context)
        yield self
    except BaseException as err:
        caught = err
        raise
    finally:
        self.cleanup(context, caught)

_create

_create(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def _create(self, context: AnyDagsterContext):
    """Create the resource via ``create`` unless ``created`` already reports True.

    Success is logged at info level. Any exception (any ``BaseException``) is logged
    and then re-raised.
    """
    assert context.log is not None
    if self.created:
        return
    try:
        self.create(context)
        context.log.info(f"Created {self.display_name}.")
    except BaseException:
        context.log.exception(f"Failed to create {self.display_name}")
        raise

_wait

_wait(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def _wait(self, context: AnyDagsterContext):
    """Ensure the resource is created, then wait for readiness unless already ready.

    Creation goes through ``_create``. When ``ready`` is False, a message including
    ``timeout`` is logged and ``wait`` is called. A failure is logged and re-raised.
    """
    assert context.log is not None
    self._create(context)
    if self.ready:
        return
    context.log.info(f"Waiting for {self.display_name} to become ready (timeout={self.timeout:.0f}s)...")
    try:
        self.wait(context)
    except BaseException:
        context.log.exception(f"Failed to wait for {self.display_name} readiness")
        raise

_connect

_connect(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def _connect(self, context: AnyDagsterContext):
    """Ensure the resource is created and ready, then connect unless already connected.

    A connection failure is logged and re-raised. Success is logged at info level.
    """
    assert context.log is not None
    self._wait(context)
    if self.connected:
        return
    try:
        self.connect(context)
    except BaseException:
        context.log.exception(f"Failed to connect to {self.display_name}")
        raise
    else:
        context.log.info(f"Initialized Ray Client with {self.display_name}")

create

create(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def create(self, context: AnyDagsterContext):
    """No-op in the base class. ``_create`` calls this, so concrete resources can override it."""

wait

wait(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def wait(self, context: AnyDagsterContext):
    """No-op in the base class. ``_wait`` calls this, so concrete resources can override it."""

connect

connect(context: AnyDagsterContext) -> BaseContext
Source code in src/dagster_ray/_base/resources.py
@retry(stop=stop_after_delay(120), retry=retry_if_exception_type(ConnectionError), reraise=True)
def connect(self, context: AnyDagsterContext) -> RayBaseContext:
    """Initialize Ray against ``self.ray_address`` and store the resulting context.

    ``ray_init_options`` go through ``_process_dagster_env_vars`` (presumably resolving
    Dagster ``EnvVar`` values). ``None`` values are dropped from ``runtime_env.env_vars``,
    because Ray rejects them. The variables from ``get_env_vars_to_inject`` are then merged in.
    New dicts are built for ``runtime_env`` and its ``env_vars``, so the resource's configured
    ``ray_init_options`` are never mutated. The call is retried on ``ConnectionError`` for up
    to 120 seconds.

    Returns:
        The context returned by ``ray.init``, which is also stored on ``self._context``.
    """
    assert context.log is not None

    import ray

    init_options = _process_dagster_env_vars(self.ray_init_options.copy())

    # ``.copy()`` above is shallow: writing into init_options["runtime_env"] in place would
    # modify self.ray_init_options. Build fresh nested dicts instead.
    runtime_env = dict(init_options.get("runtime_env") or {})
    # cleanup None values from runtime_env.env_vars since Ray doesn't like them
    env_vars = {k: v for k, v in (runtime_env.get("env_vars") or {}).items() if v is not None}
    env_vars.update(self.get_env_vars_to_inject())
    runtime_env["env_vars"] = env_vars
    init_options["runtime_env"] = runtime_env

    # NOTE(review): the execution options are applied both before and after ray.init;
    # presumably intentional — confirm before removing either call.
    self.data_execution_options.apply()

    self._context = ray.init(
        address=self.ray_address,
        **init_options,
    )
    self.data_execution_options.apply()
    self.data_execution_options.apply_remote()
    context.log.info("Initialized Ray in client mode!")
    return cast("RayBaseContext", self._context)

delete

delete(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
def delete(self, context: AnyDagsterContext):
    """No-op in the base class. ``cleanup`` calls this when its policy decides to delete."""

cleanup

cleanup(context: AnyDagsterContext, exception: Optional[BaseException])
Source code in src/dagster_ray/_base/resources.py
def cleanup(self, context: AnyDagsterContext, exception: Optional[BaseException]):  # noqa: UP007
    """Apply the lifecycle cleanup policy, then disconnect the Ray client if connected.

    The resource is deleted only if it was created and either the policy is ``"always"``,
    or the policy is ``"on_exception"`` and ``exception`` is not ``None``. ``"never"``
    (and any unrecognized policy) leaves the resource in place.

    Args:
        context: Dagster context whose logger records the deletion.
        exception: The exception that ended execution, or ``None`` on success.
    """
    assert context.log is not None

    policy = self.lifecycle.cleanup
    should_delete = False
    if policy != "never" and self.created:
        if policy == "always":
            should_delete = True
        elif policy == "on_exception":
            should_delete = exception is not None

    if should_delete:
        self.delete(context)
        context.log.info(f'Deleted {self.display_name} according to cleanup policy "{policy}"')

    if self.connected:
        ray_context = getattr(self, "_context", None)
        if ray_context is not None:
            ray_context.disconnect()

get_dagster_tags

get_dagster_tags(context: AnyDagsterContext) -> dict[str, str]
Source code in src/dagster_ray/_base/resources.py
def get_dagster_tags(self, context: AnyDagsterContext) -> dict[str, str]:
    """Return the tags produced by the module-level ``get_dagster_tags`` helper for ``context``."""
    return get_dagster_tags(context)

get_env_vars_to_inject

get_env_vars_to_inject() -> dict[str, str]
Source code in src/dagster_ray/_base/resources.py
def get_env_vars_to_inject(self) -> dict[str, str]:
    """Build the environment variables to inject into the Ray cluster.

    Starts from a copy of ``env_vars`` and adds the variables enabled by the
    debugging/tracing flags. The copy matters: the previous ``self.env_vars or {}``
    aliased the resource's own dict, so each call permanently wrote the flag
    variables into the resource configuration.

    Returns:
        A new dict mapping environment variable names to values.
    """
    env: dict[str, str] = dict(self.env_vars or {})
    if self.enable_debug_post_mortem:
        env["RAY_DEBUG_POST_MORTEM"] = "1"
    if self.enable_tracing:
        env["RAY_PROFILING"] = "1"
        env["RAY_task_events_report_interval_ms"] = "0"
    if self.enable_actor_task_logging:
        env["RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING"] = "1"
    if self.enable_legacy_debugger:
        env["RAY_DEBUG"] = "legacy"
    return env