From 00bdcd7889c297e9664d1eee09468fb6b5f9d918 Mon Sep 17 00:00:00 2001 From: scdesperate Date: Thu, 12 Mar 2026 12:10:27 +0800 Subject: [PATCH] [fix](kt_ep_wrapper): change the device_group to cpu_group in `barrier` call --- python/sglang/srt/layers/moe/kt_ep_wrapper.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/python/sglang/srt/layers/moe/kt_ep_wrapper.py b/python/sglang/srt/layers/moe/kt_ep_wrapper.py index fa69bf42c621..13dfd33ce82f 100644 --- a/python/sglang/srt/layers/moe/kt_ep_wrapper.py +++ b/python/sglang/srt/layers/moe/kt_ep_wrapper.py @@ -464,11 +464,7 @@ def _create_cpu_buffers(self): else: self.shm_unique_id = None if dist.is_initialized(): - unique_id_list = [self.shm_unique_id] - dist.broadcast_object_list( - unique_id_list, src=0, group=get_tp_group().cpu_group - ) - self.shm_unique_id = unique_id_list[0] + self.shm_unique_id = get_tp_group().broadcast_object(self.shm_unique_id) for name in self.weight_names: gpu_tensor = getattr(self.gpu_layer, name) @@ -499,14 +495,14 @@ def _create_cpu_buffers(self): self.cpu_buffers[name] = cpu_buffer if dist.is_initialized(): - dist.barrier(group=get_tp_group().device_group) + get_tp_group().barrier() self.all_rank_buffer_ptrs = self._collect_all_rank_buffer_pointers() # Unlink shared memory after all ranks have collected pointers. # The memory remains accessible as long as we hold references via mmap. if dist.is_initialized(): - dist.barrier(group=get_tp_group().device_group) + get_tp_group().barrier() for shm in self.shm_handles.values(): shm.unlink() @@ -706,7 +702,7 @@ def submit_write_expert(expert_id): # Barrier to ensure all ranks see the written data if dist.is_initialized(): - dist.barrier(group=get_tp_group().device_group) + get_tp_group().barrier() with torch.cuda.stream(copy_stream): slot = e % 2 # Double buffering @@ -874,7 +870,7 @@ def submit_write_expert(expert_id, slot): # Barrier to ensure all ranks see the written data if dist.is_initialized(): - dist.barrier(group=get_tp_group().device_group) + get_tp_group().barrier() with torch.cuda.stream(copy_stream): for _, cpu_buf, gpu_t in weight_infos: @@ -1046,7 +1042,7 @@ def submit_write_expert(expert_id, slot): # Barrier to ensure all ranks see the written data if dist.is_initialized(): - dist.barrier(group=get_tp_group().device_group) + get_tp_group().barrier() with torch.cuda.stream(copy_stream): for _, cpu_buf, gpu_t in weight_infos: @@ -1198,7 +1194,7 @@ def submit_write_expert(expert_id, slot): # Barrier to ensure all ranks see the written data if dist.is_initialized(): - dist.barrier(group=get_tp_group().device_group) + get_tp_group().barrier() with torch.cuda.stream(copy_stream): for _, cpu_buf, gpu_t in weight_infos: