Changes from all commits (15 commits)
1 change: 1 addition & 0 deletions .gitignore
@@ -208,3 +208,4 @@ environments/community/word_hunt/word_hunt_rollouts*.html

# Diplomacy artefacts
environments/game_environments/diplomacy_environment/logs/
.legacy/
49 changes: 49 additions & 0 deletions DEO_ARCHITECTURE.md
@@ -0,0 +1,49 @@
# Atropos DEO: Architecture & Scaling State Machine

The **Dynamic Environment Orchestrator (DEO)** is a resilient scaling engine for managing environment workers in large-scale RL training.

## Core Components

1. **ScalingController**: Implements a dampened PID-style loop with hysteresis to determine the `target_actors` based on "Rollout Pressure" (Queue/BatchSize).
2. **ScalingStrategy**: The execution layer.
- `LocalActor`: Manages subprocesses on the local node with port/GPU isolation.
- `RemoteActor`: Manages remote processes via SSH.
3. **MetricsCollector**: Telemetry interface. Polls the Atropos API server for global workload metrics, including a multi-poll grace period for network resilience.
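The controller loop described above can be sketched as follows. This is an illustrative assumption of the logic, not the actual atroposlib API: the class name, thresholds, and defaults are invented, and it assumes low pressure means the trainer is starved of rollouts (scale up) while high pressure means a backlog (scale down).

```python
class ScalingController:
    """Dampened, hysteresis-guarded scaling from rollout pressure (sketch)."""

    def __init__(self, low=0.5, high=2.0, max_step=1, min_actors=1, max_actors=8):
        self.low = low            # pressure below this: trainer starved, scale up
        self.high = high          # pressure above this: backlog, scale down
        self.max_step = max_step  # dampening: change at most this many actors per tick
        self.min_actors = min_actors
        self.max_actors = max_actors

    def target_actors(self, queue_size, batch_size, current, pending):
        pressure = queue_size / max(batch_size, 1)  # "Rollout Pressure"
        # Count still-booting workers so a pressure spike cannot trigger
        # a launch storm while earlier launches are pending.
        effective = current + pending
        if pressure < self.low:
            target = effective + self.max_step
        elif pressure > self.high:
            target = effective - self.max_step
        else:
            target = effective  # hysteresis dead band: hold steady
        return max(self.min_actors, min(self.max_actors, target))
```

The dead band between `low` and `high` is what prevents thrashing: small oscillations in queue depth produce no scaling action at all.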

## The Scaling State Machine

Workers transition through three lifecycle phases — Pending, Connected, Draining — backed by an adoption mechanism for warm restarts, ensuring zero data loss and cluster stability.

```mermaid
stateDiagram-v2
[*] --> Pending : Launched (Port Assigned)
Pending --> Connected : Registered with API Server
Connected --> Draining : SIGUSR1 (Scale Down Triggered)
Draining --> [*] : Rollout Finished (Process Exit)

Pending --> [*] : Boot Timeout / Failure
Connected --> [*] : Crash / Termination
```

### 1. Pending Phase
- Orchestrator subtracts `pending` counts from scaling decisions to prevent **Launch Storms**.
- Tracked via PID and launch timestamp.

### 2. Connected Phase
- Worker is executing rollouts and contributing to the global throughput.
- Orchestrator monitors "Rollout Pressure" to decide if more are needed.

### 3. Draining Phase (Nous Maintainer Standard)
- When scaling down, the orchestrator DOES NOT kill the process immediately.
- It sends `SIGUSR1` to the worker.
- The worker stops accepting new tasks and finishes its current rollout.
- The orchestrator moves the worker to a `draining` list and continues managing the rest of the cluster.
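A minimal worker-side sketch of this drain protocol, assuming the worker owns a simple rollout loop (the class and method names are illustrative, not the real worker implementation):

```python
import signal


class Worker:
    def __init__(self):
        self.draining = False
        # SIGUSR1 is the scale-down signal sent by the orchestrator.
        signal.signal(signal.SIGUSR1, self._on_drain)

    def _on_drain(self, signum, frame):
        # Only set a flag here -- never abort mid-rollout from a signal handler.
        self.draining = True

    def run(self, rollouts):
        completed = []
        for rollout in rollouts:
            completed.append(rollout())  # finish the rollout already started
            if self.draining:
                break  # stop accepting new tasks, then exit cleanly
        return completed
```

The key invariant is that the signal handler never interrupts work in progress; the drain flag is checked only at rollout boundaries.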

### 4. Adoption Logic (Warm Startup)
- Upon restart, the DEO scans the process table for orphans matching the environment command.
- It "adopts" these workers into its management loop, preventing duplicate launches and port conflicts.

## Resilience Features
- **Port Isolation**: Dedicated port pools (`8001:8020`) for multi-instance scaling on a single IP.
- **Heartbeat Grace Period**: 3-poll window (~30s) where stale metrics are used during network flaps to prevent accidental mass scale-down.
- **Process Group Isolation**: `os.killpg` ensures that even a worker's sub-processes (e.g., a CUDA kernel launcher) are reaped correctly.
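The process-group pattern, sketched with only the standard library (`launch_worker` and `reap_worker` are illustrative names):

```python
import os
import signal
import subprocess


def launch_worker(cmd):
    # start_new_session=True puts the worker in a fresh session and process
    # group, so any children it spawns share that group id.
    return subprocess.Popen(cmd, start_new_session=True)


def reap_worker(proc, sig=signal.SIGTERM):
    # Signal the whole group, not just the direct child, so sub-processes
    # (e.g. a CUDA kernel launcher) are reaped with it.
    os.killpg(os.getpgid(proc.pid), sig)
    proc.wait()
```

Without `start_new_session`, `killpg` would target the orchestrator's own group and the worker's children could survive as orphans.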
67 changes: 67 additions & 0 deletions PROD_DEPLOYMENT.md
@@ -0,0 +1,67 @@
# Atropos DEO: Production Deployment Guide

Technical specification for deploying the Atropos Dynamic Environment Orchestrator (DEO) in GPU-accelerated training clusters.

## Cluster-Scale Deployment

### 1. Scaling LLM Workers (GPU Isolation)
The DEO leverages `CUDA_VISIBLE_DEVICES` to ensure that each worker has dedicated, non-overlapping access to GPUs.

**Example: Launching Tensor-Parallel Workers (TP=2)**
To run workers that each consume 2 GPUs on an 8-GPU node:
```bash
python -m atroposlib.cli.orchestrate \
--env-command "python main.py --tp 2 --port \$PORT" \
--gpus-per-actor 2 \
--max-actors 4
```
The DEO will automatically slice the device list:
- Worker 0: `CUDA_VISIBLE_DEVICES=0,1`
- Worker 1: `CUDA_VISIBLE_DEVICES=2,3`
- ...
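The slicing rule implied by the bullets above reduces to simple arithmetic; this helper is an illustrative sketch, not the DEO's actual code:

```python
def gpu_slice(worker_index: int, gpus_per_actor: int, total_gpus: int = 8) -> str:
    """Return the CUDA_VISIBLE_DEVICES string for one worker."""
    start = worker_index * gpus_per_actor
    if start + gpus_per_actor > total_gpus:
        raise ValueError("not enough GPUs on this node for another worker")
    return ",".join(str(i) for i in range(start, start + gpus_per_actor))
```

For the TP=2 example, worker 1 would be launched with `env["CUDA_VISIBLE_DEVICES"] = gpu_slice(1, 2)`, i.e. `"2,3"`.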

### 2. Multi-Node Expansion
Use the `RemoteActor` strategy to manage a distributed fleet from a single controller. Ensure passwordless SSH and identical environment paths across the cluster.

---

## Production Resilience Patterns

### 1. Hardware Cordoning (Thermal Guard)
The DEO continuously monitors GPU health via NVML/SMI. If a GPU enters a `ThermalThrottled` or `HardwareFault` state (`0x0000000000000008`), the DEO will:
1. **Cordon** the GPU (mark it as unavailable).
2. **Skip** scale-up attempts that would utilize that hardware.
3. Log a `CRITICAL` alert to prevent training performance degradation.
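A cordon registry for the steps above might look like the following sketch, assuming health states arrive from an NVML polling loop (the class and state names here mirror the text but are illustrative):

```python
BAD_STATES = {"ThermalThrottled", "HardwareFault"}


class GpuCordon:
    def __init__(self):
        self.cordoned = set()

    def observe(self, gpu_index: int, state: str):
        if state in BAD_STATES:
            # Mark unavailable; uncordoning is left to operator intervention.
            self.cordoned.add(gpu_index)

    def usable(self, gpu_indices) -> bool:
        # Scale-up must skip any device slice touching a cordoned GPU.
        return not any(i in self.cordoned for i in gpu_indices)
```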

### 2. CrashLoopBackOff (Self-Healing)
To prevent "Launch Storms" (rapidly failing workers consuming CPU/IO), the DEO tracks failure frequency.
- **Trigger**: 3 failures within a 60-second window.
- **Action**: Scale-up is **HALTED** until the cooldown period expires or the operator intervenes.
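The failure-frequency tracker behind this behavior can be sketched with a sliding window; the thresholds match the text, while the class name and injectable clock are illustrative:

```python
import time
from collections import deque


class CrashLoopGuard:
    def __init__(self, max_failures=3, window_s=60.0, now=time.monotonic):
        self.max_failures = max_failures
        self.window_s = window_s
        self.now = now  # injectable clock, handy for testing
        self.failures = deque()

    def record_failure(self):
        self.failures.append(self.now())

    def scale_up_allowed(self) -> bool:
        cutoff = self.now() - self.window_s
        while self.failures and self.failures[0] < cutoff:
            self.failures.popleft()  # drop failures outside the window
        return len(self.failures) < self.max_failures
```

Because old failures age out of the deque, the halt lifts automatically once the cooldown window passes without new crashes.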

### 3. Graceful Draining (Zero Data Loss)
During scale-down (e.g., training efficiency adjustment or node maintenance), the DEO sends `SIGUSR1`.
- Workers finish the current rollout.
- Checkpoints are saved.
- In-flight data is fully synchronized before process exit.

---

## Maintenance & Observability

### Diagnostic Audit
Run the status command to audit the current resource allocation:
```bash
python -m atroposlib.cli.orchestrate --status
```
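External tooling can also poll the API server's `/global-status` endpoint (see `atroposlib/api/server.py` in this PR) and derive the scaling signal itself. A hedged sketch — the helper name is illustrative, while the field names come from the endpoint's response model:

```python
def rollout_pressure(status: dict) -> float:
    """Derive the scaling signal from a /global-status response."""
    batch_size = status.get("batch_size", -1)
    if batch_size <= 0:
        return 0.0  # trainer not registered yet; treat as no pressure
    return status["queue_size"] / batch_size
```

A monitoring script would feed it a fetched payload, e.g. `rollout_pressure(requests.get(f"{api_url}/global-status").json())`.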

### WandB Integration
All orchestration metadata is synchronized to WandB, allowing infra teams to monitor:
- `deo/rollout_pressure` (Scaling demand)
- `deo/num_draining` (Capacity withdrawal status)
- `deo/free_vram_mb` (Memory headroom)
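Assembling that payload is straightforward; the `deo/` keys match the list above, while the builder function itself is an illustrative sketch (the real orchestrator presumably calls `wandb.log` directly in its loop):

```python
def build_deo_log_payload(queue_size, batch_size, num_draining, free_vram_mb):
    return {
        "deo/rollout_pressure": queue_size / max(batch_size, 1),
        "deo/num_draining": num_draining,
        "deo/free_vram_mb": free_vram_mb,
    }
```

Usage would be `wandb.log(build_deo_log_payload(1024, 512, 2, 30000))` once per orchestrator tick.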

---

## Troubleshooting
- **Zombie Processes**: If the DEO is killed via `SIGKILL`, some CUDA kernels may remain active. Restart the DEO; its **Warm Startup** logic will automatically adopt these orphans and reclaim them gracefully.
- **Port Hijacking**: The DEO performs a socket-level pre-flight check before every launch to prevent conflicts with unmanaged system processes.
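The pre-flight check can be as simple as attempting a bind; this sketch is one plausible shape of it, not necessarily the DEO's exact implementation:

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Attempt a TCP bind; failure means something else owns the port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # SO_REUSEADDR skips TIME_WAIT remnants but still fails against
        # a live listener, which is exactly the conflict we want to detect.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind((host, port))
            return True
        except OSError:
            return False
```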
71 changes: 64 additions & 7 deletions atroposlib/api/server.py
@@ -84,7 +84,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send):

sent = False

# needed some odd logic here to handle gzip stream so just returning an empty body
# Handle gzip stream by returning empty body after first send
async def new_receive():
nonlocal sent
if sent:
@@ -200,6 +200,8 @@ def _process_scored_data(scored_data: ScoredData) -> Dict[str, Any]:
app.state.queue = []
if not hasattr(app.state, "buffer"):
app.state.buffer = {}
if not hasattr(app.state, "total_rollouts_processed"):
app.state.total_rollouts_processed = 0

data_dict = _scored_data_to_dict(scored_data)
env_id = data_dict.get("env_id")
@@ -233,6 +235,8 @@ def _process_scored_data(scored_data: ScoredData) -> Dict[str, Any]:

app.state.queue.append(data_dict)
app.state.latest = data_dict
if hasattr(app.state, "total_rollouts_processed"):
app.state.total_rollouts_processed += 1
return {"status": "received"}


@@ -253,6 +257,19 @@ class Info(BaseModel):
batch_size: int = -1


class GlobalStatus(BaseModel):
"""
    BaseModel exposing global orchestration metrics for the DEO.
"""

current_step: int
queue_size: int
total_rollouts_processed: int
unallocated_fraction: float
num_connected_envs: int
batch_size: int


@app.post("/register")
async def register(registration: Registration):
# Initialize app state if not already done
@@ -269,7 +286,8 @@ async def register(registration: Registration):
app.state.curr_batch = []
app.state.started = False
app.state.envs = []
app.state.buffer = {} # Buffer for mixed-size groups per environment
app.state.buffer = {} # Mixed-size group buffer
app.state.total_rollouts_processed = 0

# Initialize requesters list if not already done
if not hasattr(app.state, "requesters"):
@@ -281,10 +299,10 @@

@app.post("/register-env")
async def register_env_url(register_env: RegisterEnv):
# Check if trainer has started
if not hasattr(app.state, "started") or not app.state.started:
# Check if trainer has registered
if not hasattr(app.state, "queue"):
return {
"status": "wait for trainer to start",
"status": "wait for trainer to register",
}

# Initialize envs list if not already done
@@ -461,13 +479,51 @@ async def scored_data_list(scored_data_list: List[ScoredData]):
async def get_status():
try:
return {
"current_step": app.state.status_dict["step"],
"current_step": app.state.status_dict.get("step", 0),
"queue_size": len(app.state.queue),
}
except AttributeError:
return {"current_step": 0, "queue_size": 0}


@app.get("/global-status", response_model=GlobalStatus)
async def get_global_status():
"""
Returns global metrics for the Elastic Orchestrator to monitor workload pressure.
"""
try:
# Calculate total unallocated fraction
total_min_allocation = 0.0
connected_envs = 0
for env_config in getattr(app.state, "envs", []):
if env_config.get("connected", False):
connected_envs += 1
if env_config.get("min_batch_allocation") is not None:
total_min_allocation += env_config["min_batch_allocation"]

unallocated_fraction = 1.0 - min(total_min_allocation, 1.0)

return {
"current_step": getattr(app.state, "status_dict", {}).get("step", 0),
"queue_size": len(getattr(app.state, "queue", [])),
"total_rollouts_processed": getattr(
app.state, "total_rollouts_processed", 0
),
"unallocated_fraction": unallocated_fraction,
"num_connected_envs": connected_envs,
"batch_size": getattr(app.state, "batchsize", -1),
}
except AttributeError:
return {
"current_step": 0,
"queue_size": 0,
"total_rollouts_processed": 0,
"unallocated_fraction": 1.0,
"num_connected_envs": 0,
"batch_size": -1,
}


@app.get("/status-env")
async def get_status_env(env: EnvIdentifier):
total = sum(
@@ -489,7 +545,8 @@ async def get_status_env(env: EnvIdentifier):

# Calculate total minimum allocations
total_min_allocation = 0.0
for env_config in app.state.envs:
envs = getattr(app.state, "envs", [])
for env_config in envs:
if (
env_config.get("connected", False)
and env_config.get("min_batch_allocation") is not None