Skip to content

Unique QP per channel and env-controlled GID index for executor#762

Open
Binyang2014 wants to merge 6 commits intomainfrom
binyli/unique-qp-and-gid-index
Open

Unique QP per channel and env-controlled GID index for executor#762
Binyang2014 wants to merge 6 commits intomainfrom
binyli/unique-qp-and-gid-index

Conversation

@Binyang2014
Copy link
Contributor

Summary

This PR fixes two issues in the executor's connection and channel setup:

  1. Unique QP (connection) per channel: Previously, the executor used a single shared connection per peer (unordered_map<int, Connection>). This caused issues when multiple channels to the same peer needed independent QPs — particularly for semaphore signal forwarding in HostNoAtomic mode, where each semaphore's setSignalForwardingDst must target a unique QP. The fix changes connections to a vector<Connection> where each channel gets its own dedicated connection.

  2. Per-peer tag matching: The previous global tag++ counter could cause tag mismatches between ranks with different peer topologies. Tags are now tracked per-peer using std::unordered_map<int, int> peerTags, ensuring symmetric tag sequences between any two communicating ranks.

  3. Environment-controlled GID index (MSCCLPP_IB_GID_INDEX): Adds a new environment variable to override the IB GID index used for queue pair creation. This is applied in the EndpointConfig::Ib constructor so it takes effect for all IB connections uniformly.

  4. Cleanup: Removed redundant memorySemaphores and proxySemaphores vectors from ExecutionContext — semaphore lifetimes are already managed by BaseMemoryChannel (via shared_ptr) and ProxyService respectively.

Changes

  • include/mscclpp/core.hpp: Added #include <mscclpp/env.hpp>, apply MSCCLPP_IB_GID_INDEX env override in EndpointConfig::Ib constructor.
  • include/mscclpp/env.hpp: Added ibGidIndex env variable (MSCCLPP_IB_GID_INDEX, default -1).
  • src/core/env.cpp: Initialize and log the new ibGidIndex env variable.
  • src/core/executor/executor.cc:
    • connections: changed from unordered_map<int, Connection> to vector<Connection>.
    • setupConnections: create one connection per channel entry (paired + unpaired), using per-peer tags.
    • setupChannels: walk connections sequentially via connIdx++, use local semaphore vectors directly.
    • Removed memorySemaphores/proxySemaphores from ExecutionContext struct.

- Change executor to create one connection (unique QP) per channel entry
  instead of sharing connections per peer. This is required for HostNoAtomic
  IB mode where each connection can only forward signals to one semaphore
  via setSignalForwardingDst.

- Add MSCCLPP_IB_GID_INDEX environment variable to override the default
  GID index (3) used for IB transport. Set to the desired GID index value,
  or leave unset/-1 to use the default.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates executor connection/semaphore setup to support multiple independent channels per peer and adds an environment variable to override the InfiniBand GID index used when creating IB endpoints.

Changes:

  • Create one Connection per channel instance (paired + unpaired) and switch connection storage from per-peer map to an ordered vector.
  • Make connection tags tracked per-peer (instead of a single global counter) to avoid mismatches across asymmetric peer topologies.
  • Add MSCCLPP_IB_GID_INDEX and apply it in EndpointConfig::Ib construction.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
src/core/executor/executor.cc Changes connection creation to per-channel, adds per-peer tags, and adjusts channel/semaphore setup to consume connections sequentially.
src/core/env.cpp Initializes and logs the new MSCCLPP_IB_GID_INDEX environment variable.
include/mscclpp/env.hpp Declares the new Env::ibGidIndex field and documents the env var.
include/mscclpp/core.hpp Includes env support and applies the GID index override inside EndpointConfig::Ib.
Comments suppressed due to low confidence (1)

src/core/executor/executor.cc:387

  • setupChannels() builds semaphores for both paired and getUnpairedChannelInfos() entries, but only stores semaphores for paired channels into context.memoryChannels. Any unpaired MemoryDevice2DeviceSemaphore objects are dropped when setupChannels() returns, which will unregister the local semaphore memory even though the peer may still use it (unpaired infos exist specifically when the peer has more channels than this rank). Please keep the unpaired memory semaphores alive in ExecutionContext (e.g., a dedicated vector) or otherwise ensure their backing RegisteredMemory remains valid for the peer.
    for (ChannelType channelType : channelTypes) {
      std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfos(channelType);
      processChannelInfos(channelInfos);
      // Current semaphore construction requires two-way communication, e.g., to construct a semaphore signaling from
      // rank 0 to rank 1, both rank 0 and rank 1 need to send a message to each other. This PR fixes an executor bug
      // that fails to conduct two-way communication for constructing such one-way semaphores, and instead hangs
      // during the semaphore construction.
      channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType);
      processChannelInfos(channelInfos);
    }

    for (auto sem : futureMemorySemaphores) {
      memorySemaphores.push_back(std::make_shared<MemoryDevice2DeviceSemaphore>(sem.get()));
    }
    for (auto sem : futureProxySemaphores) {
      proxySemaphores.push_back(context.proxyService->addSemaphore(sem.get()));
    }

    for (ChannelType channelType : channelTypes) {
      std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfos(channelType);
      int index = 0;
      for (ChannelInfo& info : channelInfos) {
        for (size_t i = 0; i < info.connectedPeers.size(); i++) {
          if (channelType == ChannelType::MEMORY) {
            context.memoryChannels.emplace_back(memorySemaphores[index++]);
          } else if (channelType == ChannelType::PORT) {
            context.portChannels.emplace_back(context.proxyService->basePortChannel(proxySemaphores[index++]));
          }
        }
      }
    }

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants