diff --git a/3rdparty/Gym-workspace/Gym b/3rdparty/Gym-workspace/Gym index 23cdeb3807..01a9765f8c 160000 --- a/3rdparty/Gym-workspace/Gym +++ b/3rdparty/Gym-workspace/Gym @@ -1 +1 @@ -Subproject commit 23cdeb38077d7b72a5fbae0927a2e1a74bfc15f7 +Subproject commit 01a9765f8cba758c12a018b0e0e4d861ee4b916c diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index 34b6f0b5db..9ee0cc3238 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -212,6 +212,29 @@ def main() -> None: base_urls=policy_generation.dp_openai_server_base_urls, initial_global_config_dict=config["env"]["nemo_gym"], ) + # Support disaggregated Gym: connect to a remote Gym service instead of spawning local subprocesses. + # Two modes: (1) static URL via env.remote_gym_url, or (2) K8s endpoint registry via env.disagg_job_id. + remote_gym_url = config["env"].get("remote_gym_url") + disagg_job_id = config["env"].get("disagg_job_id") + if disagg_job_id: + import json + + from nemo_rl.distributed.k8s_endpoint_registry import K8sEndpointRegistry + + registry = K8sEndpointRegistry(job_id=disagg_job_id) + registry.create(owner_raycluster_name=os.environ.get("RAY_CLUSTER_NAME")) + + # Publish vLLM URLs so the Gym cluster can discover them. + vllm_urls = [u for u in policy_generation.dp_openai_server_base_urls if u] + registry.set("vllm_base_urls", json.dumps(vllm_urls)) + + # Wait for the Gym cluster to register its head server address. 
+ print("Waiting for Gym head server to register in endpoint registry...") + remote_gym_url = registry.get("gym_head_server") + print(f"Discovered remote Gym service at: {remote_gym_url}") + if remote_gym_url: + nemo_gym_config["remote_gym_url"] = remote_gym_url + print(f"Using remote Gym service at: {remote_gym_url}") nemo_gym = create_env(env_name="nemo_gym", env_config=nemo_gym_config) # Blocking wait for NeMo-Gym to spin up ray.get(nemo_gym.health_check.remote()) diff --git a/extensions/k8s_cli/pyproject.toml b/extensions/k8s_cli/pyproject.toml new file mode 100644 index 0000000000..b9ef1e2094 --- /dev/null +++ b/extensions/k8s_cli/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "nrl-k8s" +version = "0.1.0" +description = "CLI for managing nemo-rl workloads on Kubernetes with KAI scheduler" +requires-python = ">=3.10" +dependencies = ["kubernetes>=28.0", "click>=8.0", "rich>=13.0"] + +[project.optional-dependencies] +dev = ["pytest>=8.0", "pytest-mock>=3.0"] + +[project.scripts] +nrl-k8s = "nrl_k8s.cli:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/nrl_k8s"] diff --git a/extensions/k8s_cli/src/nrl_k8s/__init__.py b/extensions/k8s_cli/src/nrl_k8s/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/extensions/k8s_cli/src/nrl_k8s/cli.py b/extensions/k8s_cli/src/nrl_k8s/cli.py new file mode 100644 index 0000000000..7518c1bb0a --- /dev/null +++ b/extensions/k8s_cli/src/nrl_k8s/cli.py @@ -0,0 +1,149 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""CLI for managing nemo-rl workloads on Kubernetes with KAI scheduler.""" + +import click +from rich.console import Console +from rich.table import Table + +from nrl_k8s.k8s_client import get_gpu_occupancy, get_queues, submit_gang_rayjob + +console = Console() + + +@click.group() +def main(): + """nrl-k8s: Manage nemo-rl workloads on Kubernetes with KAI scheduler.""" + + +@main.command() +def fairshare(): + """Show KAI scheduler queue fairshare configuration.""" + queues = get_queues() + table = Table(title="KAI Scheduler Queues (Fairshare)") + table.add_column("Queue", style="cyan") + table.add_column("Parent", style="dim") + table.add_column("Priority", justify="right") + table.add_column("GPU Quota", justify="right") + table.add_column("GPU Limit", justify="right") + table.add_column("Weight", justify="right") + table.add_column("Preempt Min", style="dim") + table.add_column("Reclaim Min", style="dim") + + for q in queues: + table.add_row( + q["name"], + q["parent"] or "-", + str(q["priority"]) if q["priority"] else "-", + str(q["gpu_quota"]), + str(q["gpu_limit"]), + str(q["gpu_weight"]), + q["preempt_min_runtime"] or "-", + q["reclaim_min_runtime"] or "-", + ) + + console.print(table) + + +@main.command() +def occupancy(): + """Show current GPU occupancy per node and per queue.""" + data = get_gpu_occupancy() + + # Node table. 
+ node_table = Table(title="GPU Occupancy by Node") + node_table.add_column("Node", style="cyan") + node_table.add_column("Allocatable", justify="right") + node_table.add_column("Allocated", justify="right") + node_table.add_column("Free", justify="right", style="green") + + for n in data["nodes"]: + if n["allocatable"] > 0: + node_table.add_row( + n["name"], + str(n["allocatable"]), + str(n["allocated"]), + str(n["allocatable"] - n["allocated"]), + ) + + node_table.add_section() + node_table.add_row( + "TOTAL", + str(data["total_allocatable"]), + str(data["total_allocated"]), + str(data["total_allocatable"] - data["total_allocated"]), + style="bold", + ) + console.print(node_table) + + # Queue table. + if data["queues"]: + queue_table = Table(title="GPU Occupancy by Queue") + queue_table.add_column("Queue", style="cyan") + queue_table.add_column("Allocated GPUs", justify="right") + for q in data["queues"]: + queue_table.add_row(q["name"], str(q["allocated_gpus"])) + console.print(queue_table) + else: + console.print("[dim]No GPU workloads running.[/dim]") + + +@main.command() +@click.argument("name") +@click.option("--queue", required=True, help="KAI scheduler queue name") +@click.option("--image", required=True, help="Container image") +@click.option("--entrypoint", required=True, help="Entrypoint command") +@click.option("--num-gpus", required=True, type=int, help="Total GPUs requested") +@click.option("--gpus-per-worker", default=1, type=int, help="GPUs per worker pod") +@click.option("--namespace", default="default", help="Kubernetes namespace") +@click.option( + "--segment-size", + default=None, + type=int, + help="Topology segment size (nodes per rack). 
Creates PodGroup subgroups.", +) +def submit( + name, queue, image, entrypoint, num_gpus, gpus_per_worker, namespace, segment_size +): + """Submit a gang-scheduled RayJob.""" + console.print( + f"Submitting RayJob [cyan]{name}[/cyan] to queue [yellow]{queue}[/yellow]" + ) + console.print( + f" GPUs: {num_gpus} ({num_gpus // gpus_per_worker} workers × {gpus_per_worker} GPU/worker)" + ) + if segment_size: + import math + + num_segments = math.ceil(num_gpus // gpus_per_worker / segment_size) + console.print( + f" Segments: {num_segments} × {segment_size} workers (topology-constrained per rack)" + ) + + result_name = submit_gang_rayjob( + name=name, + queue=queue, + image=image, + entrypoint=entrypoint, + num_gpus=num_gpus, + gpus_per_worker=gpus_per_worker, + namespace=namespace, + segment_size=segment_size, + ) + console.print(f"[green]Created RayJob: {result_name}[/green]") + console.print(f"Watch: kubectl get rayjob {result_name} -w") + + +if __name__ == "__main__": + main() diff --git a/extensions/k8s_cli/src/nrl_k8s/k8s_client.py b/extensions/k8s_cli/src/nrl_k8s/k8s_client.py new file mode 100644 index 0000000000..77f38a6698 --- /dev/null +++ b/extensions/k8s_cli/src/nrl_k8s/k8s_client.py @@ -0,0 +1,270 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Thin wrapper around the Kubernetes API for KAI scheduler queries.""" + +from __future__ import annotations + +from kubernetes import client, config +from kubernetes.client.exceptions import ApiException + + +def load_k8s_config(): + """Load in-cluster or kubeconfig.""" + try: + config.load_incluster_config() + except config.ConfigException: + config.load_kube_config() + + +def get_queues(namespace: str | None = None) -> list[dict]: + """Fetch all KAI scheduler queues and their resource status.""" + load_k8s_config() + custom = client.CustomObjectsApi() + result = custom.list_cluster_custom_object( + group="scheduling.run.ai", version="v2", plural="queues" + ) + queues = [] + for q in result.get("items", []): + spec = q.get("spec", {}) + gpu = spec.get("resources", {}).get("gpu", {}) + queues.append( + { + "name": q["metadata"]["name"], + "parent": spec.get("parentQueue", ""), + "priority": spec.get("priority", ""), + "gpu_quota": gpu.get("quota", -1), + "gpu_limit": gpu.get("limit", -1), + "gpu_weight": gpu.get("overQuotaWeight", 1), + "preempt_min_runtime": spec.get("preemptMinRuntime", ""), + "reclaim_min_runtime": spec.get("reclaimMinRuntime", ""), + } + ) + return queues + + +def get_gpu_occupancy(namespace: str = "default") -> dict: + """Get current GPU allocation per node and per queue. + + Returns: + { + "nodes": [{"name": ..., "allocatable": N, "allocated": M}], + "queues": [{"name": ..., "allocated_gpus": N}], + "total_allocatable": N, + "total_allocated": M, + } + """ + load_k8s_config() + v1 = client.CoreV1Api() + + # Node-level GPU info. + nodes_info = [] + total_allocatable = 0 + total_allocated = 0 + nodes = v1.list_node() + for node in nodes.items: + alloc = int(node.status.allocatable.get("nvidia.com/gpu", "0")) + total_allocatable += alloc + nodes_info.append( + {"name": node.metadata.name, "allocatable": alloc, "allocated": 0} + ) + + # Count allocated GPUs per node from running pods. 
+ pods = v1.list_pod_for_all_namespaces(field_selector="status.phase=Running") + for pod in pods.items: + node_name = pod.spec.node_name + for container in pod.spec.containers: + limits = container.resources.limits or {} + gpu_req = int(limits.get("nvidia.com/gpu", "0")) + if gpu_req > 0: + total_allocated += gpu_req + for n in nodes_info: + if n["name"] == node_name: + n["allocated"] += gpu_req + + # Queue-level allocation from PodGroups. + queue_alloc: dict[str, int] = {} + for pod in pods.items: + queue = ( + pod.metadata.labels.get("kai.scheduler/queue", "") + if pod.metadata.labels + else "" + ) + if not queue: + continue + for container in pod.spec.containers: + limits = container.resources.limits or {} + gpu_req = int(limits.get("nvidia.com/gpu", "0")) + queue_alloc[queue] = queue_alloc.get(queue, 0) + gpu_req + + return { + "nodes": nodes_info, + "queues": [ + {"name": k, "allocated_gpus": v} for k, v in sorted(queue_alloc.items()) + ], + "total_allocatable": total_allocatable, + "total_allocated": total_allocated, + } + + +def submit_gang_rayjob( + name: str, + queue: str, + image: str, + entrypoint: str, + num_gpus: int, + gpus_per_worker: int = 1, + namespace: str = "default", + segment_size: int | None = None, +) -> str: + """Submit a RayJob with gang scheduling via KAI. + + If segment_size is specified, creates a PodGroup with subgroups for + topology-aware segment scheduling (equivalent to Slurm --segment=N). + + Returns the created RayJob name. 
+ """ + load_k8s_config() + custom = client.CustomObjectsApi() + + num_workers = num_gpus // gpus_per_worker + + rayjob = { + "apiVersion": "ray.io/v1", + "kind": "RayJob", + "metadata": { + "name": name, + "namespace": namespace, + "labels": {"kai.scheduler/queue": queue}, + }, + "spec": { + "entrypoint": entrypoint, + "submissionMode": "HTTPMode", + "shutdownAfterJobFinishes": True, + "ttlSecondsAfterFinished": 60, + "rayClusterSpec": { + "rayVersion": "2.52.0", + "headGroupSpec": { + "rayStartParams": {"object-store-memory": "200000000"}, + "template": { + "spec": { + "schedulerName": "kai-scheduler", + "containers": [ + { + "name": "ray-head", + "image": image, + "resources": { + "requests": {"cpu": "1", "memory": "4Gi"}, + "limits": {"cpu": "4", "memory": "16Gi"}, + }, + "ports": [ + {"containerPort": 6379, "name": "gcs-server"}, + {"containerPort": 8265, "name": "dashboard"}, + ], + } + ], + } + }, + }, + "workerGroupSpecs": [ + { + "groupName": "gpu-workers", + "replicas": num_workers, + "minReplicas": num_workers, + "maxReplicas": num_workers, + "rayStartParams": { + "num-gpus": str(gpus_per_worker), + "object-store-memory": "200000000", + }, + "template": { + "spec": { + "schedulerName": "kai-scheduler", + "containers": [ + { + "name": "ray-worker", + "image": image, + "resources": { + "requests": { + "cpu": "1", + "memory": "4Gi", + "nvidia.com/gpu": str(gpus_per_worker), + }, + "limits": { + "cpu": "8", + "memory": "32Gi", + "nvidia.com/gpu": str(gpus_per_worker), + }, + }, + } + ], + } + }, + } + ], + }, + }, + } + + result = custom.create_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayjobs", + body=rayjob, + ) + + # If segment_size is specified, create a PodGroup with topology subgroups. 
+ if segment_size and segment_size < num_workers: + import math + + num_segments = math.ceil(num_workers / segment_size) + cluster_name = result["status"].get("rayClusterName", name) + subgroups = [] + for i in range(num_segments): + subgroups.append( + { + "name": f"segment-{i}", + "minMember": min(segment_size, num_workers - i * segment_size), + "topologyConstraint": { + "topology": "cluster-topology", + "requiredTopologyLevel": "topology-rack", + }, + } + ) + + podgroup = { + "apiVersion": "scheduling.run.ai/v2alpha2", + "kind": "PodGroup", + "metadata": { + "name": f"pg-{name}", + "namespace": namespace, + }, + "spec": { + "minMember": num_workers, + "queue": queue, + "subGroups": subgroups, + }, + } + try: + custom.create_namespaced_custom_object( + group="scheduling.run.ai", + version="v2alpha2", + namespace=namespace, + plural="podgroups", + body=podgroup, + ) + except ApiException as e: + if e.status != 409: + raise + + return result["metadata"]["name"] diff --git a/extensions/k8s_cli/tests/__init__.py b/extensions/k8s_cli/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/extensions/k8s_cli/tests/test_k8s_client.py b/extensions/k8s_cli/tests/test_k8s_client.py new file mode 100644 index 0000000000..13d3f9f32a --- /dev/null +++ b/extensions/k8s_cli/tests/test_k8s_client.py @@ -0,0 +1,209 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Unit tests for nrl_k8s.k8s_client (mocked K8s API).""" + +from unittest.mock import MagicMock, patch + +import pytest + + +@pytest.fixture(autouse=True) +def mock_k8s_config(): + """Prevent real K8s config loading in all tests.""" + with patch("nrl_k8s.k8s_client.load_k8s_config"): + yield + + +class TestGetQueues: + def test_parses_queue_fields(self): + mock_custom = MagicMock() + mock_custom.list_cluster_custom_object.return_value = { + "items": [ + { + "metadata": {"name": "org"}, + "spec": { + "resources": { + "gpu": {"quota": -1, "limit": -1, "overQuotaWeight": 1} + } + }, + }, + { + "metadata": {"name": "priority-team"}, + "spec": { + "parentQueue": "org", + "priority": 200, + "preemptMinRuntime": "4h", + "reclaimMinRuntime": "15m", + "resources": { + "gpu": {"quota": 1, "limit": 2, "overQuotaWeight": 2} + }, + }, + }, + ] + } + + with patch( + "nrl_k8s.k8s_client.client.CustomObjectsApi", return_value=mock_custom + ): + from nrl_k8s.k8s_client import get_queues + + queues = get_queues() + + assert len(queues) == 2 + assert queues[0]["name"] == "org" + assert queues[0]["gpu_quota"] == -1 + assert queues[1]["name"] == "priority-team" + assert queues[1]["parent"] == "org" + assert queues[1]["priority"] == 200 + assert queues[1]["gpu_quota"] == 1 + assert queues[1]["gpu_limit"] == 2 + assert queues[1]["gpu_weight"] == 2 + assert queues[1]["preempt_min_runtime"] == "4h" + assert queues[1]["reclaim_min_runtime"] == "15m" + + def test_empty_cluster(self): + mock_custom = MagicMock() + mock_custom.list_cluster_custom_object.return_value = {"items": []} + + with patch( + "nrl_k8s.k8s_client.client.CustomObjectsApi", return_value=mock_custom + ): + from nrl_k8s.k8s_client import get_queues + + assert get_queues() == [] + + +class TestGetGpuOccupancy: + def _make_node(self, name, gpu_count): + node = MagicMock() + node.metadata.name = name + node.status.allocatable = {"nvidia.com/gpu": str(gpu_count)} + return node + + def _make_pod(self, name, node_name, 
gpu_count, queue=""): + pod = MagicMock() + pod.metadata.name = name + pod.spec.node_name = node_name + pod.metadata.labels = {"kai.scheduler/queue": queue} if queue else {} + container = MagicMock() + container.resources.limits = ( + {"nvidia.com/gpu": str(gpu_count)} if gpu_count else {} + ) + pod.spec.containers = [container] + return pod + + def test_counts_gpus(self): + mock_v1 = MagicMock() + nodes = MagicMock() + nodes.items = [self._make_node("worker-0", 2)] + mock_v1.list_node.return_value = nodes + + pods = MagicMock() + pods.items = [ + self._make_pod("pod-a", "worker-0", 1, queue="priority-team"), + self._make_pod("pod-b", "worker-0", 1, queue="community"), + ] + mock_v1.list_pod_for_all_namespaces.return_value = pods + + with patch("nrl_k8s.k8s_client.client.CoreV1Api", return_value=mock_v1): + from nrl_k8s.k8s_client import get_gpu_occupancy + + result = get_gpu_occupancy() + + assert result["total_allocatable"] == 2 + assert result["total_allocated"] == 2 + assert result["nodes"][0]["allocated"] == 2 + assert len(result["queues"]) == 2 + + def test_empty_cluster(self): + mock_v1 = MagicMock() + nodes = MagicMock() + nodes.items = [self._make_node("worker-0", 2)] + mock_v1.list_node.return_value = nodes + pods = MagicMock() + pods.items = [] + mock_v1.list_pod_for_all_namespaces.return_value = pods + + with patch("nrl_k8s.k8s_client.client.CoreV1Api", return_value=mock_v1): + from nrl_k8s.k8s_client import get_gpu_occupancy + + result = get_gpu_occupancy() + + assert result["total_allocated"] == 0 + assert result["queues"] == [] + + +class TestSubmitGangRayjob: + def test_creates_rayjob(self): + mock_custom = MagicMock() + mock_custom.create_namespaced_custom_object.return_value = { + "metadata": {"name": "test-job"}, + "status": {}, + } + + with patch( + "nrl_k8s.k8s_client.client.CustomObjectsApi", return_value=mock_custom + ): + from nrl_k8s.k8s_client import submit_gang_rayjob + + result = submit_gang_rayjob( + name="test-job", + 
queue="priority-team", + image="rayproject/ray:2.52.0", + entrypoint="echo hello", + num_gpus=2, + ) + + assert result == "test-job" + call_args = mock_custom.create_namespaced_custom_object.call_args + body = call_args.kwargs["body"] + assert body["metadata"]["labels"]["kai.scheduler/queue"] == "priority-team" + assert body["spec"]["submissionMode"] == "HTTPMode" + workers = body["spec"]["rayClusterSpec"]["workerGroupSpecs"][0] + assert workers["replicas"] == 2 + + def test_with_segment_size(self): + mock_custom = MagicMock() + mock_custom.create_namespaced_custom_object.return_value = { + "metadata": {"name": "seg-job"}, + "status": {"rayClusterName": "seg-job-abc"}, + } + + with patch( + "nrl_k8s.k8s_client.client.CustomObjectsApi", return_value=mock_custom + ): + from nrl_k8s.k8s_client import submit_gang_rayjob + + submit_gang_rayjob( + name="seg-job", + queue="priority-team", + image="rayproject/ray:2.52.0", + entrypoint="echo hello", + num_gpus=4, + segment_size=2, + ) + + # Should have 2 calls: RayJob + PodGroup + assert mock_custom.create_namespaced_custom_object.call_count == 2 + pg_call = mock_custom.create_namespaced_custom_object.call_args_list[1] + pg_body = pg_call.kwargs["body"] + assert pg_body["kind"] == "PodGroup" + assert len(pg_body["spec"]["subGroups"]) == 2 + assert pg_body["spec"]["subGroups"][0]["minMember"] == 2 + assert ( + pg_body["spec"]["subGroups"][0]["topologyConstraint"][ + "requiredTopologyLevel" + ] + == "topology-rack" + ) diff --git a/infra/examples/disagg_gym_raycluster.yaml b/infra/examples/disagg_gym_raycluster.yaml new file mode 100644 index 0000000000..eaa951ff48 --- /dev/null +++ b/infra/examples/disagg_gym_raycluster.yaml @@ -0,0 +1,73 @@ +# Disaggregated Gym RayCluster with peer-watcher sidecar. +# Runs NeMo Gym servers as a standalone HTTP service. +# Uses K8s ConfigMap endpoint registry for service discovery with the RL cluster. 
+# The peer-watcher sidecar monitors raycluster-rl and tears down both clusters +# if the RL cluster fails or the application signals an error. +# +# Prerequisites: +# kubectl apply -f endpoint-registry-rbac.yaml +apiVersion: ray.io/v1 +kind: RayCluster +metadata: + name: raycluster-gym + labels: + kai.scheduler/queue: priority-team +spec: + rayVersion: "2.52.0" + headGroupSpec: + rayStartParams: + object-store-memory: "200000000" + template: + spec: + serviceAccountName: nemo-rl-endpoint-registry + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-head + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "4" + memory: "16Gi" + requests: + cpu: "1" + memory: "4Gi" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 9090 + name: head-server + volumeMounts: + - name: peer-watcher-script + mountPath: /opt/peer-watcher + # Sidecar: monitors raycluster-rl, tears down both on failure. + - name: peer-watcher + image: python:3.12-slim + command: ["python3", "/opt/peer-watcher/peer-watcher.py"] + env: + - name: SELF_CLUSTER_NAME + value: raycluster-gym + - name: PEER_CLUSTER_NAME + value: raycluster-rl + - name: POLL_INTERVAL + value: "10" + - name: MAX_PEER_FAILURES + value: "3" + resources: + requests: + cpu: "50m" + memory: "64Mi" + limits: + cpu: "100m" + memory: "128Mi" + volumeMounts: + - name: peer-watcher-script + mountPath: /opt/peer-watcher + volumes: + - name: peer-watcher-script + configMap: + name: peer-watcher-script + defaultMode: 0755 diff --git a/infra/examples/disagg_rl_raycluster.yaml b/infra/examples/disagg_rl_raycluster.yaml new file mode 100644 index 0000000000..b4b30cf862 --- /dev/null +++ b/infra/examples/disagg_rl_raycluster.yaml @@ -0,0 +1,120 @@ +# Disaggregated RL RayCluster with peer-watcher sidecar. +# Runs nemo-rl GRPO training with vLLM workers. 
+# Uses K8s ConfigMap endpoint registry for service discovery with the Gym cluster. +# The peer-watcher sidecar monitors raycluster-gym and tears down both clusters +# if the Gym cluster fails or the application signals an error. +# +# Prerequisites: +# kubectl apply -f endpoint-registry-rbac.yaml +apiVersion: ray.io/v1 +kind: RayCluster +metadata: + name: raycluster-rl + labels: + kai.scheduler/queue: priority-team +spec: + rayVersion: "2.52.0" + headGroupSpec: + rayStartParams: + object-store-memory: "200000000" + template: + spec: + serviceAccountName: nemo-rl-endpoint-registry + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-head + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + env: + - name: RAY_CLUSTER_NAME + value: raycluster-rl + resources: + limits: + cpu: "4" + memory: "16Gi" + requests: + cpu: "1" + memory: "4Gi" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + volumeMounts: + - name: nemo-rl-src + mountPath: /workspace/nemo-rl + - name: peer-watcher-script + mountPath: /opt/peer-watcher + # Sidecar: monitors raycluster-gym, tears down both on failure. + - name: peer-watcher + image: python:3.12-slim + command: ["python3", "/opt/peer-watcher/peer-watcher.py"] + env: + - name: SELF_CLUSTER_NAME + value: raycluster-rl + - name: PEER_CLUSTER_NAME + value: raycluster-gym + - name: POLL_INTERVAL + value: "10" + - name: MAX_PEER_FAILURES + value: "3" + # JOB_ID should be set to match +env.disagg_job_id in the GRPO command. + # Uncomment and set when using the endpoint registry. 
+ # - name: JOB_ID + # value: "my-job-id" + resources: + requests: + cpu: "50m" + memory: "64Mi" + limits: + cpu: "100m" + memory: "128Mi" + volumeMounts: + - name: peer-watcher-script + mountPath: /opt/peer-watcher + volumes: + - name: nemo-rl-src + hostPath: + path: /workspace/nemo-rl + type: DirectoryOrCreate + - name: peer-watcher-script + configMap: + name: peer-watcher-script + defaultMode: 0755 + workerGroupSpecs: + - groupName: gpu-workers + replicas: 1 + minReplicas: 1 + maxReplicas: 1 + rayStartParams: + num-gpus: "2" + object-store-memory: "200000000" + template: + spec: + serviceAccountName: nemo-rl-endpoint-registry + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-worker + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "8" + memory: "32Gi" + nvidia.com/gpu: "2" + requests: + cpu: "2" + memory: "8Gi" + nvidia.com/gpu: "2" + volumeMounts: + - name: nemo-rl-src + mountPath: /workspace/nemo-rl + volumes: + - name: nemo-rl-src + hostPath: + path: /workspace/nemo-rl + type: DirectoryOrCreate diff --git a/infra/examples/endpoint-registry-rbac.yaml b/infra/examples/endpoint-registry-rbac.yaml new file mode 100644 index 0000000000..5e46c03b65 --- /dev/null +++ b/infra/examples/endpoint-registry-rbac.yaml @@ -0,0 +1,31 @@ +# RBAC for disaggregated RL-Gym service discovery. +# Allows Ray pods to CRUD ConfigMaps (as an endpoint registry) +# and read RayClusters (for ownerReference UID lookup). 
+apiVersion: v1 +kind: ServiceAccount +metadata: + name: nemo-rl-endpoint-registry +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: nemo-rl-endpoint-registry +rules: +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create", "update", "patch", "delete"] +- apiGroups: ["ray.io"] + resources: ["rayclusters"] + verbs: ["get", "delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: nemo-rl-endpoint-registry +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: nemo-rl-endpoint-registry +subjects: +- kind: ServiceAccount + name: nemo-rl-endpoint-registry diff --git a/infra/examples/gym_standalone_config.yaml b/infra/examples/gym_standalone_config.yaml new file mode 100644 index 0000000000..6cd45c7b54 --- /dev/null +++ b/infra/examples/gym_standalone_config.yaml @@ -0,0 +1,12 @@ +# Gym standalone server config (extracted from env.nemo_gym section). +# Used with: python -m nemo_gym.standalone_server --config-yaml this_file.yaml +config_paths: +- responses_api_models/vllm_model/configs/vllm_model_for_training.yaml +- resources_servers/workplace_assistant/configs/workplace_assistant.yaml +policy_model: + responses_api_models: + vllm_model: + extra_body: + chat_template_kwargs: + enable_thinking: false + uses_reasoning_parser: false diff --git a/infra/examples/kai-grafana-dashboard.yaml b/infra/examples/kai-grafana-dashboard.yaml new file mode 100644 index 0000000000..3d3a1ae88c --- /dev/null +++ b/infra/examples/kai-grafana-dashboard.yaml @@ -0,0 +1,117 @@ +# Grafana dashboard for KAI scheduler fairshare monitoring. +# Deployed as a ConfigMap — Grafana's sidecar auto-discovers it. +# +# Panels: +# 1. GPU Allocation vs Fair Share per Queue (bar chart) +# 2. GPU Allocation Over Time (time series) +# 3. Preemption Events (counter) +# 4. 
Scheduling Latency (gauge) +apiVersion: v1 +kind: ConfigMap +metadata: + name: kai-fairshare-dashboard + namespace: monitoring + labels: + grafana_dashboard: "1" +data: + kai-fairshare.json: | + { + "annotations": { "list": [] }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "links": [], + "panels": [ + { + "title": "GPU Allocation vs Fair Share per Queue", + "description": "Current GPU allocation (solid) vs computed fair share (dashed) for each queue. Queues above their fair share may have resources reclaimed.", + "type": "bargauge", + "gridPos": { "h": 8, "w": 24, "x": 0, "y": 0 }, + "targets": [ + { + "expr": "kai_queue_allocated_gpus", + "legendFormat": "{{queue_name}} allocated", + "refId": "A" + }, + { + "expr": "kai_queue_deserved_gpus", + "legendFormat": "{{queue_name}} deserved (fair share)", + "refId": "B" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] } + } + } + }, + { + "title": "GPU Allocation Over Time", + "description": "How GPU allocation per queue changes over time. Useful for observing fairshare oscillation and reclaim events.", + "type": "timeseries", + "gridPos": { "h": 10, "w": 24, "x": 0, "y": 8 }, + "targets": [ + { + "expr": "kai_queue_allocated_gpus", + "legendFormat": "{{queue_name}} allocated", + "refId": "A" + }, + { + "expr": "kai_queue_deserved_gpus", + "legendFormat": "{{queue_name}} fair share", + "refId": "B" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "custom": { "lineWidth": 2, "fillOpacity": 10 } + } + } + }, + { + "title": "Preemption & Eviction Events", + "description": "Count of preemption attempts and pod evictions. 
Spikes indicate resource contention between queues.", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 18 }, + "targets": [ + { + "expr": "rate(kai_total_preemption_attempts[5m])", + "legendFormat": "preemption attempts/s", + "refId": "A" + }, + { + "expr": "rate(kai_pod_group_evicted_pods_total[5m])", + "legendFormat": "evictions/s ({{queue_name}})", + "refId": "B" + } + ], + "fieldConfig": { + "defaults": { "unit": "ops", "custom": { "lineWidth": 2 } } + } + }, + { + "title": "Scheduling Latency", + "description": "End-to-end scheduling cycle latency. High latency may indicate resource fragmentation or too many pending workloads.", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 18 }, + "targets": [ + { + "expr": "kai_e2e_scheduling_latency_milliseconds", + "legendFormat": "e2e latency (ms)", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { "unit": "ms", "custom": { "lineWidth": 2 } } + } + } + ], + "schemaVersion": 39, + "tags": ["kai", "fairshare", "gpu"], + "templating": { "list": [] }, + "time": { "from": "now-1h", "to": "now" }, + "title": "KAI Scheduler Fairshare", + "uid": "kai-fairshare" + } diff --git a/infra/examples/kai-queue-prod.yaml b/infra/examples/kai-queue-prod.yaml new file mode 100644 index 0000000000..1b34306502 --- /dev/null +++ b/infra/examples/kai-queue-prod.yaml @@ -0,0 +1,131 @@ +# KAI Scheduler queue hierarchy for a 256-GPU production cluster. +# Imbalanced setup: priority teams get guarantees and preferential treatment, +# community teams run best-effort on idle resources. 
+# +# Queue hierarchy: +# org (root, unlimited) +# ├── priority (department, priority 200, 192 GPU quota, burst to 240) +# │ ├── priority-training (128 GPU quota, burst to 192, weight 2) +# │ └── priority-inference (64 GPU quota, burst to 80, weight 3) +# └── community (department, priority 50, 32 GPU quota, burst to 128) +# ├── community-team-a (16 GPU quota, burst to 64) +# └── community-team-b (16 GPU quota, burst to 64) +# +# Total guaranteed: 192 + 32 = 224 GPUs (out of 256). +# Remaining 32 GPUs go to priority department first (priority 200 > 50). +# +# preemptMinRuntime: "4h" — protects running jobs from higher-priority preemption +# for at least 4 hours. Prevents expensive training runs from being killed early. +# reclaimMinRuntime: "15m" — allows fast reclaim of over-quota resources. +# When a team is using more than its fair share and another team needs its +# guaranteed quota, reclaim happens after a 15-minute grace period. +# This is shorter than preempt because reclaim is about fairness (giving back +# what you owe), not priority (a VIP taking your resources). + +# ---------- Root ---------- +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: org +spec: + resources: + gpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + memory: { quota: -1, limit: -1, overQuotaWeight: 1 } + +--- +# ---------- Priority department ---------- +# VIP teams with guaranteed resources and highest over-quota priority. +# Gets surplus GPUs before community, reclaimed last. +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: priority +spec: + parentQueue: org + priority: 200 # served first for over-quota, reclaimed last + resources: + gpu: { quota: 192, limit: 240, overQuotaWeight: 3 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 3 } + memory: { quota: -1, limit: -1, overQuotaWeight: 3 } + +--- +# Priority training — large distributed training jobs. 
+# 128 GPUs guaranteed, burst to 192 for big runs. +# Weight 2 within priority department (gets 2/5 of priority's surplus vs inference's 3/5). +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: priority-training +spec: + parentQueue: priority + preemptMinRuntime: "4h" + reclaimMinRuntime: "15m" + resources: + gpu: { quota: 128, limit: 192, overQuotaWeight: 2 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 2 } + memory: { quota: -1, limit: -1, overQuotaWeight: 2 } + +--- +# Priority inference — latency-sensitive serving. +# 64 GPUs guaranteed, capped at 80 (don't over-provision serving). +# Weight 3 within priority department (gets 3/5 of priority's surplus). +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: priority-inference +spec: + parentQueue: priority + preemptMinRuntime: "4h" + reclaimMinRuntime: "15m" + resources: + gpu: { quota: 64, limit: 80, overQuotaWeight: 3 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 3 } + memory: { quota: -1, limit: -1, overQuotaWeight: 3 } + +--- +# ---------- Community department ---------- +# Best-effort teams that use idle resources. Low priority, small guarantees. +# First to be reclaimed when priority department needs resources. +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: community +spec: + parentQueue: org + priority: 50 # lowest priority — served last, reclaimed first + resources: + gpu: { quota: 32, limit: 128, overQuotaWeight: 1 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + memory: { quota: -1, limit: -1, overQuotaWeight: 1 } + +--- +# Community Team A — equal split of community's small guarantee. +# Can burst to 64 GPUs when the cluster is idle. 
+apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: community-team-a +spec: + parentQueue: community + preemptMinRuntime: "4h" + reclaimMinRuntime: "15m" + resources: + gpu: { quota: 16, limit: 64, overQuotaWeight: 1 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + memory: { quota: -1, limit: -1, overQuotaWeight: 1 } + +--- +# Community Team B — same as Team A. +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: community-team-b +spec: + parentQueue: community + preemptMinRuntime: "4h" + reclaimMinRuntime: "15m" + resources: + gpu: { quota: 16, limit: 64, overQuotaWeight: 1 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + memory: { quota: -1, limit: -1, overQuotaWeight: 1 } diff --git a/infra/examples/kai-queue.yaml b/infra/examples/kai-queue.yaml new file mode 100644 index 0000000000..1e8e4f1764 --- /dev/null +++ b/infra/examples/kai-queue.yaml @@ -0,0 +1,61 @@ +# KAI Scheduler queue hierarchy for a 2-GPU kind cluster. +# One priority team and one best-effort team to demonstrate fairshare imbalance. +# +# Queue hierarchy: +# org (root, unlimited) +# ├── priority-team (1 GPU guaranteed, burst to 2, higher priority + weight) +# └── community (0 GPU guaranteed, uses idle GPUs only, lower priority) +# +# Behavior: +# - priority-team always gets its 1 GPU quota, and gets surplus first (priority 200). +# - community has no guarantee — it runs on whatever priority-team isn't using. +# - If both compete, priority-team gets 2x the surplus (overQuotaWeight 2 vs 1). +# - community jobs can be reclaimed after 15m if priority-team needs resources back. +# - priority-team jobs are protected from preemption for 4 hours. 
+ +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: org +spec: + resources: + gpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + cpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + memory: { quota: -1, limit: -1, overQuotaWeight: 1 } +--- +# Priority team — guaranteed resources, high priority, protected from preemption. +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: priority-team +spec: + parentQueue: org + priority: 200 # served first for over-quota, reclaimed last + preemptMinRuntime: "4h" # 4 hours before a higher-priority queue can preempt + reclaimMinRuntime: "15m" # 15 min grace before over-quota resources reclaimed + resources: + gpu: + quota: 1 # guaranteed 1 GPU + limit: 2 # can burst to full cluster when idle + overQuotaWeight: 2 # gets 2x surplus share vs community + cpu: { quota: -1, limit: -1, overQuotaWeight: 2 } + memory: { quota: -1, limit: -1, overQuotaWeight: 2 } +--- +# Community — best-effort, no guarantee, uses idle resources. +# First to be reclaimed when priority-team needs resources back. +apiVersion: scheduling.run.ai/v2 +kind: Queue +metadata: + name: community +spec: + parentQueue: org + priority: 50 # lowest priority — served last, reclaimed first + preemptMinRuntime: "4h" # still protect running jobs for 4 hours + reclaimMinRuntime: "15m" # 15 min grace (same as priority-team for fairness) + resources: + gpu: + quota: 0 # no guaranteed GPUs + limit: 2 # can use the whole cluster if nobody else needs it + overQuotaWeight: 1 # half the surplus weight of priority-team + cpu: { quota: -1, limit: -1, overQuotaWeight: 1 } + memory: { quota: -1, limit: -1, overQuotaWeight: 1 } diff --git a/infra/examples/kai-service-monitors.yaml b/infra/examples/kai-service-monitors.yaml new file mode 100644 index 0000000000..3ee42c8fab --- /dev/null +++ b/infra/examples/kai-service-monitors.yaml @@ -0,0 +1,53 @@ +# ServiceMonitors for KAI scheduler components. 
+# These tell Prometheus to scrape metrics from the scheduler, binder, and queue-controller. +# +# Prerequisites: kube-prometheus-stack must be installed (provides Prometheus Operator). +# Apply after: helmfile sync + +# Scheduler metrics: scheduling latency, fairshare allocation, preemption counts. +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: kai-scheduler + namespace: kai-scheduler + labels: + app: kai-scheduler +spec: + selector: + matchLabels: + app: kai-scheduler-default + endpoints: + - port: metrics + interval: 15s +--- +# Binder metrics: bind latency, bind success/failure rates. +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: kai-binder + namespace: kai-scheduler + labels: + app: kai-binder +spec: + selector: + matchLabels: + app: binder + endpoints: + - port: metrics + interval: 15s +--- +# Queue controller metrics: queue allocation, deserved GPUs, fair share. +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: kai-queue-controller + namespace: kai-scheduler + labels: + app: kai-queue-controller +spec: + selector: + matchLabels: + app: queue-controller + endpoints: + - port: metrics + interval: 15s diff --git a/infra/examples/kai_scheduled_pods.yaml b/infra/examples/kai_scheduled_pods.yaml new file mode 100644 index 0000000000..aeceda565c --- /dev/null +++ b/infra/examples/kai_scheduled_pods.yaml @@ -0,0 +1,47 @@ +# Gang-schedule two GPU pods via KAI scheduler. +# Both pods are scheduled simultaneously or not at all. 
+apiVersion: scheduling.run.ai/v2alpha2 +kind: PodGroup +metadata: + name: gpu-test-group +spec: + minMember: 2 + queue: priority-team +--- +apiVersion: v1 +kind: Pod +metadata: + name: gpu-test-0 + labels: + kai.scheduler/queue: priority-team + annotations: + pod-group-name: gpu-test-group +spec: + schedulerName: kai-scheduler + restartPolicy: Never + containers: + - name: gpu-test + image: nvidia/cuda:12.8.1-base-ubuntu24.04 + command: ["nvidia-smi"] + resources: + limits: + nvidia.com/gpu: "1" +--- +apiVersion: v1 +kind: Pod +metadata: + name: gpu-test-1 + labels: + kai.scheduler/queue: priority-team + annotations: + pod-group-name: gpu-test-group +spec: + schedulerName: kai-scheduler + restartPolicy: Never + containers: + - name: gpu-test + image: nvidia/cuda:12.8.1-base-ubuntu24.04 + command: ["nvidia-smi"] + resources: + limits: + nvidia.com/gpu: "1" diff --git a/infra/examples/kai_scheduled_rayclusters.yaml b/infra/examples/kai_scheduled_rayclusters.yaml new file mode 100644 index 0000000000..2e91a18956 --- /dev/null +++ b/infra/examples/kai_scheduled_rayclusters.yaml @@ -0,0 +1,111 @@ +# Gang-schedule two RayClusters via KAI scheduler. +# Each cluster gets 1 GPU worker. Both clusters should come up simultaneously. 
+apiVersion: ray.io/v1 +kind: RayCluster +metadata: + name: raycluster-a + labels: + kai.scheduler/queue: priority-team +spec: + rayVersion: "2.52.0" + headGroupSpec: + rayStartParams: + object-store-memory: "100000000" # 100MB, minimum for kind + template: + spec: + schedulerName: kai-scheduler + containers: + - name: ray-head + image: rayproject/ray:2.52.0 + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "500m" + memory: "512Mi" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + workerGroupSpecs: + - groupName: gpu-workers + replicas: 1 + minReplicas: 1 + maxReplicas: 1 + rayStartParams: + num-gpus: "1" + object-store-memory: "100000000" + template: + spec: + schedulerName: kai-scheduler + containers: + - name: ray-worker + image: rayproject/ray:2.52.0-gpu + resources: + limits: + cpu: "1" + memory: "2Gi" + nvidia.com/gpu: "1" + requests: + cpu: "500m" + memory: "512Mi" + nvidia.com/gpu: "1" +--- +apiVersion: ray.io/v1 +kind: RayCluster +metadata: + name: raycluster-b + labels: + kai.scheduler/queue: priority-team +spec: + rayVersion: "2.52.0" + headGroupSpec: + rayStartParams: + object-store-memory: "100000000" + template: + spec: + schedulerName: kai-scheduler + containers: + - name: ray-head + image: rayproject/ray:2.52.0 + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "500m" + memory: "512Mi" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + workerGroupSpecs: + - groupName: gpu-workers + replicas: 1 + minReplicas: 1 + maxReplicas: 1 + rayStartParams: + num-gpus: "1" + object-store-memory: "100000000" + template: + spec: + schedulerName: kai-scheduler + containers: + - name: ray-worker + image: rayproject/ray:2.52.0-gpu + resources: + limits: + cpu: "1" + memory: "2Gi" + nvidia.com/gpu: "1" + requests: + cpu: "500m" + memory: "512Mi" + nvidia.com/gpu: 
"1" diff --git a/infra/examples/kai_scheduled_sft.yaml b/infra/examples/kai_scheduled_sft.yaml new file mode 100644 index 0000000000..ea678b63df --- /dev/null +++ b/infra/examples/kai_scheduled_sft.yaml @@ -0,0 +1,127 @@ +# Gang-schedule two 1-GPU SFT RayJobs via KAI scheduler. +# Each runs SFT with cluster.gpus_per_node=1, then tears down. +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: sft-job-a + labels: + kai.scheduler/queue: team-a +spec: + entrypoint: "cd /opt/nemo-rl && bash tests/functional/sft.sh cluster.gpus_per_node=1" + # HTTPMode required with KAI scheduler (K8sJobMode breaks gang scheduling). + submissionMode: HTTPMode + shutdownAfterJobFinishes: true + ttlSecondsAfterFinished: 60 + rayClusterSpec: + rayVersion: "2.52.0" + headGroupSpec: + rayStartParams: + object-store-memory: "200000000" + template: + spec: + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-head + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "4" + memory: "16Gi" + requests: + cpu: "1" + memory: "4Gi" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + workerGroupSpecs: + - groupName: gpu-workers + replicas: 1 + minReplicas: 1 + maxReplicas: 1 + rayStartParams: + num-gpus: "1" + object-store-memory: "200000000" + template: + spec: + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-worker + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "4" + memory: "16Gi" + nvidia.com/gpu: "1" + requests: + cpu: "1" + memory: "4Gi" + nvidia.com/gpu: "1" +--- +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: sft-job-b + labels: + kai.scheduler/queue: team-a +spec: + entrypoint: "cd /opt/nemo-rl && bash tests/functional/sft.sh cluster.gpus_per_node=1" + # HTTPMode required with KAI scheduler (K8sJobMode breaks gang scheduling). 
+ submissionMode: HTTPMode + shutdownAfterJobFinishes: true + ttlSecondsAfterFinished: 60 + rayClusterSpec: + rayVersion: "2.52.0" + headGroupSpec: + rayStartParams: + object-store-memory: "200000000" + template: + spec: + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-head + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "4" + memory: "16Gi" + requests: + cpu: "1" + memory: "4Gi" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + workerGroupSpecs: + - groupName: gpu-workers + replicas: 1 + minReplicas: 1 + maxReplicas: 1 + rayStartParams: + num-gpus: "1" + object-store-memory: "200000000" + template: + spec: + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-worker + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "4" + memory: "16Gi" + nvidia.com/gpu: "1" + requests: + cpu: "1" + memory: "4Gi" + nvidia.com/gpu: "1" diff --git a/infra/examples/kyverno-kai-policies.yaml b/infra/examples/kyverno-kai-policies.yaml new file mode 100644 index 0000000000..f500edc292 --- /dev/null +++ b/infra/examples/kyverno-kai-policies.yaml @@ -0,0 +1,106 @@ +# Kyverno policies for KAI scheduler queue enforcement. +# +# Policy 1: RayCluster and RayJob must have a kai.scheduler/queue label. +# Validates at the CRD level (not Pod level) because the KubeRay operator +# creates pods — pod-level validation would check the operator's identity, +# not the actual user who submitted the workload. +# +# Policy 2 (optional): Validate that the queue is allowed for the requesting user. +# Uses a ConfigMap (kai-queue-permissions) for user→queue mapping. +# The user identity comes from the K8s AdmissionReview userInfo (OIDC email, +# certificate CN, or ServiceAccount name). 
+# +# Prerequisites: +# helm install kyverno kyverno/kyverno -n kyverno --create-namespace + +# --- Policy 1: Require queue label --- +apiVersion: kyverno.io/v1 +kind: ClusterPolicy +metadata: + name: require-kai-queue-label + annotations: + policies.kyverno.io/title: Require KAI queue label on Ray workloads + policies.kyverno.io/description: >- + RayCluster and RayJob resources must specify a kai.scheduler/queue label + so the KAI scheduler can assign them to the correct fairshare queue. +spec: + validationFailureAction: Enforce + background: true + rules: + - name: require-kai-queue-on-raycluster + match: + any: + - resources: + kinds: + - ray.io/v1/RayCluster + validate: + message: >- + RayCluster "{{request.object.metadata.name}}" must have a + 'kai.scheduler/queue' label. Add it to metadata.labels. + pattern: + metadata: + labels: + kai.scheduler/queue: "?*" + - name: require-kai-queue-on-rayjob + match: + any: + - resources: + kinds: + - ray.io/v1/RayJob + validate: + message: >- + RayJob "{{request.object.metadata.name}}" must have a + 'kai.scheduler/queue' label. Add it to metadata.labels. + pattern: + metadata: + labels: + kai.scheduler/queue: "?*" + +--- +# --- Policy 2: Validate user is allowed to use the queue (optional) --- +# Uncomment and configure the ConfigMap below to enable. +# The ConfigMap maps K8s usernames to comma-separated allowed queues. 
+# +# apiVersion: v1 +# kind: ConfigMap +# metadata: +# name: kai-queue-permissions +# namespace: kyverno +# data: +# # K8s username → allowed queues (comma-separated) +# alice@company.com: "team-a,team-b" +# bob@company.com: "team-b" +# # ServiceAccounts use the format: system:serviceaccount:namespace:name +# system:serviceaccount:default:nemo-rl-endpoint-registry: "team-a,team-b" +# --- +# apiVersion: kyverno.io/v1 +# kind: ClusterPolicy +# metadata: +# name: validate-kai-queue-permission +# spec: +# validationFailureAction: Enforce +# background: false +# rules: +# - name: check-queue-permission +# match: +# any: +# - resources: +# kinds: +# - ray.io/v1/RayCluster +# - ray.io/v1/RayJob +# context: +# - name: permissions +# configMap: +# name: kai-queue-permissions +# namespace: kyverno +# validate: +# message: >- +# User "{{request.userInfo.username}}" is not allowed to use queue +# "{{request.object.metadata.labels."kai.scheduler/queue"}}". +# Allowed queues: {{permissions.data[request.userInfo.username] || 'none'}} +# deny: +# conditions: +# all: +# - key: "{{request.object.metadata.labels.\"kai.scheduler/queue\"}}" +# operator: AnyNotIn +# value: "{{permissions.data[request.userInfo.username] || '' | split(@, ',')}}" diff --git a/infra/examples/peer-watcher.py b/infra/examples/peer-watcher.py new file mode 100644 index 0000000000..97a2e7371e --- /dev/null +++ b/infra/examples/peer-watcher.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Sidecar that monitors a peer RayCluster and tears down both clusters on failure. + +Used for disaggregated RL-Gym setups where both clusters should fail fast together. +Runs as a sidecar container in the head pod of each RayCluster. + +Monitors: + 1. Peer RayCluster status (deleted / failed / suspended) + 2. ConfigMap "error" key (set by the application via K8sEndpointRegistry.signal_error()) + +Environment variables: + SELF_CLUSTER_NAME - Name of this RayCluster (to self-delete) + PEER_CLUSTER_NAME - Name of the peer RayCluster to watch + JOB_ID - Job ID for the ConfigMap endpoint registry (optional) + POLL_INTERVAL - Seconds between status checks (default: 10) + MAX_PEER_FAILURES - Consecutive failures before teardown (default: 3) +""" + +import json +import os +import ssl +import sys +import time +import urllib.request +from pathlib import Path + +SELF_CLUSTER_NAME = os.environ["SELF_CLUSTER_NAME"] +PEER_CLUSTER_NAME = os.environ["PEER_CLUSTER_NAME"] +JOB_ID = os.environ.get("JOB_ID", "") +POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "10")) +MAX_PEER_FAILURES = int(os.environ.get("MAX_PEER_FAILURES", "3")) + +SA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount") +NAMESPACE = ( + (SA_PATH / "namespace").read_text().strip() + if (SA_PATH / "namespace").exists() + else "default" +) +TOKEN = (SA_PATH / "token").read_text().strip() if (SA_PATH / "token").exists() else "" +APISERVER = "https://kubernetes.default.svc" + +# Trust the in-cluster CA. 
# Trust the in-cluster CA when present; otherwise fall back to an unverified
# context (e.g. when running outside a real cluster for local testing).
SSL_CTX = (
    ssl.create_default_context(cafile=str(SA_PATH / "ca.crt"))
    if (SA_PATH / "ca.crt").exists()
    else ssl._create_unverified_context()
)


def kube_request(path: str, method: str = "GET") -> dict:
    """Issue an authenticated request against the in-cluster K8s API server.

    Returns the decoded JSON body on success. On failure, returns a dict with
    a "code" key: the HTTP status for HTTP errors, or -1 for transport-level
    failures (timeout, DNS, connection refused). The code is always nonzero
    on failure so callers can tell errors apart from a successful response
    (a K8s object has no top-level "code" key, so `resp.get("code", 0)` == 0).
    """
    req = urllib.request.Request(f"{APISERVER}{path}", method=method)
    req.add_header("Authorization", f"Bearer {TOKEN}")
    try:
        with urllib.request.urlopen(req, context=SSL_CTX, timeout=10) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        return {"code": e.code, "message": e.reason}
    except Exception as e:
        # BUG FIX: was {"code": 0, ...}. A zero code is indistinguishable from
        # a healthy response in main()'s `code != 0 and status == ""` check, so
        # transient network errors (timeouts, refused connections) read as
        # "peer healthy" and reset the failure counter — the watchdog could
        # never tear down an unreachable peer. Use a nonzero sentinel so
        # transport failures count toward MAX_PEER_FAILURES.
        return {"code": -1, "message": str(e)}


def teardown(reason: str):
    """Delete both RayClusters (and the endpoint ConfigMap) and exit.

    Best-effort: DELETE failures are ignored — the resources may already be
    gone. Exits with status 1 so the sidecar container reports failure.
    """
    print(f"[peer-watcher] TEARING DOWN: {reason}", flush=True)
    path_prefix = f"/apis/ray.io/v1/namespaces/{NAMESPACE}/rayclusters"
    for name in (SELF_CLUSTER_NAME, PEER_CLUSTER_NAME):
        print(f"[peer-watcher] Deleting RayCluster: {name}", flush=True)
        kube_request(f"{path_prefix}/{name}", method="DELETE")
    if JOB_ID:
        print(
            f"[peer-watcher] Deleting ConfigMap: nemo-rl-endpoints-{JOB_ID}", flush=True
        )
        kube_request(
            f"/api/v1/namespaces/{NAMESPACE}/configmaps/nemo-rl-endpoints-{JOB_ID}",
            method="DELETE",
        )
    sys.exit(1)


def main():
    """Poll the peer RayCluster and the error ConfigMap until teardown fires."""
    print(
        f"[peer-watcher] Watching peer={PEER_CLUSTER_NAME}, self={SELF_CLUSTER_NAME}",
        flush=True,
    )
    print(
        f"[peer-watcher] namespace={NAMESPACE}, poll={POLL_INTERVAL}s, max_failures={MAX_PEER_FAILURES}",
        flush=True,
    )

    consecutive_failures = 0

    while True:
        time.sleep(POLL_INTERVAL)

        # Check peer RayCluster.
+ resp = kube_request( + f"/apis/ray.io/v1/namespaces/{NAMESPACE}/rayclusters/{PEER_CLUSTER_NAME}" + ) + code = resp.get("code", 0) + if code == 404: + teardown(f"Peer {PEER_CLUSTER_NAME} not found (deleted)") + + status = resp.get("status", {}).get("state", "") + if status in ("failed", "suspended"): + consecutive_failures += 1 + print( + f"[peer-watcher] Peer {PEER_CLUSTER_NAME} is {status} ({consecutive_failures}/{MAX_PEER_FAILURES})", + flush=True, + ) + if consecutive_failures >= MAX_PEER_FAILURES: + teardown( + f"Peer {PEER_CLUSTER_NAME} failed {MAX_PEER_FAILURES} consecutive checks" + ) + continue + + # Check ConfigMap error signal. + if JOB_ID: + cm = kube_request( + f"/api/v1/namespaces/{NAMESPACE}/configmaps/nemo-rl-endpoints-{JOB_ID}" + ) + error = cm.get("data", {}).get("error", "") + if error: + teardown(f"Error signaled via ConfigMap: {error}") + + # Unknown state (e.g., K8s API error, transient network issue) — treat as failure. + if code != 0 and status == "": + consecutive_failures += 1 + print( + f"[peer-watcher] Could not determine peer status (code={code}), treating as failure ({consecutive_failures}/{MAX_PEER_FAILURES})", + flush=True, + ) + if consecutive_failures >= MAX_PEER_FAILURES: + teardown( + f"Peer {PEER_CLUSTER_NAME} unreachable for {MAX_PEER_FAILURES} consecutive checks" + ) + continue + + # Peer healthy. + if consecutive_failures > 0: + print( + f"[peer-watcher] Peer {PEER_CLUSTER_NAME} recovered (status={status})", + flush=True, + ) + consecutive_failures = 0 + + +if __name__ == "__main__": + main() diff --git a/infra/examples/raycluster-blocker.yaml b/infra/examples/raycluster-blocker.yaml new file mode 100644 index 0000000000..c83a4013b6 --- /dev/null +++ b/infra/examples/raycluster-blocker.yaml @@ -0,0 +1,17 @@ +# A simple pod that occupies 1 GPU, used to test KAI gang scheduling. +# Uses the default scheduler (not KAI) so it won't be preempted. +# Deploy this first, then try to deploy kai_scheduled_rayclusters.yaml. 
+# With only 1 GPU free, KAI should hold BOTH rayclusters pending (all-or-nothing). +apiVersion: v1 +kind: Pod +metadata: + name: gpu-blocker +spec: + restartPolicy: Never + containers: + - name: blocker + image: nvidia/cuda:12.8.1-base-ubuntu24.04 + command: ["sleep", "infinity"] + resources: + limits: + nvidia.com/gpu: "1" diff --git a/infra/examples/sft_rayjob.yaml b/infra/examples/sft_rayjob.yaml new file mode 100644 index 0000000000..030f422fca --- /dev/null +++ b/infra/examples/sft_rayjob.yaml @@ -0,0 +1,83 @@ +# 2-GPU RayJob for running nemo-rl SFT training. +# Creates a cluster, runs SFT, then tears down automatically. +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: sft-job + labels: + kai.scheduler/queue: priority-team +spec: + entrypoint: "cd /opt/nemo-rl && bash tests/functional/sft.sh" + # HTTPMode is required with KAI scheduler — K8sJobMode creates a separate + # submitter pod after the cluster is ready, which breaks gang scheduling. + submissionMode: HTTPMode + shutdownAfterJobFinishes: true + ttlSecondsAfterFinished: 60 + rayClusterSpec: + rayVersion: "2.52.0" + headGroupSpec: + rayStartParams: + object-store-memory: "200000000" + template: + spec: + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + containers: + - name: ray-head + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "4" + memory: "16Gi" + requests: + cpu: "1" + memory: "4Gi" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + volumeMounts: + - name: nemo-rl-src + mountPath: /workspace/nemo-rl + volumes: + - name: nemo-rl-src + hostPath: + path: /workspace/nemo-rl + type: DirectoryOrCreate + workerGroupSpecs: + - groupName: gpu-workers + replicas: 1 + minReplicas: 1 + maxReplicas: 1 + rayStartParams: + num-gpus: "2" + object-store-memory: "200000000" + template: + spec: + schedulerName: kai-scheduler + imagePullSecrets: + - name: nvcr-secret + 
containers: + - name: ray-worker + image: nvcr.io/nvidian/nemo-rl:e5a729c-47084432 + resources: + limits: + cpu: "8" + memory: "32Gi" + nvidia.com/gpu: "2" + requests: + cpu: "2" + memory: "8Gi" + nvidia.com/gpu: "2" + volumeMounts: + - name: nemo-rl-src + mountPath: /workspace/nemo-rl + volumes: + - name: nemo-rl-src + hostPath: + path: /workspace/nemo-rl + type: DirectoryOrCreate diff --git a/infra/helm/helmfile.yaml b/infra/helm/helmfile.yaml new file mode 100644 index 0000000000..0ac8858b6e --- /dev/null +++ b/infra/helm/helmfile.yaml @@ -0,0 +1,98 @@ +environments: + kind: + values: + - gpu_backend: device-plugin # nvkind handles toolkit/runtime, only need device plugin + prod: + values: + - gpu_backend: gpu-operator # full operator manages driver, toolkit, device plugin, NFD + +--- + +repositories: +- name: nvdp + url: https://nvidia.github.io/k8s-device-plugin +- name: nvidia + url: https://helm.ngc.nvidia.com/nvidia +- name: kuberay + url: https://ray-project.github.io/kuberay-helm/ +- name: prometheus-community + url: https://prometheus-community.github.io/helm-charts +- name: kyverno + url: https://kyverno.github.io/kyverno/ + +releases: +# +# GPU: device plugin only (kind) — nvkind handles the rest +# +{{ if eq .Environment.Values.gpu_backend "device-plugin" }} +- name: nvidia-device-plugin + namespace: nvidia + createNamespace: true + chart: nvdp/nvidia-device-plugin + wait: true + values: + - values/nvidia-device-plugin.yaml +{{ end }} + +# +# GPU: full operator (prod) — manages driver, toolkit, device plugin, NFD, DCGM +# +{{ if eq .Environment.Values.gpu_backend "gpu-operator" }} +- name: gpu-operator + namespace: gpu-operator + createNamespace: true + chart: nvidia/gpu-operator + wait: true + waitForJobs: true + values: + - values/gpu-operator.yaml +{{ end }} + +# +# KAI Scheduler: gang scheduling + fairshare for GPU workloads +# +- name: kai-scheduler + namespace: kai-scheduler + createNamespace: true + chart: 
https://github.com/kai-scheduler/KAI-Scheduler/releases/download/v0.14.0/kai-scheduler-v0.14.0.tgz + wait: true + values: + - values/kai-scheduler.yaml + +# +# KubeRay Operator: manages RayCluster/RayJob CRDs, integrated with KAI +# +- name: kuberay-operator + namespace: kuberay-system + createNamespace: true + chart: kuberay/kuberay-operator + version: 1.6.0 + wait: true + values: + - values/kuberay-operator.yaml + +# +# Kyverno: policy engine for enforcing queue labels on Ray workloads. +# Policies are applied separately: kubectl apply -f kyverno-kai-policies.yaml +# +- name: kyverno + namespace: kyverno + createNamespace: true + chart: kyverno/kyverno + version: 3.7.1 + wait: true + values: + - values/kyverno.yaml + +# +# Prometheus + Grafana: fairshare monitoring and dashboards. +# Access Grafana: kubectl port-forward svc/kube-prometheus-stack-grafana -n monitoring 3000:80 +# +- name: kube-prometheus-stack + namespace: monitoring + createNamespace: true + chart: prometheus-community/kube-prometheus-stack + wait: true + waitForJobs: true + values: + - values/kube-prometheus-stack.yaml diff --git a/infra/helm/values/gpu-operator.yaml b/infra/helm/values/gpu-operator.yaml new file mode 100644 index 0000000000..962fe5145f --- /dev/null +++ b/infra/helm/values/gpu-operator.yaml @@ -0,0 +1,4 @@ +# GPU Operator values for production clusters. +# Set driver.enabled=false if the host already has the NVIDIA driver installed. +driver: + enabled: true diff --git a/infra/helm/values/kai-scheduler.yaml b/infra/helm/values/kai-scheduler.yaml new file mode 100644 index 0000000000..3e50318b6c --- /dev/null +++ b/infra/helm/values/kai-scheduler.yaml @@ -0,0 +1,10 @@ +# KAI Scheduler configuration. +# Creates a default queue and enables Prometheus metrics for fairshare monitoring. +defaultQueue: + createDefaultQueue: true + +# Enable Prometheus metrics endpoint on KAI components. +# Metrics are scraped by the kube-prometheus-stack ServiceMonitors. 
+global: + prometheus: + enabled: true diff --git a/infra/helm/values/kube-prometheus-stack.yaml b/infra/helm/values/kube-prometheus-stack.yaml new file mode 100644 index 0000000000..6e05689900 --- /dev/null +++ b/infra/helm/values/kube-prometheus-stack.yaml @@ -0,0 +1,46 @@ +# Prometheus + Grafana for KAI scheduler fairshare monitoring. +# +# Access Grafana: kubectl port-forward svc/kube-prometheus-stack-grafana -n monitoring 3000:80 +# Default credentials: admin / prom-operator + +prometheus: + prometheusSpec: + # Scrape KAI metrics from the kai-scheduler namespace. + serviceMonitorSelectorNilUsesHelmValues: false + serviceMonitorNamespaceSelector: {} + serviceMonitorSelector: {} + # Reduce resource usage for kind dev cluster. + resources: + requests: + cpu: 200m + memory: 512Mi + limits: + memory: 2Gi + retention: 7d + storageSpec: + volumeClaimTemplate: + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 5Gi + +grafana: + enabled: true + adminPassword: prom-operator + persistence: + enabled: false + sidecar: + dashboards: + enabled: true # auto-discovers ConfigMaps with grafana_dashboard: "1" + searchNamespace: ALL + datasources: + enabled: true + +# Disable components we don't need on a kind dev cluster. +alertmanager: + enabled: false +nodeExporter: + enabled: false +kubeStateMetrics: + enabled: false diff --git a/infra/helm/values/kuberay-operator.yaml b/infra/helm/values/kuberay-operator.yaml new file mode 100644 index 0000000000..1164f666b1 --- /dev/null +++ b/infra/helm/values/kuberay-operator.yaml @@ -0,0 +1,2 @@ +batchScheduler: + name: kai-scheduler diff --git a/infra/helm/values/kyverno.yaml b/infra/helm/values/kyverno.yaml new file mode 100644 index 0000000000..3c9f7d5099 --- /dev/null +++ b/infra/helm/values/kyverno.yaml @@ -0,0 +1,10 @@ +# Kyverno policy engine — enforces queue labels on Ray workloads. 
+# Policies are applied separately via: kubectl apply -f kyverno-kai-policies.yaml +admissionController: + replicas: 1 +backgroundController: + replicas: 1 +cleanupController: + replicas: 1 +reportsController: + replicas: 1 diff --git a/infra/helm/values/nvidia-device-plugin.yaml b/infra/helm/values/nvidia-device-plugin.yaml new file mode 100644 index 0000000000..e2c13125d4 --- /dev/null +++ b/infra/helm/values/nvidia-device-plugin.yaml @@ -0,0 +1,18 @@ +# NOTE: These overrides are kind-specific. On a real cluster with the GPU Operator, +# you would NOT use this file at all — the GPU Operator deploys its own device plugin. + +# Override default affinity which requires NFD labels (not available in kind). +# On a real cluster, NFD is deployed by the GPU Operator, so the default affinity works. +affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/os + operator: In + values: + - linux + +# The device plugin pod needs the nvidia runtime to access NVML for GPU discovery. +# On a real cluster, the GPU Operator handles this automatically. +runtimeClassName: nvidia diff --git a/infra/kind/SETUP.md b/infra/kind/SETUP.md new file mode 100644 index 0000000000..421f2d7b11 --- /dev/null +++ b/infra/kind/SETUP.md @@ -0,0 +1,207 @@ +# Local K8s GPU Development Environment + +nvkind-based Kubernetes cluster with NVIDIA GPU support, KAI scheduler (gang scheduling), and KubeRay. 
+ +## Prerequisites + +- Docker with systemd cgroup driver and **cgroup v2** +- NVIDIA driver installed on host (`nvidia-smi` works) +- `nvidia-container-toolkit` installed on host +- `go` (for nvkind installation) +- `helmfile` (install: `curl -sSL https://github.com/helmfile/helmfile/releases/latest/download/helmfile_$(uname -s | tr '[:upper:]' '[:lower:]')_amd64.tar.gz | tar xz -C ~/bin helmfile`) + +### One-time host setup (requires sudo) + +```sh +# Set nvidia as Docker's default runtime and enable CDI +sudo nvidia-ctk runtime configure --runtime=docker --set-as-default --cdi.enabled +sudo nvidia-ctk config --set accept-nvidia-visible-devices-as-volume-mounts=true --in-place +sudo systemctl restart docker + +# Verify +docker info | grep "Default Runtime" # should show "nvidia" +stat -fc %T /sys/fs/cgroup/ # should show "cgroup2fs" +``` + +## Quick Start (local kind cluster) + +```sh +# 1. Install tools +cd infra/kind +bash install-nvkind.sh +bash get-kubectl.sh +bash get-helm.sh + +# 2. Create cluster (all host GPUs exposed to a single worker node) +bash create-cluster.sh + +# 3. Deploy infrastructure +cd ../helm +helmfile -e kind sync + +# 4. Create KAI scheduler queues +kubectl apply -f ../examples/kai-queue.yaml + +# 5. Test: gang-schedule two GPU pods +kubectl apply -f ../examples/kai_scheduled_pods.yaml +kubectl get pods -w # both go Running at the same time +kubectl logs gpu-test-0 # nvidia-smi output +kubectl delete -f ../examples/kai_scheduled_pods.yaml + +# 6. Test: gang-schedule two RayClusters (each with 1 GPU worker) +kubectl apply -f ../examples/kai_scheduled_rayclusters.yaml +kubectl get rayclusters -w # both become "ready" +kubectl delete -f ../examples/kai_scheduled_rayclusters.yaml +``` + +## Deploy on a real cluster + +```sh +cd infra/helm +helmfile -e prod sync +``` + +This installs the full **GPU Operator** (instead of just the device plugin) along with KAI scheduler and KubeRay. 
The GPU Operator manages the NVIDIA driver, container toolkit, device plugin, NFD, and DCGM exporter. + +Set `driver.enabled=false` in `values/gpu-operator.yaml` if the cluster nodes already have the NVIDIA driver installed. + +## Helmfile environments + +| Environment | GPU component | Use case | +|-------------|---------------|----------| +| `kind` | nvidia-device-plugin | Local dev — nvkind handles toolkit/runtime | +| `prod` | gpu-operator (full) | Real clusters — operator manages everything | + +Both environments include KAI scheduler and KubeRay operator. + +## Tear down (kind only) + +```sh +kind delete cluster --name nemo-rl +``` + +## Architecture + +``` +infra/ +├── kind/ # Cluster setup (local dev only) +│ ├── create-cluster.sh # Creates nvkind cluster +│ ├── install-nvkind.sh # Installs kind + nvkind +│ ├── get-kubectl.sh / get-helm.sh # Tool installers +│ ├── nvkind-config-values.yaml # Default: workers with all GPUs +│ ├── nvkind-config-values-dev.yaml # Dev: + local code mount +│ └── nvkind-config-template.yaml # Custom template with extraMounts +├── helm/ # Infrastructure (helmfile) +│ ├── helmfile.yaml # environments: kind, prod +│ └── values/ +│ ├── nvidia-device-plugin.yaml # kind only +│ ├── gpu-operator.yaml # prod only +│ ├── kai-scheduler.yaml +│ ├── kuberay-operator.yaml +│ ├── kyverno.yaml # Kyverno policy engine +│ └── kube-prometheus-stack.yaml # Prometheus + Grafana +└── examples/ + ├── kai-queue.yaml # KAI queue hierarchy + ├── kai_scheduled_pods.yaml # Gang-scheduled GPU test pods + ├── kai_scheduled_rayclusters.yaml # Gang-scheduled RayClusters + ├── kai_scheduled_sft.yaml # Two gang-scheduled SFT RayJobs + ├── sft_rayjob.yaml # 2-GPU SFT RayJob + ├── raycluster-blocker.yaml # GPU blocker for testing KAI + ├── gym_standalone_config.yaml # Gym standalone server config + ├── disagg_rl_raycluster.yaml # Disagg RL cluster + peer-watcher + ├── disagg_gym_raycluster.yaml # Disagg Gym cluster + peer-watcher + ├── endpoint-registry-rbac.yaml 
# RBAC for ConfigMap endpoint registry + ├── peer-watcher.py # Sidecar for failure cascading + ├── kai-queue-prod.yaml # 256-GPU prod queue config + ├── kyverno-kai-policies.yaml # Queue enforcement policies + ├── kai-service-monitors.yaml # Prometheus ServiceMonitors for KAI + └── kai-grafana-dashboard.yaml # Grafana dashboard for fairshare +``` + +## Notes + +- **nvkind vs vanilla kind**: nvkind automates GPU device injection, nvidia-container-toolkit installation inside nodes, containerd nvidia runtime configuration, and RuntimeClass registration. +- **nvidia-device-plugin** (kind only): The full GPU Operator fails in kind because its driver validation doesn't work inside kind nodes. The lightweight device plugin with CDI discovery is sufficient since nvkind handles the runtime setup. On a real cluster, use the full GPU Operator (`helmfile -e prod sync`). +- **Device plugin kind overrides**: Affinity is overridden because NFD isn't installed in kind. `runtimeClassName: nvidia` is set so the plugin pod gets NVIDIA libraries injected for NVML discovery. Neither override is needed on a real cluster. +- **KAI scheduler** creates PodGroups automatically for recognized workload types (RayCluster, Job, PyTorchJob, etc.). For bare pods, create a PodGroup manually and annotate pods with `pod-group-name`. +- **RayJob** (not RayCluster) is preferred for batch workloads — it auto-tears down the cluster after the job finishes, avoiding stale Ray state. Requires `submissionMode: HTTPMode` for KAI compatibility. + +## Failure cascading for disaggregated Gym + +The disaggregated RL/Gym manifests include a **peer-watcher sidecar** on each head pod that monitors the peer cluster via the K8s API. If the peer is deleted, fails, or signals an error via the ConfigMap, the watcher tears down both clusters to release resources. 
+ +- `peer-watcher.py` — Python sidecar script (deployed as a ConfigMap) +- Monitors: peer RayCluster status + ConfigMap `error` key +- `MAX_PEER_FAILURES` (default 3) consecutive failures before teardown +- Applications can signal errors via `K8sEndpointRegistry.signal_error("message")` + +Setup: +```sh +kubectl create configmap peer-watcher-script --from-file=peer-watcher.py=infra/examples/peer-watcher.py +kubectl apply -f infra/examples/endpoint-registry-rbac.yaml +``` + +## Fairshare scheduling + +KAI distributes GPU resources using hierarchical fair-share with two phases: +1. **Guaranteed quota**: Each queue gets its `quota` first, unconditionally. +2. **Over-quota surplus**: Remaining GPUs distributed by `priority` (higher served first), then `overQuotaWeight` within the same priority level. + +### Queue fields + +| Field | Description | +|-------|-------------| +| `quota` | Guaranteed GPUs. `-1` = unlimited, `0` = no guarantee | +| `limit` | Hard cap on total GPUs. `-1` = no limit | +| `overQuotaWeight` | Weight for surplus distribution (higher = bigger share) | +| `priority` | Over-quota allocation order (higher = served first, reclaimed last) | +| `preemptMinRuntime` | Min runtime before a higher-priority queue can preempt (default: `"4h"`) | +| `reclaimMinRuntime` | Min runtime before over-quota resources can be reclaimed (default: `"15m"`) | + +### Preempt vs reclaim + +- **Preempt**: A higher-priority queue takes from a lower-priority queue. (VIP takes your table.) +- **Reclaim**: A queue takes back what it's entitled to from an over-allocated queue. (Fairness — give back what you owe.) + +`reclaimMinRuntime` is shorter than `preemptMinRuntime` because reclaim is about fairness (returning over-quota resources quickly), while preempt protects long-running jobs from priority-based interruption. 
+ +### Example configs + +- `kai-queue.yaml` — 2-GPU kind cluster (team-a, team-b, equal quotas) +- `kai-queue-prod.yaml` — 256-GPU production cluster (3 departments, 6 teams) + +### Monitoring (Grafana) + +```sh +kubectl port-forward svc/kube-prometheus-stack-grafana -n monitoring 3000:80 +# Open http://localhost:3000 +# Login: admin / prom-operator +# Dashboard: search "KAI Scheduler Fairshare" or go to http://localhost:3000/d/kai-fairshare +``` + +Key metrics: `kai_queue_allocated_gpus`, `kai_queue_deserved_gpus`, `kai_e2e_scheduling_latency_milliseconds`. + +### Kyverno queue enforcement + +RayCluster and RayJob resources must have a `kai.scheduler/queue` label or they're rejected by Kyverno. To enable user→queue access control, uncomment Policy 2 in `kyverno-kai-policies.yaml` and configure the `kai-queue-permissions` ConfigMap. + +## TODO: NVL72 topology-aware scheduling + +KAI v0.14.0 added Ray topology-aware subgroup scheduling ([PR #1125](https://github.com/kai-scheduler/KAI-Scheduler/pull/1125)). Need to test on an actual NVL72 cluster: + +- **Confirm `--segment=N` equivalent works**: KAI's `subGroups` with per-subgroup `topologyConstraint.requiredTopologyLevel: "rack"` should be the equivalent of Slurm's `--segment=N`. Each subgroup of N nodes is constrained to one rack. Unclear if this works correctly for cross-rack scheduling (e.g., `--segment=16` with 32 total nodes = 2 racks). +- **Auto-segmentation not yet implemented**: The design doc at [`docs/developer/designs/segmented-subgroups/`](https://github.com/kai-scheduler/KAI-Scheduler/blob/main/docs/developer/designs/segmented-subgroups/README.md) proposes `kai.scheduler/segment-size` annotation for automatic subgroup creation, but it depends on "Replica-Type SubGrouping" which isn't shipped yet. See [Issue #1189](https://github.com/kai-scheduler/KAI-Scheduler/issues/1189) and [PR #1127](https://github.com/kai-scheduler/KAI-Scheduler/pull/1127) (minSubGroup field, still open). 
+- **Test with our k8s CLI**: The `nrl-k8s submit` command (see `extensions/k8s_cli/`) should support `--segment-size` that auto-generates the PodGroup subgroups until KAI ships native support. + +## TODO: Log persistence + +Currently, logs are lost when RayJob pods are cleaned up (`ttlSecondsAfterFinished`). Two levels of log persistence are needed: + +1. **Container stdout/stderr** (`kubectl logs`): Captured by containerd at `/var/log/pods/` on the node, but not queryable after pod deletion. +2. **Ray file logs** (`/tmp/ray/session_*/logs/`): Worker/driver logs, system logs — not sent to stdout at all. + +**Planned approach**: Deploy a Loki stack (Loki + Promtail + Grafana) via helmfile for both `kind` and `prod` environments: +- Promtail DaemonSet captures container stdout/stderr from each node, auto-labels with K8s metadata (namespace, pod, job name) +- Fluent Bit sidecar in each RayJob pod tails `/tmp/ray` logs and ships to Loki +- Grafana UI for querying logs by job name, time range, and content (LogQL) +- kind: Loki stores on local PVC; prod: Loki stores on S3/GCS diff --git a/infra/kind/create-cluster.sh b/infra/kind/create-cluster.sh new file mode 100644 index 0000000000..8216473116 --- /dev/null +++ b/infra/kind/create-cluster.sh @@ -0,0 +1,72 @@ +#!/bin/bash +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Creates an nvkind cluster with GPU support. 
+# Prerequisites: +# - kind and nvkind installed (see install-nvkind.sh) +# - NVIDIA driver + nvidia-container-toolkit on the host +# - Docker running with systemd cgroup driver +# +# One-time host setup (run manually before first use): +# sudo nvidia-ctk runtime configure --runtime=docker --set-as-default --cdi.enabled +# sudo nvidia-ctk config --set accept-nvidia-visible-devices-as-volume-mounts=true --in-place +# sudo systemctl restart docker + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +set -eoux pipefail + +KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:-nemo-rl} +CONFIG_VALUES=${CONFIG_VALUES:-$SCRIPT_DIR/nvkind-config-values.yaml} + +echo "======================" +echo "Existing kind clusters" +echo "======================" +kind get clusters || true + +# Build nvkind args. Use custom template if it exists alongside the values file. +NVKIND_ARGS=(--name "${KIND_CLUSTER_NAME}" --config-values "$CONFIG_VALUES") +CONFIG_TEMPLATE="$SCRIPT_DIR/nvkind-config-template.yaml" +if [[ -f "$CONFIG_TEMPLATE" && "$CONFIG_VALUES" != "$SCRIPT_DIR/nvkind-config-values.yaml" ]]; then + echo "Using custom config template: $CONFIG_TEMPLATE" + NVKIND_ARGS+=(--config-template "$CONFIG_TEMPLATE") +fi + +# nvkind may fail at the /proc/driver/nvidia patching step if the host +# doesn't have a mounted /proc/driver/nvidia (non-MIG setups). This is +# non-fatal — the cluster and GPU access still work. We catch the error +# and verify the cluster came up. +nvkind cluster create "${NVKIND_ARGS[@]}" || true + +# nvkind installs the nvidia-container-toolkit and configures containerd +# inside worker nodes, but containerd needs a restart to pick up the config. +echo "Restarting containerd on worker nodes..." +for worker in $(docker ps --format '{{.Names}}' | grep -E "${KIND_CLUSTER_NAME}-worker"); do + docker exec "$worker" systemctl restart containerd +done + +echo "Waiting for nodes to become Ready..." 
+kubectl wait --for=condition=ready nodes --all --timeout=120s + +echo "======================" +echo "Verifying cluster..." +echo "======================" +docker ps +kubectl get nodes -o wide +kubectl get pods -A + +echo "" +echo "Cluster '${KIND_CLUSTER_NAME}' is ready." +echo "Next: cd ../helm && helmfile sync" diff --git a/infra/kind/get-helm.sh b/infra/kind/get-helm.sh new file mode 100644 index 0000000000..a280a395bc --- /dev/null +++ b/infra/kind/get-helm.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eou pipefail +HELM_VERSION=${HELM_VERSION:-v3.17.3} + +mkdir -p ~/bin/ +NAMED_HELM=~/bin/helm-$HELM_VERSION + +if [[ ! -f $NAMED_HELM ]]; then + ARCH=$(uname -m) + case $ARCH in + x86_64) ARCH=amd64 ;; + aarch64) ARCH=arm64 ;; + *) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; + esac + tmp_helm_dir=$(mktemp -d) + curl -sSL "https://get.helm.sh/helm-${HELM_VERSION}-linux-${ARCH}.tar.gz" | tar -xz -C "$tmp_helm_dir" --strip-components=1 + cp "$tmp_helm_dir/helm" "$NAMED_HELM" + rm -rf "$tmp_helm_dir" +fi + +echo "Installed helm at $NAMED_HELM" +echo "To use, you may set 'alias helm=$NAMED_HELM'" diff --git a/infra/kind/get-kubectl.sh b/infra/kind/get-kubectl.sh new file mode 100644 index 0000000000..c02d38e995 --- /dev/null +++ b/infra/kind/get-kubectl.sh @@ -0,0 +1,71 @@ +#!/bin/bash +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eou pipefail + +KUBECTL_VERSION=${KUBECTL_VERSION:-latest} + +echo ============================================== +echo "Currently installed versions of kubectl:" +ls ~/bin/kubectl* 2>/dev/null || true +echo ============================================== + +mkdir -p ~/bin +NAMED_KUBECTL=~/bin/kubectl-$KUBECTL_VERSION + +if [[ ! -f $NAMED_KUBECTL ]]; then + ARCH=$(uname -m) + case $ARCH in + x86_64) ARCH=amd64 ;; + aarch64) ARCH=arm64 ;; + *) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; + esac + if [[ $KUBECTL_VERSION == latest ]]; then + curl -L "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/$ARCH/kubectl" -o "$NAMED_KUBECTL" + else + curl -L "https://dl.k8s.io/release/${KUBECTL_VERSION}/bin/linux/$ARCH/kubectl" -o "$NAMED_KUBECTL" + fi +fi +chmod +x "$NAMED_KUBECTL" + +echo "Installed kubectl at $NAMED_KUBECTL" +echo "To use, you may set 'alias kubectl=$NAMED_KUBECTL'" + +if [[ ! 
-f ~/.krew/bin/kubectl-krew ]]; then + ( + set -x; cd "$(mktemp -d)" && + OS="$(uname | tr '[:upper:]' '[:lower:]')" && + ARCH="$(uname -m | sed -e 's/x86_64/amd64/' -e 's/\(arm\)\(64\)\?.*/\1\2/' -e 's/aarch64$/arm64/')" && + KREW="krew-${OS}_${ARCH}" && + curl -fsSLO "https://github.com/kubernetes-sigs/krew/releases/latest/download/${KREW}.tar.gz" && + tar zxvf "${KREW}.tar.gz" && + ./"${KREW}" install krew + ) +else + echo "krew already installed" +fi + +$NAMED_KUBECTL krew install ctx +$NAMED_KUBECTL krew install ns +$NAMED_KUBECTL krew install stern +$NAMED_KUBECTL krew install view-allocations +$NAMED_KUBECTL krew install whoami + +cat <&2; exit 1 ;; + esac + curl -Lo "$NAMED_KIND" "https://kind.sigs.k8s.io/dl/$KIND_VERSION/kind-linux-$ARCH" + chmod +x "$NAMED_KIND" +fi +ln -sf "$NAMED_KIND" ~/bin/kind +echo "Installed kind $KIND_VERSION at $NAMED_KIND" + +# --- Install nvkind --- +if ! command -v nvkind &>/dev/null; then + GOBIN=~/bin go install github.com/NVIDIA/nvkind/cmd/nvkind@latest +fi +echo "Installed nvkind at $(which nvkind || echo ~/bin/nvkind)" diff --git a/infra/kind/nvkind-config-template.yaml b/infra/kind/nvkind-config-template.yaml new file mode 100644 index 0000000000..7e3f2a05b7 --- /dev/null +++ b/infra/kind/nvkind-config-template.yaml @@ -0,0 +1,52 @@ +# Custom nvkind config template with support for extraMounts. +# Based on the default nvkind template, extended to mount host directories +# into kind nodes (e.g., for local code development). 
+# +# Usage: +# nvkind cluster create \ +# --config-template nvkind-config-template.yaml \ +# --config-values nvkind-config-values-dev.yaml + +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +{{- if hasKey $ "name" }} +name: {{ $.name }} +{{- end }} +nodes: +- role: control-plane + {{- if hasKey $ "image" }} + image: {{ $.image }} + {{- end }} + {{- if hasKey $ "extraMounts" }} + extraMounts: + {{- range $.extraMounts }} + - hostPath: {{ .hostPath }} + containerPath: {{ .containerPath }} + {{- end }} + {{- end }} +{{- range $.workers }} +- role: worker + {{- if hasKey $ "image" }} + image: {{ $.image }} + {{- end }} + + {{- if hasKey . "devices" }} + {{- $devices := .devices }} + {{- if not (kindIs "slice" $devices) }} + {{- $devices = list .devices }} + {{- end }} + extraMounts: + # GPU device injection + {{- range $d := $devices }} + - hostPath: /dev/null + containerPath: /var/run/nvidia-container-devices/{{ $d }} + {{- end }} + # Additional host mounts + {{- if hasKey $ "extraMounts" }} + {{- range $.extraMounts }} + - hostPath: {{ .hostPath }} + containerPath: {{ .containerPath }} + {{- end }} + {{- end }} + {{- end }} +{{- end }} diff --git a/infra/kind/nvkind-config-values-dev.yaml b/infra/kind/nvkind-config-values-dev.yaml new file mode 100644 index 0000000000..368264ea7d --- /dev/null +++ b/infra/kind/nvkind-config-values-dev.yaml @@ -0,0 +1,7 @@ +# Dev config: mount local nemo-rl source into kind nodes. +# Pods can then use hostPath: /workspace/nemo-rl to access the code. 
+workers: +- devices: all +extraMounts: +- hostPath: /home/terryk/nemo-rl + containerPath: /workspace/nemo-rl diff --git a/infra/kind/nvkind-config-values.yaml b/infra/kind/nvkind-config-values.yaml new file mode 100644 index 0000000000..f94f6475c4 --- /dev/null +++ b/infra/kind/nvkind-config-values.yaml @@ -0,0 +1,2 @@ +workers: +- devices: all diff --git a/nemo_rl/distributed/k8s_endpoint_registry.py b/nemo_rl/distributed/k8s_endpoint_registry.py new file mode 100644 index 0000000000..d2e6497912 --- /dev/null +++ b/nemo_rl/distributed/k8s_endpoint_registry.py @@ -0,0 +1,225 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ConfigMap-backed endpoint registry for disaggregated RL-Gym service discovery. + +Each (RL, Gym) job pair shares a ConfigMap named 'nemo-rl-endpoints-{job_id}'. +Both sides read/write their dynamic addresses (IP:port) to it. The ConfigMap +has an ownerReference to the RL RayCluster so it's garbage collected on teardown. 
+ +Usage (RL side): + registry = K8sEndpointRegistry(job_id="my-job") + registry.create(owner_raycluster_name="raycluster-rl") + registry.set("vllm_base_urls", json.dumps(["http://10.0.0.1:8000/v1"])) + gym_url = registry.get("gym_head_server") # blocks until Gym registers + +Usage (Gym side): + registry = K8sEndpointRegistry(job_id="my-job") + registry.set("gym_head_server", "http://10.0.0.2:8080") + vllm_urls = json.loads(registry.get("vllm_base_urls")) # blocks until RL registers +""" + +from __future__ import annotations + +import time +from pathlib import Path + +from kubernetes import client, config +from kubernetes.client.exceptions import ApiException + +CONFIGMAP_PREFIX = "nemo-rl-endpoints" + + +class K8sEndpointRegistry: + """Shared endpoint registry backed by a K8s ConfigMap.""" + + def __init__(self, job_id: str, namespace: str | None = None): + self.job_id = job_id + self.configmap_name = f"{CONFIGMAP_PREFIX}-{job_id}" + + # Auto-detect namespace from in-pod mount, or fall back to "default". + if namespace is not None: + self.namespace = namespace + else: + ns_path = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + self.namespace = ( + ns_path.read_text().strip() if ns_path.exists() else "default" + ) + + # Load K8s client config — in-cluster when running in a pod, kubeconfig otherwise. + try: + config.load_incluster_config() + except config.ConfigException: + config.load_kube_config() + + self._v1 = client.CoreV1Api() + self._custom = client.CustomObjectsApi() + + def create(self, owner_raycluster_name: str | None = None) -> None: + """Create the ConfigMap. Idempotent — no-op if it already exists. + + Args: + owner_raycluster_name: If set, the ConfigMap gets an ownerReference + to this RayCluster so K8s garbage-collects it on teardown. 
+ """ + owner_references = None + if owner_raycluster_name: + owner_references = self._build_owner_reference(owner_raycluster_name) + + cm = client.V1ConfigMap( + metadata=client.V1ObjectMeta( + name=self.configmap_name, + namespace=self.namespace, + owner_references=owner_references, + ), + data={}, + ) + + try: + self._v1.create_namespaced_config_map(namespace=self.namespace, body=cm) + print(f"Created endpoint registry ConfigMap: {self.configmap_name}") + except ApiException as e: + if e.status == 409: + # Already exists — patch ownerReferences if we have them + # (handles race where Gym's set() created it before RL's create()). + if owner_references: + self._v1.patch_namespaced_config_map( + name=self.configmap_name, + namespace=self.namespace, + body=client.V1ConfigMap( + metadata=client.V1ObjectMeta( + owner_references=owner_references, + ) + ), + ) + print( + f"Patched ownerReference on existing ConfigMap: {self.configmap_name}" + ) + else: + print( + f"Endpoint registry ConfigMap already exists: {self.configmap_name}" + ) + else: + raise + + def set(self, key: str, value: str) -> None: + """Write a key-value pair to the ConfigMap. Creates the ConfigMap if needed.""" + try: + cm = self._v1.read_namespaced_config_map( + name=self.configmap_name, namespace=self.namespace + ) + if cm.data is None: + cm.data = {} + cm.data[key] = value + self._v1.patch_namespaced_config_map( + name=self.configmap_name, namespace=self.namespace, body=cm + ) + except ApiException as e: + if e.status == 404: + # ConfigMap doesn't exist yet — create it with this key. + try: + cm = client.V1ConfigMap( + metadata=client.V1ObjectMeta( + name=self.configmap_name, namespace=self.namespace + ), + data={key: value}, + ) + self._v1.create_namespaced_config_map( + namespace=self.namespace, body=cm + ) + except ApiException as create_err: + if create_err.status == 409: + # Another process created it between our read and create — retry patch. 
+ self.set(key, value) + return + raise + else: + raise + print(f"Registered endpoint: {key} = {value}") + + def get(self, key: str, timeout: float = 600, poll_interval: float = 2) -> str: + """Poll until a key appears in the ConfigMap, then return its value. + + Args: + key: The key to wait for. + timeout: Max seconds to wait before raising TimeoutError. + poll_interval: Seconds between polls. + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + value = self.get_nowait(key) + if value is not None: + return value + remaining = deadline - time.monotonic() + if remaining <= 0: + break + time.sleep(min(poll_interval, remaining)) + raise TimeoutError( + f"Timed out after {timeout}s waiting for key '{key}' " + f"in ConfigMap '{self.configmap_name}'" + ) + + def get_nowait(self, key: str) -> str | None: + """Non-blocking read. Returns None if key or ConfigMap doesn't exist.""" + try: + cm = self._v1.read_namespaced_config_map( + name=self.configmap_name, namespace=self.namespace + ) + if cm.data is None: + return None + return cm.data.get(key) + except ApiException as e: + if e.status == 404: + return None + raise + + def signal_error(self, message: str) -> None: + """Write an error to the ConfigMap. + + The peer-watcher sidecar monitors this and triggers teardown + when it sees a non-empty 'error' key. + """ + self.set("error", message) + + def _build_owner_reference( + self, raycluster_name: str + ) -> list[client.V1OwnerReference]: + """Look up the RayCluster's UID and build an ownerReference.""" + try: + rc = self._custom.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=self.namespace, + plural="rayclusters", + name=raycluster_name, + ) + uid = rc["metadata"]["uid"] + except ApiException as e: + if e.status == 404: + print( + f"Warning: RayCluster '{raycluster_name}' not found " + f"for ownerReference — ConfigMap will not be auto-cleaned." 
+ ) + return None + raise + + return [ + client.V1OwnerReference( + api_version="ray.io/v1", + kind="RayCluster", + name=raycluster_name, + uid=uid, + controller=True, + block_owner_deletion=False, + ) + ] diff --git a/nemo_rl/environments/nemo_gym.py b/nemo_rl/environments/nemo_gym.py index b32f76919f..aa4200f4c3 100644 --- a/nemo_rl/environments/nemo_gym.py +++ b/nemo_rl/environments/nemo_gym.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from pathlib import Path -from typing import Any, Dict, List, TypedDict +from typing import Any, Dict, List, NotRequired, TypedDict import ray import torch @@ -27,6 +27,9 @@ class NemoGymConfig(TypedDict): model_name: str base_urls: List[str] initial_global_config_dict: Dict[str, Any] + remote_gym_url: NotRequired[ + str + ] # If set, connects to a remote Gym service instead of spawning local subprocesses @ray.remote(max_restarts=-1, max_task_retries=-1) # pragma: no cover @@ -35,27 +38,55 @@ class NemoGym(EnvironmentInterface): def __init__(self, cfg: NemoGymConfig): self.cfg = cfg + self._remote_mode = bool(cfg.get("remote_gym_url")) - self.node_ip = _get_node_ip_local() - self.head_server_port = _get_free_port_local() + from nemo_gym.rollout_collection import RolloutCollectionHelper + from nemo_gym.server_utils import BaseServerConfig + + if self._remote_mode: + # Remote mode: connect to an external Gym HTTP service. + # No local subprocesses are spawned. 
+ remote_url = cfg["remote_gym_url"] + # Parse host:port from URL like "http://gym-service:8080" or "gym-service:8080" + url = remote_url.removeprefix("http://").removeprefix("https://") + if ":" in url: + host, port_str = url.rsplit(":", 1) + port = int(port_str.rstrip("/")) + else: + host, port = url.rstrip("/"), 8080 + print(f"NemoGym remote mode: connecting to {host}:{port}") + + self.rh = None + self.head_server_config = BaseServerConfig(host=host, port=port) + self.rch = RolloutCollectionHelper() + initial_global_config_dict = cfg.get("initial_global_config_dict") or {} + self.rollout_max_attempts_to_avoid_lp_nan = initial_global_config_dict.pop( + "rollout_max_attempts_to_avoid_lp_nan", 1 + ) + else: + # Colocated mode: spawn Gym subprocesses locally (original behavior). + self._init_colocated(cfg) + + def _init_colocated(self, cfg: NemoGymConfig): from nemo_gym.cli import GlobalConfigDictParserConfig, RunHelper from nemo_gym.rollout_collection import RolloutCollectionHelper from nemo_gym.server_utils import HEAD_SERVER_KEY_NAME, BaseServerConfig from omegaconf import DictConfig + self.node_ip = _get_node_ip_local() + self.head_server_port = _get_free_port_local() + RELATIVE_PATH = "nemo_rl/environments/nemo_gym.py" assert __file__.endswith(RELATIVE_PATH) - initial_global_config_dict = ( - self.cfg.get("initial_global_config_dict") or dict() - ) + initial_global_config_dict = cfg.get("initial_global_config_dict") or dict() # Policy information - initial_global_config_dict["policy_model_name"] = self.cfg["model_name"] + initial_global_config_dict["policy_model_name"] = cfg["model_name"] initial_global_config_dict["policy_api_key"] = ( "dummy_key" # No key necessary for training. 
) - initial_global_config_dict["policy_base_url"] = self.cfg["base_urls"] + initial_global_config_dict["policy_base_url"] = cfg["base_urls"] initial_global_config_dict.setdefault( "global_aiohttp_connector_limit_per_host", 16_384 @@ -253,7 +284,8 @@ def _postprocess_nemo_gym_to_nemo_rl_result( } def shutdown(self) -> None: - self.rh.shutdown() + if self.rh is not None: + self.rh.shutdown() def step(self, message_log_batch, metadata): # This is not used since NeMo-Gym will handle the rollouts entirely.