52 changes: 52 additions & 0 deletions ray_data_robotics_droid_blip/README.md
# DROID → BLIP Captioning Pipeline

A [Ray Data](https://docs.ray.io/en/latest/data/overview.html) pipeline that reads the [DROID Raw 1.0.1](https://droid-dataset.github.io/) robotics dataset from a public S3 bucket, generates an image caption for each wrist-camera frame using [BLIP-large](https://huggingface.co/Salesforce/blip-image-captioning-large) on GPU, and writes annotated Parquet output partitioned by episode.

Demonstrates **heterogeneous Ray Data pipelines** — CPU workers stream data from S3 while GPU workers run model inference, with Ray Data managing backpressure automatically.

## Files

| File | Description |
|---|---|
| `pipeline.py` | Main entry point — pipeline wiring, CPU/GPU stages, and config knobs |
| `job.yaml` | Anyscale job config — compute, env vars, T4 GPUs |
| `pyproject.toml` | Dependencies (managed by `uv`) |

**Note:** The pipeline automatically downloads the DROID episode manifest (`episodes_droid_v1.0.1_s3.parquet`) from S3 on first run.

## Usage

```bash
# Install dependencies
uv sync

# Run on an existing Ray cluster
uv run python pipeline.py

# Smoke test (limit to N episodes)
# Set NUM_EPISODES in pipeline.py, then run as above
```

### Run on Anyscale

Submit as a job using the included `job.yaml`, which provisions T4 GPU workers:

```bash
uv run anyscale job submit -f job.yaml

# Override GPU concurrency or batch size via env vars
uv run anyscale job submit -f job.yaml --env GPU_STAGE_CONCURRENCY=8 --env GPU_BATCH_SIZE=64
```

## Configuration

Key settings in `pipeline.py`:

| Setting | Default | Description |
|---|---|---|
| `NUM_EPISODES` | `1000` | Limit episodes for testing (set to `None` for all) |
| `MODEL_NAME` | `Salesforce/blip-image-captioning-large` | HuggingFace model ID |
| `CPU_LOADER_NUM_CPUS` | `0.5` | Fractional CPUs per data-loading worker |
| `GPU_BATCH_SIZE` | `32` | Frames per GPU batch (env: `GPU_BATCH_SIZE`) |
| `GPU_CONCURRENCY` | `2` | Number of GPU actors (env: `GPU_STAGE_CONCURRENCY`) |
| `OUTPUT` | `/mnt/cluster_storage/droid-annotated` | Parquet output path |
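The two env-var overrides in the table could be wired up at the top of `pipeline.py` along these lines (a sketch of one plausible parsing approach — the actual code may differ):

```python
# Read optional env-var overrides, falling back to the documented defaults.
# Variable names match the README table; the parsing itself is illustrative.
import os

GPU_BATCH_SIZE = int(os.environ.get("GPU_BATCH_SIZE", "32"))
GPU_CONCURRENCY = int(os.environ.get("GPU_STAGE_CONCURRENCY", "2"))
NUM_EPISODES = 1000  # set to None to process the full dataset
```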
49 changes: 49 additions & 0 deletions ray_data_robotics_droid_blip/job.yaml
# View the docs at https://docs.anyscale.com/reference/job-api#jobconfig.
name: droid-blip-captioning

# When empty, use the default image. This can be an Anyscale-provided base image
# like anyscale/ray:2.43.0-slim-py312-cu125, a user-provided base image (provided
# that it meets certain specs), or you can build new images using the Anyscale
# image builder at https://console.anyscale-staging.com/v2/container-images.
image_uri: anyscale/ray:2.54.0-slim-py313-cu129

compute_config:
head_node:
required_resources:
CPU: 8
memory: 32Gi
worker_nodes:
# CPU nodes for I/O-bound episode loading (S3 download + video decode).
- required_resources:
CPU: 8
memory: 32Gi
min_nodes: 8
max_nodes: 8
# GPU nodes for BLIP inference (T4).
- required_resources:
CPU: 4
memory: 16Gi
GPU: 1
required_labels:
ray.io/accelerator-type: T4
min_nodes: 4
max_nodes: 4

# Path to a local directory or a remote URI to a .zip file (S3, GS, HTTP) that
# will be the working directory for the job. The files in the directory will be
# automatically uploaded to the job environment in Anyscale.
working_dir: .

env_vars:
GPU_STAGE_CONCURRENCY: "4"
GPU_BATCH_SIZE: "32"

# The script to run in your job. You can also do "uv run main.py" if you have a
# pyproject.toml file in your working_dir.
entrypoint: uv run python pipeline.py

# If there is an error, do not retry.
max_retries: 0

# Kill the job after 4 hours to control costs.
timeout_s: 14400