Core API Reference
Core dagster-ray APIs for using external Ray clusters.
Ray Resources
RayResource is the common base for connecting to external Ray clusters: provide a concrete subclass as a Dagster resource, or use RayResource itself as a type annotation (all other Ray resources in dagster-ray inherit from RayResource).
dagster_ray.RayResource (pydantic-model)
Bases: ConfigurableResource, ABC
Base class for Ray Resources providing a common interface for Ray cluster management.
This abstract base class defines the interface that all Ray resources must implement, providing a backend-agnostic way to interact with Ray clusters. Concrete implementations include LocalRay for local development and KubeRay resources for Kubernetes deployments.
The RayResource handles the lifecycle of Ray clusters including creation, connection, and cleanup, with configurable policies for each stage.
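The cleanup stage is controlled by Lifecycle.cleanup, which the JSON schema below restricts to never, always (the default), or on_exception. The following is a minimal sketch of evaluating such a policy when a step finishes. should_cleanup is a hypothetical helper, not a dagster-ray function, and it assumes on_exception means the resource is deleted only when the step raised or was interrupted.

```python
from __future__ import annotations

from typing import Literal, get_args

# Values accepted by Lifecycle.cleanup, per the JSON schema.
CleanupPolicy = Literal["never", "always", "on_exception"]


def should_cleanup(policy: CleanupPolicy, exception: BaseException | None) -> bool:
    """Hypothetical helper: decide whether to delete the Ray resource after a step.

    Assumption: "on_exception" deletes the resource only when the step raised
    (or was interrupted) and keeps it alive after a successful step.
    """
    if policy not in get_args(CleanupPolicy):
        raise ValueError(f"unknown cleanup policy: {policy!r}")
    if policy == "always":
        return True
    if policy == "never":
        return False
    # policy == "on_exception"
    return exception is not None


# Usage: under "on_exception", a failed step triggers cleanup and a successful one does not.
print(should_cleanup("on_exception", RuntimeError("step failed")))  # True
print(should_cleanup("on_exception", None))  # False
```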
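Example
Use as a type annotation for backend-agnostic code
import dagster as dg
from dagster_ray import RayResource

@dg.asset
def my_asset(ray_cluster: RayResource):
    # Works with any Ray backend
    import ray
    return ray.get(ray.put("hello"))
Example
Manual lifecycle management
from dagster_ray import Lifecycle

# SomeRayResource stands for any concrete RayResource subclass, such as LocalRay
ray_resource = SomeRayResource(
    lifecycle=Lifecycle(
        create=False,  # Don't auto-create
        connect=False,  # Don't auto-connect
    )
)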
Note
This is an abstract class and cannot be instantiated directly. Use concrete implementations like LocalRay or KubeRayCluster instead.
JSON schema:
{
"$defs": {
"ExecutionOptionsConfig": {
"properties": {
"cpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Cpu"
},
"gpu": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Gpu"
},
"object_store_memory": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Object Store Memory"
}
},
"title": "ExecutionOptionsConfig",
"type": "object"
},
"Lifecycle": {
"properties": {
"create": {
"default": true,
"description": "Whether to create the resource. If set to `False`, the user can manually call `.create` instead.",
"title": "Create",
"type": "boolean"
},
"wait": {
"default": true,
"description": "Whether to wait for the remote Ray cluster to become ready to accept connections. If set to `False`, the user can manually call `.wait` instead.",
"title": "Wait",
"type": "boolean"
},
"connect": {
"default": true,
"description": "Whether to run `ray.init` against the remote Ray cluster. If set to `False`, the user can manually call `.connect` instead.",
"title": "Connect",
"type": "boolean"
},
"cleanup": {
"default": "always",
"description": "Resource cleanup policy. Determines when the resource should be deleted after Dagster step execution or during interruption.",
"enum": [
"never",
"always",
"on_exception"
],
"title": "Cleanup",
"type": "string"
}
},
"title": "Lifecycle",
"type": "object"
},
"RayDataExecutionOptions": {
"properties": {
"execution_options": {
"$ref": "#/$defs/ExecutionOptionsConfig"
},
"cpu_limit": {
"default": 5000,
"title": "Cpu Limit",
"type": "integer"
},
"gpu_limit": {
"default": 0,
"title": "Gpu Limit",
"type": "integer"
},
"verbose_progress": {
"default": true,
"title": "Verbose Progress",
"type": "boolean"
},
"use_polars": {
"default": true,
"title": "Use Polars",
"type": "boolean"
}
},
"title": "RayDataExecutionOptions",
"type": "object"
}
},
"description": "Base class for Ray Resources providing a common interface for Ray cluster management.\n\nThis abstract base class defines the interface that all Ray resources must implement,\nproviding a backend-agnostic way to interact with Ray clusters. Concrete implementations\ninclude LocalRay for local development and KubeRay resources for Kubernetes deployments.\n\nThe RayResource handles the lifecycle of Ray clusters including creation, connection,\nand cleanup, with configurable policies for each stage.\n\nExample:\n Use as a type annotation for backend-agnostic code\n ```python\n import dagster as dg\n from dagster_ray import RayResource\n\n @dg.asset\n def my_asset(ray_cluster: RayResource):\n # Works with any Ray backend\n import ray\n return ray.get(ray.put(\"hello\"))\n ```\n\nExample:\n Manual lifecycle management\n ```python\n from dagster_ray import Lifecycle\n\n ray_resource = SomeRayResource(\n lifecycle=Lifecycle(\n create=False, # Don't auto-create\n connect=False # Don't auto-connect\n )\n )\n ```\n\nNote:\n This is an abstract class and cannot be instantiated directly. Use concrete\n implementations like LocalRay or KubeRayCluster instead.",
"properties": {
"lifecycle": {
"$ref": "#/$defs/Lifecycle",
"description": "Actions to perform during resource setup."
},
"timeout": {
"default": 600.0,
"description": "Timeout for Ray readiness in seconds",
"title": "Timeout",
"type": "number"
},
"ray_init_options": {
"description": "Additional keyword arguments to pass to `ray.init()` call, such as `runtime_env`, `num_cpus`, etc. Dagster's `EnvVar` is supported. More details in [Ray docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).",
"title": "Ray Init Options",
"type": "object"
},
"data_execution_options": {
"$ref": "#/$defs/RayDataExecutionOptions"
},
"redis_port": {
"default": 10001,
"description": "Redis port for connection. Make sure to match with the actual available port.",
"title": "Redis Port",
"type": "integer"
},
"dashboard_port": {
"default": 8265,
"description": "Dashboard port for connection. Make sure to match with the actual available port.",
"title": "Dashboard Port",
"type": "integer"
},
"env_vars": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"description": "Environment variables to pass to the Ray cluster.",
"title": "Env Vars"
},
"enable_tracing": {
"default": false,
"description": "Enable tracing: inject `RAY_PROFILING=1` and `RAY_task_events_report_interval_ms=0` into the Ray cluster configuration. This allows using `ray.timeline()` to fetch recorded task events. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.timeline.html#ray-timeline)",
"title": "Enable Tracing",
"type": "boolean"
},
"enable_actor_task_logging": {
"default": false,
"description": "Enable actor task logging: inject `RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1` into the Ray cluster configuration.",
"title": "Enable Actor Task Logging",
"type": "boolean"
},
"enable_debug_post_mortem": {
"default": false,
"description": "Enable post-mortem debugging: inject `RAY_DEBUG_POST_MORTEM=1` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/ray-distributed-debugger.html)",
"title": "Enable Debug Post Mortem",
"type": "boolean"
},
"enable_legacy_debugger": {
"default": false,
"description": "Enable legacy debugger: inject `RAY_DEBUG=legacy` into the Ray cluster configuration. Learn more: [KubeRay docs](https://docs.ray.io/en/latest/ray-observability/user-guides/debug-apps/ray-debugging.html#using-the-ray-debugger)",
"title": "Enable Legacy Debugger",
"type": "boolean"
}
},
"title": "RayResource",
"type": "object"
}
Fields:
- lifecycle (Lifecycle)
- timeout (float)
- ray_init_options (dict[str, Any])
- data_execution_options (RayDataExecutionOptions)
- redis_port (int)
- dashboard_port (int)
- env_vars (dict[str, str] | None)
- enable_tracing (bool)
- enable_actor_task_logging (bool)
- enable_debug_post_mortem (bool)
- enable_legacy_debugger (bool)
- _context (BaseContext | None)
Attributes
ray_init_options (pydantic-field)
Additional keyword arguments to pass to the ray.init() call, such as runtime_env, num_cpus, etc. Dagster's EnvVar is supported. More details in the Ray docs.
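Because EnvVar values are only resolved at runtime, the options must be turned into plain values before they reach ray.init(). A minimal sketch of that resolution step, assuming EnvVar references may appear anywhere in nested dicts and lists (for example inside runtime_env). EnvVarRef and resolve_init_options are hypothetical stand-ins, not Dagster's EnvVar or dagster-ray's actual code.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class EnvVarRef:
    """Hypothetical stand-in for Dagster's EnvVar: names an environment variable."""

    name: str


def resolve_init_options(options: Any) -> Any:
    """Return a copy of options with every EnvVarRef replaced by its current value."""
    if isinstance(options, EnvVarRef):
        # Read at resolution time, so secrets need not be written into the config itself.
        return os.environ[options.name]
    if isinstance(options, dict):
        return {key: resolve_init_options(value) for key, value in options.items()}
    if isinstance(options, list):
        return [resolve_init_options(item) for item in options]
    return options


# Assumed flow: resolve first, then pass plain values on, e.g.
# ray.init(**resolve_init_options(ray_init_options))
```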
redis_port (pydantic-field)
redis_port: int = 10001
Redis port for the connection. Make sure it matches the port actually available on the cluster.
dashboard_port (pydantic-field)
dashboard_port: int = 8265
Dashboard port for the connection. Make sure it matches the port actually available on the cluster.
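In Ray, 10001 is the default Ray Client port (clients connect with a ray://host:port address) and 8265 is the default dashboard and Job Submission API port (reached over HTTP). A minimal sketch of building both addresses for a head node from the two defaults above, assuming redis_port is used as the Ray Client port; ray_addresses is a hypothetical helper.

```python
from __future__ import annotations


def ray_addresses(host: str, redis_port: int = 10001, dashboard_port: int = 8265) -> tuple[str, str]:
    """Hypothetical helper: build (client_address, dashboard_address) for a Ray head node."""
    # Ray Client address, the form ray.init() accepts for a remote cluster.
    client_address = f"ray://{host}:{redis_port}"
    # Dashboard / Job Submission API address, e.g. for JobSubmissionClient.
    dashboard_address = f"http://{host}:{dashboard_port}"
    return client_address, dashboard_address


print(ray_addresses("ray-head"))  # ('ray://ray-head:10001', 'http://ray-head:8265')
```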
env_vars (pydantic-field)
Environment variables to pass to the Ray cluster.
enable_tracing (pydantic-field)
enable_tracing: bool = False
Enable tracing: inject RAY_PROFILING=1 and RAY_task_events_report_interval_ms=0 into the Ray cluster configuration. This allows using ray.timeline() to fetch recorded task events. Learn more: Ray docs
enable_actor_task_logging (pydantic-field)
enable_actor_task_logging: bool = False
Enable actor task logging: inject RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 into the Ray cluster configuration.
enable_debug_post_mortem (pydantic-field)
enable_debug_post_mortem: bool = False
Enable post-mortem debugging: inject RAY_DEBUG_POST_MORTEM=1 into the Ray cluster configuration. Learn more: Ray docs
enable_legacy_debugger (pydantic-field)
enable_legacy_debugger: bool = False
Enable legacy debugger: inject RAY_DEBUG=legacy into the Ray cluster configuration. Learn more: Ray docs
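Each of the four debugging flags above maps to fixed environment variables. A sketch of that mapping as a pure function; env_vars_to_inject is a hypothetical name (the resource's own method is get_env_vars_to_inject, whose signature is not shown on this page), and the sketch leaves out merging with the user-supplied env_vars field, whose precedence is not documented here.

```python
from __future__ import annotations


def env_vars_to_inject(
    enable_tracing: bool = False,
    enable_actor_task_logging: bool = False,
    enable_debug_post_mortem: bool = False,
    enable_legacy_debugger: bool = False,
) -> dict[str, str]:
    """Hypothetical mirror of the documented flag-to-variable mapping."""
    env: dict[str, str] = {}
    if enable_tracing:
        # Lets ray.timeline() fetch recorded task events, per the field description.
        env["RAY_PROFILING"] = "1"
        env["RAY_task_events_report_interval_ms"] = "0"
    if enable_actor_task_logging:
        env["RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING"] = "1"
    if enable_debug_post_mortem:
        env["RAY_DEBUG_POST_MORTEM"] = "1"
    if enable_legacy_debugger:
        env["RAY_DEBUG"] = "legacy"
    return env


print(env_vars_to_inject(enable_tracing=True))
# {'RAY_PROFILING': '1', 'RAY_task_events_report_interval_ms': '0'}
```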
runtime_job_id (property)
runtime_job_id: str
Returns the Ray job ID of the current job, which was created with ray.init().
Functions
yield_for_execution
yield_for_execution(context: InitResourceContext) -> Generator[Self, None, None]
Source code in src/dagster_ray/_base/resources.py
_wait
_wait(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
_connect
_connect(context: AnyDagsterContext)
Source code in src/dagster_ray/_base/resources.py
create
create(context: AnyDagsterContext)
wait
wait(context: AnyDagsterContext)
connect
connect(context: AnyDagsterContext) -> BaseContext
Source code in src/dagster_ray/_base/resources.py
delete
delete(context: AnyDagsterContext)
cleanup
cleanup(context: AnyDagsterContext, exception: BaseException | None)
Source code in src/dagster_ray/_base/resources.py
get_dagster_tags
get_dagster_tags(context: AnyDagsterContext) -> dict[str, str]
get_env_vars_to_inject
Source code in src/dagster_ray/_base/resources.py
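yield_for_execution is Dagster's generator-based setup/teardown hook for ConfigurableResource: code before the yield runs when the resource is initialized, and code after it runs when the step finishes. A self-contained sketch of how the lifecycle methods listed above could be sequenced inside such a generator, assuming the order create, wait, connect and that cleanup always receives the step's exception (or None). SketchLifecycle and SketchRayResource are hypothetical stand-ins that only record calls, and contextlib.contextmanager stands in for Dagster's resource machinery.

```python
from __future__ import annotations

import contextlib
from collections.abc import Generator
from dataclasses import dataclass, field


@dataclass
class SketchLifecycle:
    """Hypothetical mirror of the create/wait/connect flags of Lifecycle (all default True)."""

    create: bool = True
    wait: bool = True
    connect: bool = True


@dataclass
class SketchRayResource:
    """Hypothetical stand-in that records which lifecycle methods ran."""

    lifecycle: SketchLifecycle = field(default_factory=SketchLifecycle)
    calls: list[str] = field(default_factory=list)

    def create(self) -> None:
        self.calls.append("create")

    def wait(self) -> None:
        self.calls.append("wait")

    def connect(self) -> None:
        self.calls.append("connect")

    def cleanup(self, exception: BaseException | None) -> None:
        # A real resource would apply its cleanup policy here.
        name = type(exception).__name__ if exception is not None else None
        self.calls.append(f"cleanup({name})")

    def yield_for_execution(self) -> Generator[SketchRayResource, None, None]:
        # Setup: run only the stages enabled by the lifecycle flags.
        if self.lifecycle.create:
            self.create()
        if self.lifecycle.wait:
            self.wait()
        if self.lifecycle.connect:
            self.connect()
        exception: BaseException | None = None
        try:
            yield self  # the step runs while the generator is suspended here
        except BaseException as e:
            exception = e
            raise
        finally:
            # Teardown: always reached, with the step's exception if there was one.
            self.cleanup(exception)


# Usage: drive the generator as a context manager around a successful step.
resource = SketchRayResource()
with contextlib.contextmanager(resource.yield_for_execution)() as active:
    active.calls.append("step")
print(resource.calls)  # ['create', 'wait', 'connect', 'step', 'cleanup(None)']
```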
LocalRay can be used to connect to a local Ray cluster.
dagster_ray.core.resources.LocalRay (pydantic-model)
Bases: RayResource
Dummy resource, useful for testing and local development. It provides the same interface as the other Ray resources.
Fields:
- lifecycle (Lifecycle)
- timeout (float)
- ray_init_options (dict[str, Any])
- data_execution_options (RayDataExecutionOptions)
- redis_port (int)
- dashboard_port (int)
- env_vars (dict[str, str] | None)
- enable_tracing (bool)
- enable_actor_task_logging (bool)
- enable_debug_post_mortem (bool)
- enable_legacy_debugger (bool)
- _context (BaseContext | None)
Run Launcher
dagster_ray.core.run_launcher.RayRunLauncher
RayRunLauncher(
address: str,
metadata: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
cookies: dict[str, Any] | None = None,
env_vars: list[str] | None = None,
runtime_env: dict[str, Any] | None = None,
num_cpus: int | None = None,
num_gpus: int | None = None,
memory: int | None = None,
resources: dict[str, float] | None = None,
inst_data: ConfigurableClassData | None = None,
)
Bases: RunLauncher, ConfigurableClass
RunLauncher that submits Dagster runs as isolated Ray jobs to a Ray cluster.
Configuration can be provided via dagster.yaml, and individual runs can override settings using the dagster-ray/config tag.
Example
Configure via dagster.yaml
Example
Override settings per job
Source code in src/dagster_ray/core/run_launcher.py
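Dagster run tags are string key/value pairs, so a per-run override has to be serialized into the tag value. A minimal sketch of layering a dagster-ray/config tag over the dagster.yaml settings, assuming the tag holds a JSON object and that a shallow merge applies in which the run tag wins. effective_launcher_config is hypothetical, and the real launcher's parsing and merge rules may differ; num_cpus and runtime_env are taken from the constructor above.

```python
from __future__ import annotations

import json
from typing import Any

RAY_CONFIG_TAG = "dagster-ray/config"


def effective_launcher_config(defaults: dict[str, Any], run_tags: dict[str, str]) -> dict[str, Any]:
    """Hypothetical: overlay per-run overrides from the dagster-ray/config tag."""
    raw = run_tags.get(RAY_CONFIG_TAG)
    if raw is None:
        # No override tag: use the dagster.yaml settings unchanged.
        return dict(defaults)
    overrides = json.loads(raw)  # assumption: the tag value is a JSON object
    if not isinstance(overrides, dict):
        raise ValueError(f"{RAY_CONFIG_TAG} must be a JSON object, got {type(overrides).__name__}")
    # Assumption: shallow merge, so a key in the tag replaces the whole default value.
    return {**defaults, **overrides}


# Usage: dagster.yaml asks for 1 CPU per run; one run's tag raises it to 4.
defaults = {"num_cpus": 1, "runtime_env": {"pip": ["polars"]}}
tags = {RAY_CONFIG_TAG: json.dumps({"num_cpus": 4})}
print(effective_launcher_config(defaults, tags))
# {'num_cpus': 4, 'runtime_env': {'pip': ['polars']}}
```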
Executor
dagster_ray.core.executor.ray_executor
ray_executor(init_context: InitExecutorContext) -> Executor
Executes steps by submitting them as Ray jobs. The steps are started inside the Ray cluster directly.
When used together with RayRunLauncher, the executor can inherit the job submission client configuration. This behavior can be disabled by setting inherit_job_submission_client_from_ray_run_launcher to False.
Example
Use ray_executor for the entire code location
Example
Override configuration for a specific asset
Source code in src/dagster_ray/core/executor.py
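A sketch of the inheritance rule described above, assuming the flag defaults to True and that, when it is on and a RayRunLauncher is configured, the executor reuses the launcher's job submission settings wholesale, otherwise falling back to its own. JobClientSettings and resolve_job_client_settings are hypothetical; the field names mirror RayRunLauncher's address, headers, and cookies parameters.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class JobClientSettings:
    """Hypothetical subset of what a Ray JobSubmissionClient needs."""

    address: str
    headers: dict[str, Any] = field(default_factory=dict)
    cookies: dict[str, Any] = field(default_factory=dict)


def resolve_job_client_settings(
    executor_settings: JobClientSettings | None,
    launcher_settings: JobClientSettings | None,
    inherit_job_submission_client_from_ray_run_launcher: bool = True,
) -> JobClientSettings:
    """Pick the settings the executor would use to submit step jobs."""
    if inherit_job_submission_client_from_ray_run_launcher and launcher_settings is not None:
        # Inherit: reuse the RayRunLauncher's client configuration.
        return launcher_settings
    if executor_settings is None:
        raise ValueError("no Ray address: configure the executor or enable inheritance")
    return executor_settings


launcher = JobClientSettings(address="http://ray-head:8265")
own = JobClientSettings(address="http://other-head:8265")
print(resolve_job_client_settings(own, launcher).address)  # http://ray-head:8265
```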
Pipes
Run external Ray scripts as Ray jobs while streaming back logs and metadata into Dagster with Dagster Pipes.
dagster_ray.core.pipes.PipesRayJobClient
PipesRayJobClient(
client: JobSubmissionClient,
context_injector: PipesContextInjector | None = None,
message_reader: PipesMessageReader | None = None,
forward_termination: bool = True,
timeout: float = 600,
poll_interval: float = 1,
)
Bases: PipesClient, TreatAsResourceParam
A Pipes client for running Ray jobs on remote clusters.
Starts the job directly on the Ray cluster and reads the logs from the job.
Parameters:
- client (JobSubmissionClient) – The Ray job submission client.
- context_injector (Optional[PipesContextInjector], default: None) – A context injector to use to inject context into the Ray job. Defaults to PipesEnvContextInjector.
- message_reader (Optional[PipesMessageReader], default: None) – A message reader to use to read messages from the Ray job. Defaults to PipesRayJobMessageReader.
- forward_termination (bool, default: True) – Whether to cancel the Ray job when the Dagster process receives a termination signal.
- timeout (float, default: 600) – Timeout for various internal interactions with the Ray job.
- poll_interval (float, default: 1) – Interval at which to poll the Ray job for status updates. This is useful when running in a local environment.
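timeout and poll_interval describe a polling loop. A generic sketch of such a loop, using the terminal states of Ray's JobStatus (SUCCEEDED, FAILED, STOPPED) as plain strings. wait_for_terminal_status and its get_status callable are hypothetical, and the real client may apply timeout to different interactions than the overall wait shown here; against a real cluster, get_status could wrap JobSubmissionClient.get_job_status(job_id).value.

```python
from __future__ import annotations

import time
from typing import Callable

# Terminal states of Ray's JobStatus, as plain strings.
TERMINAL_STATUSES = frozenset({"SUCCEEDED", "FAILED", "STOPPED"})


def wait_for_terminal_status(
    get_status: Callable[[], str],
    timeout: float = 600,
    poll_interval: float = 1,
) -> str:
    """Poll get_status() every poll_interval seconds until a terminal state or the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout} seconds")
        time.sleep(poll_interval)


# Usage with a fake status source that finishes on the third poll.
statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
print(wait_for_terminal_status(lambda: next(statuses), poll_interval=0))  # SUCCEEDED
```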
Source code in src/dagster_ray/core/pipes.py
Functions
run
run(
*, context: OpOrAssetExecutionContext, submit_job_params: SubmitJobParams, extras: PipesExtras | None = None
) -> PipesClientCompletedInvocation
Execute a RayJob, enriched with the Pipes protocol.
Parameters:
- context (OpOrAssetExecutionContext) – Current Dagster op or asset context.
- submit_job_params (SubmitJobParams) – RayJob specification. API reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
- extras (PipesExtras | None, default: None) – Additional information to pass to the Pipes session.
Source code in src/dagster_ray/core/pipes.py
dagster_ray.core.pipes.PipesRayJobMessageReader
Bases: PipesMessageReader
Source code in src/dagster_ray/core/pipes.py
IO Manager
Send data between Dagster steps while they are running inside a Ray cluster.
dagster_ray.core.io_manager.RayIOManager
Bases: ConfigurableIOManager
IO Manager that stores intermediate values in Ray's object store.
The RayIOManager allows storing and retrieving intermediate values in Ray's distributed object store, making it ideal for use with RayRunLauncher and ray_executor. It works by storing Dagster step keys in a global Ray actor that maintains a mapping between step keys and Ray ObjectRefs.
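The step-key-to-ObjectRef mapping can be pictured as a small key/value registry. A plain-Python sketch in which ObjectRefRegistry stands in for the global Ray actor and string placeholders stand in for ray.ObjectRef values; the class, its method names, and the make_key layout (step key, output name, optional partition key) are hypothetical. In Ray itself, a class like this becomes an actor when decorated with @ray.remote, and a single shared instance is typically reached through a named actor.

```python
from __future__ import annotations

from typing import Any


def make_key(step_key: str, output_name: str, partition_key: str | None = None) -> str:
    """Hypothetical storage key: step key and output name, plus the partition if any."""
    parts = [step_key, output_name]
    if partition_key is not None:
        parts.append(partition_key)
    return "/".join(parts)


class ObjectRefRegistry:
    """Stand-in for the global Ray actor that maps storage keys to ObjectRefs."""

    def __init__(self) -> None:
        self._refs: dict[str, Any] = {}

    def set(self, key: str, ref: Any) -> None:
        # Would be called when a step's output is stored (IOManager.handle_output).
        self._refs[key] = ref

    def get(self, key: str) -> Any:
        # Would be called when a downstream step loads its input (IOManager.load_input).
        if key not in self._refs:
            raise KeyError(f"no object stored for {key!r}")
        return self._refs[key]

    def delete(self, key: str) -> None:
        self._refs.pop(key, None)


# Usage: the upstream step stores its output, the downstream step looks it up.
registry = ObjectRefRegistry()
registry.set(make_key("upstream", "result"), "ObjectRef(placeholder)")
print(registry.get(make_key("upstream", "result")))  # ObjectRef(placeholder)
```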
Attributes:
- address (str | None) – Ray cluster address. If provided, the IO manager initializes a Ray connection to it. If None, Ray is assumed to be already initialized.
Example
Basic usage
import dagster as dg
from dagster_ray import RayIOManager

@dg.asset(io_manager_key="ray_io_manager")
def upstream() -> int:
    return 42

@dg.asset
def downstream(upstream: int):
    return upstream * 2

definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={"ray_io_manager": RayIOManager()},
)
Info
- Works with picklable Python objects
- Supports partitioned assets and partition mappings
- Uses Ray's automatic object movement for fault tolerance
- Objects are stored with the Ray actor as owner for lifecycle management