
Welcome to dagster-ray docs



Ray integration for Dagster.

dagster-ray lets Dagster pipelines run work on distributed Ray compute, combining Dagster's orchestration capabilities with Ray's distributed computing power.

Note

This project is ready for production use, but some APIs may change between minor releases.

🚀 Key Features

  • 🎯 Run Launchers & Executors: Execute Dagster runs or individual steps as Ray jobs
  • 🔧 Ray Resources: Automatically create and destroy ephemeral Ray clusters and connect to them in client mode
  • 📡 Dagster Pipes Integration: Submit external scripts as Ray jobs, stream back logs and rich Dagster metadata
  • ☸️ KubeRay Support: Utilize RayJob and RayCluster custom resources in client or job submission mode (tutorial)
  • 🏭 Production Ready: Tested against a matrix of core dependencies, integrated with Dagster+

⚡ Quick Start

Installation

pip install dagster-ray

To include KubeRay support:

pip install 'dagster-ray[kuberay]'

Basic Usage

Execute Dagster steps on an existing Ray cluster

Example

import dagster as dg
from dagster_ray import ray_executor

defs = dg.Definitions(..., executor=ray_executor)

Execute an asset on Ray in client mode

Example

Define a Dagster asset that uses Ray in client mode

import dagster as dg
from dagster_ray import RayResource
import ray


@ray.remote
def compute_square(x: int) -> int:
    return x**2


@dg.asset
def my_distributed_computation(ray_cluster: RayResource) -> int:
    # 💡 RayResource is a type annotation that provides a common interface for Ray resources
    # ⚡ Already running in Ray: the resource has connected to the cluster, so these tasks execute there
    futures = [compute_square.remote(i) for i in range(10)]
    return sum(ray.get(futures))
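To see what the asset computes without a Ray cluster, here is a rough local analogue. It uses Python's standard-library `concurrent.futures` as a stand-in for Ray tasks, an assumption made purely for illustration and not how Ray or dagster-ray execute work: each call is submitted as a future, then the results are gathered and summed, mirroring `compute_square.remote(i)` followed by `ray.get(futures)`. The name `my_distributed_computation_local` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def compute_square(x: int) -> int:
    """Plain-Python counterpart of the @ray.remote task."""
    return x**2


def my_distributed_computation_local(n: int = 10) -> int:
    """Hypothetical local stand-in: fan out n tasks, then gather and sum.

    ThreadPoolExecutor replaces Ray here only for illustration.
    """
    with ThreadPoolExecutor() as pool:
        # Fan out: submit() returns futures, like compute_square.remote(i)
        futures = [pool.submit(compute_square, i) for i in range(n)]
        # Fan in: result() blocks until each value is ready, like ray.get(futures)
        return sum(f.result() for f in futures)


if __name__ == "__main__":
    print(my_distributed_computation_local())  # 285
```

With Ray, the same fan-out/fan-in shape applies, but the tasks are scheduled across the cluster's workers instead of local threads.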

Use LocalRay during development and swap it for a full Ray cluster on Kubernetes!

import dagster as dg
from dagster_ray import LocalRay
from dagster_ray.kuberay import in_k8s, KubeRayInteractiveJob

ray_cluster = LocalRay() if not in_k8s else KubeRayInteractiveJob()

definitions = dg.Definitions(
    assets=[my_distributed_computation],
    resources={"ray_cluster": ray_cluster},
)
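The snippet above switches resources on the `in_k8s` flag from `dagster_ray.kuberay`; how that flag is computed is not shown here. As an illustration only, the sketch below uses a common heuristic: checking the `KUBERNETES_SERVICE_HOST` environment variable that Kubernetes typically injects into pod containers. This is an assumption, not necessarily how dagster-ray implements `in_k8s`. The helpers `looks_like_k8s` and `ray_resource_name` are hypothetical and return class names instead of constructing resources, so the sketch runs without dagster-ray installed.

```python
import os
from typing import Mapping, Optional


def looks_like_k8s(env: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical heuristic: Kubernetes usually sets KUBERNETES_SERVICE_HOST in pods.

    An assumption for illustration, not dagster-ray's in_k8s implementation.
    """
    env = os.environ if env is None else env
    return bool(env.get("KUBERNETES_SERVICE_HOST"))


def ray_resource_name(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the name of the resource class the environment switch would pick."""
    return "KubeRayInteractiveJob" if looks_like_k8s(env) else "LocalRay"


print(ray_resource_name({}))  # LocalRay
print(ray_resource_name({"KUBERNETES_SERVICE_HOST": "10.0.0.1"}))  # KubeRayInteractiveJob
```

Passing the environment explicitly keeps the decision testable; in a real deployment you would rely on the library's own `in_k8s` flag.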

KubeRayInteractiveJob creates a RayJob, connects to it, and optionally cleans it up according to the configured cleanup policy.

Learn more by reading the tutorials.

🛠️ Choosing Your Integration

dagster-ray offers multiple ways to integrate Ray with your Dagster pipelines. The right choice depends on your deployment setup and use case:

🤔 Key Questions to Consider

  • Do you want dagster-ray to manage Ray clusters for you? If so, use the KubeRay components
  • Do you prefer to submit external scripts or to run code directly? External scripts offer better separation of concerns and environments, while interactive code is more convenient
  • Do you need per-asset configuration? Some components allow fine-grained control per asset

📊 Feature Comparison

| Feature | RayRunLauncher | ray_executor | PipesRayJobClient | PipesKubeRayJobClient | KubeRayCluster | KubeRayInteractiveJob |
|---|---|---|---|---|---|---|
| Manages the cluster | ❌ | ❌ | ❌ | ✅ | ✅ | ✅ |
| Uses Ray Jobs API | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
| Enabled per-asset | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
| Configurable per-asset | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
| No external script needed | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ |
| No Dagster DB access needed | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
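The comparison table can also be read as a filter: pick the features you need and keep the integrations that have all of them. The plain-Python sketch below encodes the table's rows; the snake_case feature keys and the `candidates` helper are hypothetical names introduced here, not part of dagster-ray.

```python
from typing import Dict, List

# Hypothetical keys, in the same order as the table's rows
FEATURES = [
    "manages_cluster",
    "uses_jobs_api",
    "enabled_per_asset",
    "configurable_per_asset",
    "no_external_script",
    "no_dagster_db_access",
]

# Columns of the comparison table; booleans follow the FEATURES order
TABLE: Dict[str, List[bool]] = {
    "RayRunLauncher":        [False, True,  False, False, True,  False],
    "ray_executor":          [False, True,  False, True,  True,  False],
    "PipesRayJobClient":     [False, True,  True,  True,  False, True],
    "PipesKubeRayJobClient": [True,  True,  True,  True,  False, True],
    "KubeRayCluster":        [True,  False, True,  True,  True,  True],
    "KubeRayInteractiveJob": [True,  False, True,  True,  True,  True],
}


def candidates(*required: str) -> List[str]:
    """Return the integrations that have every required feature, in table order."""
    unknown = set(required) - set(FEATURES)
    if unknown:
        raise ValueError(f"unknown feature(s): {sorted(unknown)}")
    indices = [FEATURES.index(name) for name in required]
    return [name for name, row in TABLE.items() if all(row[i] for i in indices)]


print(candidates("manages_cluster", "no_external_script"))
# ['KubeRayCluster', 'KubeRayInteractiveJob']
```

For example, requiring a managed cluster and no external script leaves the two KubeRay resources, while requiring both a managed cluster and the Ray Jobs API leaves only PipesKubeRayJobClient.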

🎯 Which One Should You Use?

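You have a Ray cluster already running: RayRunLauncher, ray_executor, and PipesRayJobClient submit work to an existing cluster through the Ray Jobs API.

You want dagster-ray to handle the cluster lifecycle: dagster-ray supports running Ray on Kubernetes with KubeRay. PipesKubeRayJobClient, KubeRayCluster, and KubeRayInteractiveJob create and manage the cluster for you.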

📚 What's Next?

  • Tutorial


    Step-by-step guide with practical examples to get you started with dagster-ray

  • API Reference


    Complete documentation of all classes, methods, and configuration options