diff --git a/distributed/ddp-tutorial-series/README.md b/distributed/ddp-tutorial-series/README.md index d0ce17c00f..3a27f3d8e0 100644 --- a/distributed/ddp-tutorial-series/README.md +++ b/distributed/ddp-tutorial-series/README.md @@ -15,7 +15,27 @@ Each code file extends upon the previous one. The series starts with a non-distr * [slurm/setup_pcluster_slurm.md](slurm/setup_pcluster_slurm.md): instructions to set up an AWS cluster * [slurm/config.yaml.template](slurm/config.yaml.template): configuration to set up an AWS cluster * [slurm/sbatch_run.sh](slurm/sbatch_run.sh): slurm script to launch the training job - - - - +## Installation +``` +pip install -r requirements.txt +``` +## Running Examples +To run the examples for 20 epochs, saving a checkpoint every 5 epochs, use the following commands: +### Single GPU +``` +python single_gpu.py 20 5 +``` +### Multi-GPU +``` +python multigpu.py 20 5 +``` +### Multi-GPU Torchrun +``` +torchrun --nnodes=1 --nproc_per_node=4 multigpu_torchrun.py 20 5 +``` +### Multi-Node +``` +torchrun --nnodes=2 --nproc_per_node=4 multinode.py 20 5 +``` + +For more details, check the [run_example.sh](run_example.sh) script. 
\ No newline at end of file diff --git a/distributed/ddp-tutorial-series/multigpu.py b/distributed/ddp-tutorial-series/multigpu.py index 9f1478ef3e..cfec4b7307 100644 --- a/distributed/ddp-tutorial-series/multigpu.py +++ b/distributed/ddp-tutorial-series/multigpu.py @@ -17,20 +17,15 @@ def ddp_setup(rank, world_size): world_size: Total number of processes """ os.environ["MASTER_ADDR"] = "localhost" - os.environ["MASTER_PORT"] = "12455" + os.environ["MASTER_PORT"] = "12355" + device = torch.device(f"{torch.accelerator.current_accelerator()}:{rank}") + torch.accelerator.set_device_index(rank) + print(f"Running on rank {rank} on device {device}") - rank = int(os.environ["LOCAL_RANK"]) - if torch.accelerator.is_available(): - device_type = torch.accelerator.current_accelerator() - device = torch.device(f"{device_type}:{rank}") - torch.accelerator.device_index(rank) - print(f"Running on rank {rank} on device {device}") - else: - device = torch.device("cpu") - print(f"Running on device {device}") - backend = torch.distributed.get_default_backend_for_device(device) + init_process_group(backend=backend, rank=rank, world_size=world_size) + class Trainer: def __init__( @@ -106,8 +101,8 @@ def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_s if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') - parser.add_argument('total_epochs', type=int, help='Total epochs to train the model') - parser.add_argument('save_every', type=int, help='How often to save a snapshot') + parser.add_argument('total_epochs', default=50, type=int, help='Total epochs to train the model') + parser.add_argument('save_every', default=5, type=int, help='How often to save a snapshot') parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)') args = parser.parse_args() diff --git a/distributed/ddp-tutorial-series/multigpu_torchrun.py 
b/distributed/ddp-tutorial-series/multigpu_torchrun.py index 6bf7da0e19..f936385282 100644 --- a/distributed/ddp-tutorial-series/multigpu_torchrun.py +++ b/distributed/ddp-tutorial-series/multigpu_torchrun.py @@ -12,19 +12,14 @@ def ddp_setup(): rank = int(os.environ["LOCAL_RANK"]) - if torch.accelerator.is_available(): - device_type = torch.accelerator.current_accelerator() - device = torch.device(f"{device_type}:{rank}") - torch.accelerator.device_index(rank) - print(f"Running on rank {rank} on device {device}") - else: - device = torch.device("cpu") - print(f"Running on device {device}") - - backend = torch.distributed.get_default_backend_for_device(device) - torch.distributed.init_process_group(backend=backend, device_id=device) - return device + device = torch.device(f"{torch.accelerator.current_accelerator()}:{rank}") + torch.accelerator.set_device_index(rank) + print(f"Running on rank {rank} on device {device}") + + backend = torch.distributed.get_default_backend_for_device(device) + torch.distributed.init_process_group(backend=backend, device_id=device) + class Trainer: def __init__( @@ -51,7 +46,9 @@ def __init__( self.model = DDP(self.model, device_ids=[self.gpu_id]) def _load_snapshot(self, snapshot_path): - loc = str(self.device) + + loc = str(torch.accelerator.current_accelerator()) + snapshot = torch.load(snapshot_path, map_location=loc) self.model.load_state_dict(snapshot["MODEL_STATE"]) self.epochs_run = snapshot["EPOCHS_RUN"] @@ -117,8 +114,8 @@ def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') - parser.add_argument('total_epochs', type=int, help='Total epochs to train the model') - parser.add_argument('save_every', type=int, help='How often to save a snapshot') + parser.add_argument('total_epochs', default=50, type=int, help='Total epochs to train the model') + 
parser.add_argument('save_every', default=5, type=int, help='How often to save a snapshot') parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)') args = parser.parse_args() diff --git a/distributed/ddp-tutorial-series/multinode.py b/distributed/ddp-tutorial-series/multinode.py index 838056a42c..882af2b1e6 100644 --- a/distributed/ddp-tutorial-series/multinode.py +++ b/distributed/ddp-tutorial-series/multinode.py @@ -12,19 +12,14 @@ def ddp_setup(): rank = int(os.environ["LOCAL_RANK"]) - if torch.accelerator.is_available(): - device_type = torch.accelerator.current_accelerator() - device: torch.device = torch.device(f"{device_type}:{rank}") - torch.accelerator.device_index(rank) - print(f"Running on rank {rank} on device {device}") - backend = torch.distributed.get_default_backend_for_device(device) - torch.distributed.init_process_group(backend=backend) - return device_type - else: - device = torch.device("cpu") - print(f"Running on device {device}") - torch.distributed.init_process_group(backend="gloo") - return device + + device = torch.device(f"{torch.accelerator.current_accelerator()}:{rank}") + torch.accelerator.set_device_index(rank) + print(f"Running on rank {rank} on device {device}") + + backend = torch.distributed.get_default_backend_for_device(device) + torch.distributed.init_process_group(backend=backend, device_id=device) + class Trainer: def __init__( @@ -52,7 +47,8 @@ def __init__( self.model = DDP(self.model, device_ids=[self.local_rank]) def _load_snapshot(self, snapshot_path): - loc = str(self.device) + loc = str(torch.accelerator.current_accelerator()) + snapshot = torch.load(snapshot_path, map_location=loc) self.model.load_state_dict(snapshot["MODEL_STATE"]) self.epochs_run = snapshot["EPOCHS_RUN"] @@ -118,8 +114,8 @@ def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str if __name__ == "__main__": import argparse parser = 
argparse.ArgumentParser(description='simple distributed training job') - parser.add_argument('total_epochs', type=int, help='Total epochs to train the model') - parser.add_argument('save_every', type=int, help='How often to save a snapshot') + parser.add_argument('total_epochs', default=50, type=int, help='Total epochs to train the model') + parser.add_argument('save_every', default=5, type=int, help='How often to save a snapshot') parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)') args = parser.parse_args() diff --git a/distributed/ddp-tutorial-series/run_example.sh b/distributed/ddp-tutorial-series/run_example.sh index d439b681b4..ea840158f5 100644 --- a/distributed/ddp-tutorial-series/run_example.sh +++ b/distributed/ddp-tutorial-series/run_example.sh @@ -4,7 +4,10 @@ # num_gpus = num local gpus to use (must be at least 2). Default = 2 # samples to run include: -# example.py + +# multigpu_torchrun.py +# multinode.py echo "Launching ${1:-example.py} with ${2:-2} gpus" -torchrun --nnodes=1 --nproc_per_node=${2:-2} --rdzv_id=101 --rdzv_endpoint="localhost:5972" ${1:-example.py} +torchrun --nnodes=1 --nproc_per_node=${2:-2} --rdzv_id=101 --rdzv_endpoint="localhost:5972" ${1:-example.py} 10 1 + diff --git a/distributed/ddp-tutorial-series/single_gpu.py b/distributed/ddp-tutorial-series/single_gpu.py index e91ab81cc1..c739d18cb7 100644 --- a/distributed/ddp-tutorial-series/single_gpu.py +++ b/distributed/ddp-tutorial-series/single_gpu.py @@ -73,10 +73,10 @@ def main(device, total_epochs, save_every, batch_size): if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') - parser.add_argument('total_epochs', type=int, help='Total epochs to train the model') - parser.add_argument('save_every', type=int, help='How often to save a snapshot') + parser.add_argument('total_epochs', default=50, type=int, help='Total epochs to train the model') + 
parser.add_argument('save_every', default=5, type=int, help='How often to save a snapshot') parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)') args = parser.parse_args() - device = 0 # shorthand for cuda:0 + device = 0 main(device, args.total_epochs, args.save_every, args.batch_size) diff --git a/run_distributed_examples.sh b/run_distributed_examples.sh index 4cc9079eb5..e1a81a7076 100755 --- a/run_distributed_examples.sh +++ b/run_distributed_examples.sh @@ -51,13 +51,16 @@ function distributed_tensor_parallelism() { } function distributed_ddp-tutorial-series() { - uv run bash run_example.sh multigpu.py || error "ddp tutorial series multigpu example failed" + uv run python multigpu.py 10 1 || error "ddp tutorial series multigpu example failed" uv run bash run_example.sh multigpu_torchrun.py || error "ddp tutorial series multigpu torchrun example failed" uv run bash run_example.sh multinode.py || error "ddp tutorial series multinode example failed" + uv run python single_gpu.py 10 1 || error "ddp tutorial series single gpu example failed" + } function distributed_FSDP2() { uv run bash run_example.sh example.py || error "FSDP2 example failed" + } function distributed_ddp() {