Dagster + External Ray Clusters Tutorial

This tutorial covers how to use dagster-ray with external Ray clusters - clusters that are managed outside of Dagster. This approach is ideal when you have existing Ray infrastructure or want to separate cluster management from your data pipelines.

When to Use Each Approach

Use LocalRay when:

  • Developing and testing locally

Use RayRunLauncher when:

  • You want to run all Dagster pipelines on Ray
  • You want very fast Dagster run submission

Use ray_executor when:

  • You want selective Ray execution for specific assets
  • You want very fast Dagster step submission

Use PipesRayJobClient when:

  • You want to decouple Ray workloads from orchestration code
  • You have existing Ray scripts you want to integrate
  • You want full separation between Dagster and Ray environments

Prerequisites

Before getting started, you'll need:

  • A Ray cluster (local Ray for development, or a remote Ray cluster for production)
  • dagster-ray installed:
    pip install dagster-ray
    
  • For remote clusters: the Ray cluster address and appropriate network access (a quick connectivity check is sketched below)
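For remote clusters, the address is typically supplied via the RAY_ADDRESS environment variable. A minimal sketch of a connectivity check, assuming RAY_ADDRESS points at your cluster:

import os

import ray

# Connect to the cluster referenced by RAY_ADDRESS (set in your environment)
ray.init(address=os.environ["RAY_ADDRESS"])

# Print the resources Ray sees, as a basic sanity check
print(ray.cluster_resources())

ray.shutdown()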

LocalRay - Development and Testing

LocalRay is perfect for local development and testing. It provides the same interface as other Ray resources but runs Ray locally on your machine.

import dagster as dg
import ray
from dagster_ray import LocalRay


@ray.remote
def process_batch(batch_id: int, size: int) -> dict:
    """Dummy batch processing task."""
    return {"batch_id": batch_id, "rows": size}


@dg.asset
def batch_processing_results(ray_cluster: LocalRay) -> dict:
    """Process multiple batches in parallel using local Ray."""
    batch_sizes = [100, 200, 300, 400]

    # Submit one Ray task per batch
    refs = [process_batch.remote(i, size) for i, size in enumerate(batch_sizes)]

    # Collect results
    results = ray.get(refs)

    # Aggregate the per-batch results into a single summary
    return {"num_batches": len(results), "total_rows": sum(r["rows"] for r in results)}


definitions = dg.Definitions(
    assets=[batch_processing_results], resources={"ray_cluster": LocalRay()}
)

You can customize the local Ray configuration:

from dagster_ray import LocalRay

local_ray = LocalRay(
    # Ray initialization options
    ray_init_options={
        "num_cpus": 8,
        "num_gpus": 1,
        "object_store_memory": 1000000000,  # 1GB
        "runtime_env": {"pip": ["numpy", "polars", "scikit-learn"]},
    },
)
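The configured instance is then passed as the resource in your Definitions, just like the default one shown above:

definitions = dg.Definitions(
    assets=[batch_processing_results], resources={"ray_cluster": local_ray}
)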

RayRunLauncher

RayRunLauncher executes entire Dagster runs as Ray jobs. This is useful for Dagster deployments that need to be fully executed on Ray.

Tip

Make sure the Ray cluster has access to Dagster's metadata database!

Usage

Configure the run launcher in your dagster.yaml:

run_launcher:
  module: dagster_ray
  class: RayRunLauncher
  config:
    address:
      env: RAY_ADDRESS
    timeout: 1800
    metadata:
      foo: bar
    runtime_env:
      env_vars:
        FOO: bar
      pip:
        - polars

With RayRunLauncher enabled, your regular Dagster assets will automatically run on Ray:

import dagster as dg


@dg.asset
def regular_asset():
    """This asset will be submitted as a Ray job."""
    ...

All the steps will be executed in a single Ray job, unless a custom executor is used.

It's possible to provide additional runtime configuration via the dagster-ray/config run tag.
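For example, a job could override the Ray job configuration for its runs like this (a minimal sketch, assuming the tag accepts the same keys as the run launcher config shown above):

import dagster as dg


@dg.job(
    tags={
        "dagster-ray/config": {
            # Extra runtime_env applied to the submitted Ray job (assumed key)
            "runtime_env": {"pip": ["polars"]},
        }
    }
)
def my_ray_job(): ...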

ray_executor

ray_executor runs Dagster steps (ops or assets) as Ray jobs (in parallel).

Tip

Make sure the Ray cluster has access to Dagster's metadata database!

Usage

The executor can be enabled at the Definitions level:

import dagster as dg
from dagster_ray import ray_executor


definitions = dg.Definitions(
    executor=ray_executor.configured(
        {"address": dg.EnvVar("RAY_ADDRESS"), "runtime_env": {"pip": ["polars"]}}
    )
)

It's possible to configure individual assets via the dagster-ray/config op tag:

@dg.asset(
    op_tags={
        "dagster-ray/config": {
            "num_cpus": 2,
        }
    }
)
def my_asset(): ...

PipesRayJobClient - External Script Execution

PipesRayJobClient lets you submit external Python scripts to Ray clusters as Ray jobs. This is perfect for decoupling your Ray workloads from the Dagster orchestration code and Python environment.

External Ray Script

First, create a script that will run on the Ray cluster:

ml_training.py
# ml_training.py - External Ray script
import ray
from dagster_pipes import open_dagster_pipes


@ray.remote
def train_ml_model(partition_id: int):
    """Dummy ML training function."""
    import time

    time.sleep(1)  # Simulate work
    return {"partition_id": partition_id, "accuracy": 0.95}


def main():
    with open_dagster_pipes() as context:
        context.log.info("Starting distributed ML training")

        # Get configuration from Dagster
        num_partitions = context.get_extra("num_partitions", 4)

        # Submit training jobs
        futures = [train_ml_model.remote(i) for i in range(num_partitions)]
        results = ray.get(futures)

        context.log.info(f"Training complete on {len(results)} partitions")

        # Report results
        context.report_asset_materialization(
            metadata={"num_partitions": len(results), "results": results},
            data_version="alpha",
        )


if __name__ == "__main__":
    main()

Dagster Asset Using Pipes

Now, let's define a Dagster asset that calls the external script above via Dagster Pipes.

import dagster as dg
from dagster_ray import PipesRayJobClient
from ray.job_submission import JobSubmissionClient


class MLTrainingConfig(dg.Config):
    num_partitions: int = 4


@dg.asset
def distributed_ml_training(
    context: dg.AssetExecutionContext,
    ray_client: PipesRayJobClient,
    config: MLTrainingConfig,
) -> dg.MaterializeResult:
    """Run distributed ML training using Ray Pipes."""

    return ray_client.run(
        context=context,
        submit_job_params={
            "entrypoint": "python ml_training.py",
            "runtime_env": {
                "pip": ["dagster-pipes", "torch"],  # (1)!
            },
        },
        extras={
            "num_partitions": config.num_partitions,
        },
    ).get_materialize_result()


definitions = dg.Definitions(
    assets=[distributed_ml_training],
    resources={
        "ray_client": PipesRayJobClient(
            client=JobSubmissionClient(),
            timeout=1800,
        )
    },
)
  1. 💡 dagster-pipes has to be installed in the remote environment!

When materializing the asset, the PipesRayJobClient will submit the script as a Ray job, monitor its status, and stream back logs and Dagster metadata.
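To try it out, you can trigger a one-off materialization from a test or a local script. A sketch, assuming the definitions above and a Ray cluster reachable via RAY_ADDRESS:

import dagster as dg
from dagster_ray import PipesRayJobClient
from ray.job_submission import JobSubmissionClient

# distributed_ml_training is the asset defined above
result = dg.materialize(
    [distributed_ml_training],
    resources={
        "ray_client": PipesRayJobClient(client=JobSubmissionClient(), timeout=1800)
    },
)
assert result.success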

Conclusion

That's it! You now have a comprehensive understanding of how to use dagster-ray with external Ray clusters, from local development with LocalRay to production deployments with PipesRayJobClient.