Core API Reference¶
Core dagster-ray APIs for using external Ray clusters. Learn how to use it here.
Misc¶
LocalRay ¶
Bases: BaseRayResource
Dummy resource, useful for testing and local development. Provides the same interface as the actual resources.
Run Launcher¶
RayRunLauncher ¶
RayRunLauncher(
    address: str,
    metadata: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    cookies: dict[str, Any] | None = None,
    env_vars: list[str] | None = None,
    runtime_env: dict[str, Any] | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    memory: int | None = None,
    resources: dict[str, float] | None = None,
    inst_data: ConfigurableClassData | None = None,
)
Bases: RunLauncher, ConfigurableClass
RunLauncher that submits Dagster runs as isolated Ray jobs to a Ray cluster.
Configuration can be provided via dagster.yaml, and individual runs can override settings using the dagster-ray/config tag.
Example
Configure via dagster.yaml
Example
Override settings per job
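As a rough illustration of the per-run override described above, the sketch below merges launcher-level defaults with settings read from the dagster-ray/config tag. It uses only the standard library. The helper `resolve_run_config`, the default values, the JSON encoding of the tag value, and the shallow-merge precedence are all assumptions made for illustration, not the launcher's actual implementation.

```python
import json

# Hypothetical launcher-level defaults, e.g. as they might be set in dagster.yaml.
LAUNCHER_DEFAULTS = {"num_cpus": 2, "num_gpus": 0, "memory": None}

CONFIG_TAG = "dagster-ray/config"


def resolve_run_config(defaults: dict, run_tags: dict) -> dict:
    """Return the defaults updated with any overrides found in the run's tags.

    Assumption: the tag value is a JSON-encoded mapping, and overrides are
    applied as a shallow merge on top of the launcher defaults.
    """
    raw = run_tags.get(CONFIG_TAG)
    overrides = json.loads(raw) if raw else {}
    return {**defaults, **overrides}


# A run that requests more CPUs and one GPU than the launcher default.
run_tags = {CONFIG_TAG: json.dumps({"num_cpus": 16, "num_gpus": 1})}
resolved = resolve_run_config(LAUNCHER_DEFAULTS, run_tags)
```

Settings the run does not mention (here `memory`) keep their launcher-level value in this sketch.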
Source code in src/dagster_ray/run_launcher.py
Functions¶
Executor¶
ray_executor ¶
Executes steps by submitting them as Ray jobs.
The steps are started inside the Ray cluster directly.
When used together with the RayRunLauncher, the executor can inherit the job submission client configuration.
This behavior can be disabled by setting inherit_job_submission_client_from_ray_run_launcher to False.
Example
Use ray_executor for the entire code location
Example
Override configuration for a specific asset
Source code in src/dagster_ray/executor.py
Pipes¶
Run external Ray scripts as Ray jobs while streaming back logs and metadata into Dagster.
PipesRayJobClient ¶
PipesRayJobClient(
    client: JobSubmissionClient,
    context_injector: PipesContextInjector | None = None,
    message_reader: PipesMessageReader | None = None,
    forward_termination: bool = True,
    timeout: float = 600,
    poll_interval: float = 1,
)
Bases: PipesClient, TreatAsResourceParam
A Pipes client for running Ray jobs on remote clusters.
Starts the job directly on the Ray cluster and reads the logs from the job.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | JobSubmissionClient | The Ray job submission client. | required |
context_injector | Optional[PipesContextInjector] | A context injector to use to inject context into the Ray job. Defaults to | None |
message_reader | Optional[PipesMessageReader] | A message reader to use to read messages from the Ray job. Defaults to | None |
forward_termination | bool | Whether to cancel the | True |
timeout | float | Timeout for various internal interactions with the Ray job. | 600 |
poll_interval | float | Interval at which to poll the Ray job for status updates. | 1 |
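To make the roles of `timeout` and `poll_interval` concrete, here is a minimal polling sketch using only the standard library. The function `wait_for_terminal_status` and the fake status source are hypothetical, and treating both values as seconds and applying the timeout to the whole wait are assumptions; the client's internal behavior may differ.

```python
import time
from typing import Callable

# Terminal job states as named by Ray's job status enum.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_terminal_status(
    get_status: Callable[[], str],
    timeout: float = 600,
    poll_interval: float = 1,
) -> str:
    """Poll get_status until it reports a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout} seconds")
        time.sleep(poll_interval)


# Fake status source standing in for a real job submission client.
_statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
final_status = wait_for_terminal_status(lambda: next(_statuses), timeout=5, poll_interval=0.01)
```

A shorter `poll_interval` notices completion sooner at the cost of more status requests.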
Source code in src/dagster_ray/pipes.py
Functions¶
run ¶
run(
    *, context: OpOrAssetExecutionContext, submit_job_params: SubmitJobParams, extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation
Execute a RayJob, enriched with the Pipes protocol.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | OpExecutionContext | Current Dagster op or asset context. | required |
submit_job_params | Dict[str, Any] | RayJob specification. | required |
extras | Optional[Dict[str, Any]] | Additional information to pass to the Pipes session. | None |
Source code in src/dagster_ray/pipes.py
PipesRayJobMessageReader ¶
Bases: PipesMessageReader
Source code in src/dagster_ray/pipes.py
Functions¶
IO Manager¶
RayIOManager ¶
Bases: ConfigurableIOManager
IO Manager that stores intermediate values in Ray's object store.
The RayIOManager allows storing and retrieving intermediate values in Ray's distributed object store, making it ideal for use with RayRunLauncher and ray_executor. It works by storing Dagster step keys in a global Ray actor that maintains a mapping between step keys and Ray ObjectRefs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address | | Ray cluster address. If provided, will initialize Ray connection. If None, assumes Ray is already initialized. | required |
Example
Basic usage
import dagster as dg
from dagster_ray import RayIOManager

@dg.asset(io_manager_key="ray_io_manager")
def upstream() -> int:
    return 42

@dg.asset
def downstream(upstream: int):
    return upstream * 2

definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={"ray_io_manager": RayIOManager()}
)
Info
- Works with any picklable Python objects
- Supports partitioned assets and partition mappings
- Uses Ray's automatic object movement for fault tolerance
- Objects are stored with the Ray actor as owner for lifecycle management
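The step-key mapping described for RayIOManager can be pictured with the in-process sketch below. It is a stand-in only: `ObjectStoreStandIn` and `StepKeyRegistry` are hypothetical classes that imitate an object store and the global actor with plain dictionaries, and no Ray APIs are called.

```python
import pickle


class ObjectStoreStandIn:
    """In-process stand-in for an object store: put() returns a reference, get() resolves it."""

    def __init__(self) -> None:
        self._objects = {}
        self._next_ref = 0

    def put(self, value: object) -> int:
        ref = self._next_ref
        self._next_ref += 1
        # Values are pickled here, which is why only picklable objects can be stored.
        self._objects[ref] = pickle.dumps(value)
        return ref

    def get(self, ref: int) -> object:
        return pickle.loads(self._objects[ref])


class StepKeyRegistry:
    """Stand-in for the global actor: maps step keys to object references."""

    def __init__(self) -> None:
        self._refs = {}

    def set(self, step_key: str, ref: int) -> None:
        self._refs[step_key] = ref

    def get(self, step_key: str) -> int:
        return self._refs[step_key]


store = ObjectStoreStandIn()
registry = StepKeyRegistry()


def handle_output(step_key: str, value: object) -> None:
    # Store the value, then record its reference under the producing step's key.
    registry.set(step_key, store.put(value))


def load_input(upstream_step_key: str) -> object:
    # Look up the reference by the upstream step's key and resolve it.
    return store.get(registry.get(upstream_step_key))


handle_output("upstream", 42)
loaded = load_input("upstream")
```

Only the small reference travels through the registry in this sketch; the value itself stays in the store until a downstream step asks for it.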
Types¶
Lifecycle ¶
Bases: Config
Attributes¶
create class-attribute instance-attribute ¶
create: bool = Field(
    default=True,
    description="Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
)
wait class-attribute instance-attribute ¶
wait: bool = Field(
    default=True,
    description="Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
)
connect class-attribute instance-attribute ¶
connect: bool = Field(
    default=True,
    description="Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
)
cleanup class-attribute instance-attribute ¶
cleanup: Literal["never", "always", "on_exception"] = Field(
    default="always",
    description="Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
)
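One way to read the three `cleanup` values is as a decision made once the step finishes. The sketch below is an interpretation based on the option names only: `should_cleanup` is a hypothetical helper, and the assumption is that "on_exception" deletes the resource only when the step raised.

```python
from typing import Literal, Optional

CleanupPolicy = Literal["never", "always", "on_exception"]


def should_cleanup(policy: CleanupPolicy, exception: Optional[BaseException]) -> bool:
    """Decide whether to delete the resource, given the policy and the step outcome."""
    if policy == "always":
        return True
    if policy == "on_exception":
        # Assumed meaning: clean up only if the step failed or was interrupted.
        return exception is not None
    # "never": leave the resource in place for inspection or reuse.
    return False


decisions = {
    policy: (should_cleanup(policy, None), should_cleanup(policy, RuntimeError("boom")))
    for policy in ("never", "always", "on_exception")
}
```

Each entry in `decisions` pairs the outcome for a successful step with the outcome for a failed one.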
AnyDagsterContext module-attribute ¶
AnyDagsterContext: TypeAlias = Union[OpExecutionContext, AssetExecutionContext, InitResourceContext]
BaseRayResource ¶
Bases: ConfigurableResource, ABC
Base class for Ray Resources providing a common interface for Ray cluster management.
This abstract base class defines the interface that all Ray resources must implement, providing a backend-agnostic way to interact with Ray clusters. Concrete implementations include LocalRay for local development and KubeRay resources for Kubernetes deployments.
The BaseRayResource handles the lifecycle of Ray clusters including creation, connection, and cleanup, with configurable policies for each stage.
Examples:
Use as a type annotation for backend-agnostic code:
import dagster as dg
from dagster_ray import RayResource

@dg.asset
def my_asset(ray_cluster: RayResource):
    # Works with any Ray backend
    import ray
    return ray.get(ray.put("hello"))
Manual lifecycle management:
from dagster_ray import Lifecycle

ray_resource = SomeRayResource(
    lifecycle=Lifecycle(
        create=False,  # Don't auto-create
        connect=False  # Don't auto-connect
    )
)
Note
This is an abstract class and cannot be instantiated directly. Use concrete implementations like LocalRay or KubeRayCluster instead.
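The interplay between the lifecycle flags and the manual `create`, `wait`, and `connect` calls can be sketched with a stand-in class. `LifecycleFlags` and `FakeRayResource` below are hypothetical and only record which stages ran. The real methods take a Dagster context argument, which is omitted here, and the `setup` method name is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class LifecycleFlags:
    """Stand-in mirroring the create / wait / connect flags of Lifecycle."""

    create: bool = True
    wait: bool = True
    connect: bool = True


@dataclass
class FakeRayResource:
    """Hypothetical resource that records which stages ran, and in what order."""

    lifecycle: LifecycleFlags = field(default_factory=LifecycleFlags)
    calls: list = field(default_factory=list)

    def create(self) -> None:
        self.calls.append("create")

    def wait(self) -> None:
        self.calls.append("wait")

    def connect(self) -> None:
        self.calls.append("connect")

    def setup(self) -> None:
        # Run only the enabled stages; skipped stages are left to the caller.
        if self.lifecycle.create:
            self.create()
        if self.lifecycle.wait:
            self.wait()
        if self.lifecycle.connect:
            self.connect()


# Automatic setup: every stage runs during resource setup.
automatic = FakeRayResource()
automatic.setup()

# Manual setup: nothing runs automatically, so the caller drives each stage.
manual = FakeRayResource(lifecycle=LifecycleFlags(create=False, wait=False, connect=False))
manual.setup()
stages_after_setup = list(manual.calls)
manual.create()
manual.wait()
manual.connect()
```

Disabling a stage hands control of its timing to the caller, which can help when the cluster should be created early and connected to only when needed.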
Attributes¶
lifecycle class-attribute instance-attribute ¶
lifecycle: Lifecycle = Field(default_factory=Lifecycle, description='Actions to perform during resource setup.')
timeout class-attribute instance-attribute ¶
ray_init_options class-attribute instance-attribute ¶
ray_init_options: dict[str, Any] = Field(
    default_factory=dict,
    description="Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
)
data_execution_options class-attribute instance-attribute ¶
redis_port class-attribute instance-attribute ¶
redis_port: int = Field(
    default=10001, description="Redis port for connection. Make sure to match with the actual available port."
)
dashboard_port class-attribute instance-attribute ¶
dashboard_port: int = Field(
    default=8265, description="Dashboard port for connection. Make sure to match with the actual available port."
)
env_vars class-attribute instance-attribute ¶
env_vars: dict[str, str] | None = Field(
    default_factory=dict, description="Environment variables to pass to the Ray cluster."
)
enable_tracing class-attribute instance-attribute ¶
enable_tracing: bool = Field(
    default=False,
    description="Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline",
)
enable_actor_task_logging class-attribute instance-attribute ¶
enable_actor_task_logging: bool = Field(
    default=False,
    description="Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
)
enable_debug_post_mortem class-attribute instance-attribute ¶
enable_debug_post_mortem: bool = Field(
    default=False,
    description="Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html",
)
enable_legacy_debugger class-attribute instance-attribute ¶
enable_legacy_debugger: bool = Field(
    default=False,
    description="Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger",
)
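The four `enable_*` flags above each map to environment variables named in their descriptions. The sketch below collects that mapping in one place; `debug_env_vars` is a hypothetical helper for illustration, and how the resource actually injects these variables into the cluster configuration is not shown.

```python
def debug_env_vars(
    enable_tracing: bool = False,
    enable_actor_task_logging: bool = False,
    enable_debug_post_mortem: bool = False,
    enable_legacy_debugger: bool = False,
) -> dict:
    """Return the environment variables implied by the enabled debugging flags."""
    env = {}
    if enable_tracing:
        # Lets ray.timeline() fetch recorded task events.
        env["RAY_PROFILING"] = "1"
        env["RAY_task_events_report_interval_ms"] = "0"
    if enable_actor_task_logging:
        env["RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING"] = "1"
    if enable_debug_post_mortem:
        env["RAY_DEBUG_POST_MORTEM"] = "1"
    if enable_legacy_debugger:
        env["RAY_DEBUG"] = "legacy"
    return env


tracing_env = debug_env_vars(enable_tracing=True)
all_env = debug_env_vars(True, True, True, True)
```

With every flag left at its default of `False`, the helper returns an empty mapping.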
runtime_job_id property ¶
Returns the Ray Job ID for the current job, which was created with ray.init().
Functions¶
yield_for_execution ¶
Source code in src/dagster_ray/_base/resources.py
_wait ¶
_wait(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
_connect ¶
_connect(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
create ¶
create(context: AnyDagsterContext)
wait ¶
wait(context: AnyDagsterContext)
connect ¶
connect(context: AnyDagsterContext) -> BaseContext
Source code in src/dagster_ray/_base/resources.py
delete ¶
delete(context: AnyDagsterContext)
cleanup ¶
cleanup(context: AnyDagsterContext, exception: Optional[BaseException])
Source code in src/dagster_ray/_base/resources.py
get_dagster_tags ¶
get_dagster_tags(context: AnyDagsterContext) -> dict[str, str]