From e39fea59d2d79daf7a880f98cd26f0c11f04f45f Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:23 -0500 Subject: [PATCH 01/11] connmgr: Implement separate mutex for connections. This is the first in a series of commits that will ultimately convert all of the code related to connection manager handling events to make use of separate mutexes and atomics to protect concurrent access. The motivation for the change in architecture follows. Currently, nearly all operations performed by the connection manager are implemented via a single event handler goroutine and a single message channel to protect concurrent access. Generally speaking, mutexes and atomics are best suited to caches and state tracking, while channels are better suited to distributing units of work and communicating async results. While the existing implementation has worked well for many years, it has several downsides such as: - Adding any new functionality requires additional plumbing to provide access to any related information - The state is mostly inaccessible since it is limited to a single goroutine - The use of channels significantly hinders dynamically adjusting to changing conditions based on the state due to the previous There are several other improvements that would be ideal to make, but in the effort of making it easier to follow the changes and assert correctness, this series of commits focuses only on converting it to a synchronous model. With all of the in mind, the commit starts the conversion by introducing a separate mutex to protect the connection requests and moving the map that tracks the connection requests out of the event handler to the connection manager itself. This will allow future commits to methodically refactor the various operations without introducing races. --- internal/connmgr/connmanager.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index ad23d1282..0a323fb57 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -235,6 +235,14 @@ type ConnManager struct { // requests is used internally to interact with the connection handler // goroutine. requests chan interface{} + + // The following fields are used to track the various connections managed + // by the connection manager. They are protected by the associated + // connection mutex. + // + // conns represents the set of all active connections. + connMtx sync.RWMutex + conns map[uint64]*ConnReq } // handleFailedConn handles a connection failed due to a disconnect or any @@ -292,9 +300,6 @@ func (cm *ConnManager) connHandler(ctx context.Context) { // pending holds all registered conn requests that have yet to // succeed. pending = make(map[uint64]*ConnReq) - - // conns represents the set of all actively connected peers. - conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) ) out: @@ -320,9 +325,11 @@ out: continue } + cm.connMtx.Lock() connReq.updateState(ConnEstablished) connReq.conn = msg.conn - conns[connReqID] = connReq + cm.conns[connReqID] = connReq + cm.connMtx.Unlock() log.Debugf("Connected to %v", connReq) connReq.retryCount = 0 cm.failedAttempts = 0 @@ -334,7 +341,9 @@ out: } case handleDisconnected: - connReq, ok := conns[msg.id] + cm.connMtx.Lock() + connReq, ok := cm.conns[msg.id] + cm.connMtx.Unlock() if !ok { connReq, ok = pending[msg.id] if !ok { @@ -357,7 +366,9 @@ out: // disconnected and execute disconnection // callback. log.Debugf("Disconnected from %v", connReq) - delete(conns, msg.id) + cm.connMtx.Lock() + delete(cm.conns, msg.id) + cm.connMtx.Unlock() if connReq.conn != nil { connReq.conn.Close() @@ -378,7 +389,9 @@ out: // Otherwise, attempt a reconnection when there are not already // enough outbound peers to satisfy the target number of // outbound peers or this is a persistent peer. - numConns := uint32(len(conns)) + cm.connMtx.Lock() + numConns := uint32(len(cm.conns)) + cm.connMtx.Unlock() if numConns < cm.cfg.TargetOutbound || connReq.Permanent { // The connection request is reused for persistent peers, so // add it back to the pending map in that case so that @@ -440,12 +453,14 @@ out: msg.done <- err continue } - for _, connReq := range conns { + cm.connMtx.Lock() + for _, connReq := range cm.conns { err = msg.f(connReq) if err != nil { break } } + cm.connMtx.Unlock() msg.done <- err } @@ -757,6 +772,7 @@ func New(cfg *Config) (*ConnManager, error) { cfg: *cfg, // Copy so caller can't mutate requests: make(chan interface{}), quit: make(chan struct{}), + conns: make(map[uint64]*ConnReq, cfg.TargetOutbound), } return &cm, nil } From f01455d60394392c0349d7f710213ac108176341 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:24 -0500 Subject: [PATCH 02/11] netsync: Make pending conns map concurrent safe. This moves the pending connection requests map out of the event handler goroutine to the connection manager itself and makes it concurrent safe by protecting it with the new connection mutex. This is part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 58 +++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index 0a323fb57..265cfdd66 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -62,8 +62,8 @@ type ConnReq struct { // state is the current connection state for this connection request. state atomic.Uint32 - // The following fields are owned by the connection handler and must not - // be accessed outside of it. + // The following fields are owned by the connection manager and must not + // be accessed without its connection mutex held. // // retryCount is the number of times a permanent connection request that // fails to connect has been retried since the last successful connection. @@ -240,8 +240,12 @@ type ConnManager struct { // by the connection manager. They are protected by the associated // connection mutex. // + // pending holds all registered connection requests that have yet to + // succeed. + // // conns represents the set of all active connections. connMtx sync.RWMutex + pending map[uint64]*ConnReq conns map[uint64]*ConnReq } @@ -250,6 +254,8 @@ type ConnManager struct { // retry duration. Otherwise, if required, it makes a new connection request. // After maxFailedConnectionAttempts new connections will be retried after the // configured retry duration. +// +// This function MUST be called with the connection lock held (writes). func (cm *ConnManager) handleFailedConn(ctx context.Context, c *ConnReq) { // Ignore during shutdown. if ctx.Err() != nil { @@ -296,12 +302,6 @@ func (cm *ConnManager) handleFailedConn(ctx context.Context, c *ConnReq) { // connections so that we remain connected to the network. Connection requests // are processed and mapped by their assigned ids. func (cm *ConnManager) connHandler(ctx context.Context) { - var ( - // pending holds all registered conn requests that have yet to - // succeed. - pending = make(map[uint64]*ConnReq) - ) - out: for { select { @@ -309,32 +309,35 @@ out: switch msg := req.(type) { case registerPending: connReq := msg.c + cm.connMtx.Lock() connReq.updateState(ConnPending) - pending[msg.c.ID()] = connReq + cm.pending[msg.c.ID()] = connReq + cm.connMtx.Unlock() close(msg.done) case handleConnected: connReq := msg.c + cm.connMtx.Lock() connReqID := connReq.ID() - if _, ok := pending[connReqID]; !ok { + if _, ok := cm.pending[connReqID]; !ok { if msg.conn != nil { msg.conn.Close() } + cm.connMtx.Unlock() log.Debugf("Ignoring connection for "+ "canceled connreq=%v", connReq) continue } - cm.connMtx.Lock() connReq.updateState(ConnEstablished) connReq.conn = msg.conn cm.conns[connReqID] = connReq - cm.connMtx.Unlock() log.Debugf("Connected to %v", connReq) connReq.retryCount = 0 cm.failedAttempts = 0 - delete(pending, connReqID) + delete(cm.pending, connReqID) + cm.connMtx.Unlock() if cm.cfg.OnConnection != nil { go cm.cfg.OnConnection(connReq, msg.conn) @@ -343,9 +346,8 @@ out: case handleDisconnected: cm.connMtx.Lock() connReq, ok := cm.conns[msg.id] - cm.connMtx.Unlock() if !ok { - connReq, ok = pending[msg.id] + connReq, ok = cm.pending[msg.id] if !ok { log.Errorf("Unknown connid=%d", msg.id) @@ -358,7 +360,8 @@ out: // connection. connReq.updateState(ConnCanceled) log.Debugf("Canceling: %v", connReq) - delete(pending, msg.id) + delete(cm.pending, msg.id) + cm.connMtx.Unlock() continue } @@ -366,9 +369,7 @@ out: // disconnected and execute disconnection // callback. log.Debugf("Disconnected from %v", connReq) - cm.connMtx.Lock() delete(cm.conns, msg.id) - cm.connMtx.Unlock() if connReq.conn != nil { connReq.conn.Close() @@ -383,15 +384,14 @@ out: // make no further attempts with this request. if !msg.retry { connReq.updateState(ConnDisconnected) + cm.connMtx.Unlock() continue } // Otherwise, attempt a reconnection when there are not already // enough outbound peers to satisfy the target number of // outbound peers or this is a persistent peer. - cm.connMtx.Lock() numConns := uint32(len(cm.conns)) - cm.connMtx.Unlock() if numConns < cm.cfg.TargetOutbound || connReq.Permanent { // The connection request is reused for persistent peers, so // add it back to the pending map in that case so that @@ -400,15 +400,17 @@ out: if connReq.Permanent { connReq.updateState(ConnPending) log.Debugf("Reconnecting to %v", connReq) - pending[msg.id] = connReq + cm.pending[msg.id] = connReq } cm.handleFailedConn(ctx, connReq) } + cm.connMtx.Unlock() case handleFailed: connReq := msg.c - if _, ok := pending[connReq.ID()]; !ok { + cm.connMtx.Lock() + if _, ok := cm.pending[connReq.ID()]; !ok { log.Debugf("Ignoring connection for "+ "canceled conn req: %v", connReq) continue @@ -418,12 +420,14 @@ out: log.Debugf("Failed to connect to %v: %v", connReq, msg.err) cm.handleFailedConn(ctx, connReq) + cm.connMtx.Unlock() case handleCancelPending: pendingAddr := msg.addr.String() var idToRemove uint64 var connReq *ConnReq - for id, req := range pending { + cm.connMtx.Lock() + for id, req := range cm.pending { if req == nil || req.Addr == nil { continue } @@ -433,22 +437,25 @@ out: } } if connReq != nil { - delete(pending, idToRemove) + delete(cm.pending, idToRemove) connReq.updateState(ConnCanceled) log.Debugf("Canceled pending connection to %v", msg.addr) msg.done <- nil } else { msg.done <- fmt.Errorf("no pending connection to %v", msg.addr) } + cm.connMtx.Unlock() case handleForEachConnReq: var err error - for _, connReq := range pending { + cm.connMtx.Lock() + for _, connReq := range cm.pending { err = msg.f(connReq) if err != nil { break } } + cm.connMtx.Unlock() if err != nil { msg.done <- err continue @@ -772,6 +779,7 @@ func New(cfg *Config) (*ConnManager, error) { cfg: *cfg, // Copy so caller can't mutate requests: make(chan interface{}), quit: make(chan struct{}), + pending: make(map[uint64]*ConnReq), conns: make(map[uint64]*ConnReq, cfg.TargetOutbound), } return &cm, nil From 6e34562abdefbbb87886c5b4e7e3fe2fa615f20a Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:25 -0500 Subject: [PATCH 03/11] connmgr: Make register pending logic synchronous. This refactors the logic related to registering a pending connection out of the event handler since it is now independently concurrent safe. This is a part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 77 +++++++++------------------------ 1 file changed, 20 insertions(+), 57 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index 265cfdd66..0b17a4497 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -165,15 +165,6 @@ type Config struct { Timeout time.Duration } -// registerPending is used to register a pending connection attempt. By -// registering pending connection attempts we allow callers to cancel pending -// connection attempts before they're successful or in the case they're no -// longer wanted. -type registerPending struct { - c *ConnReq - done chan struct{} -} - // handleConnected is used to queue a successful connection. type handleConnected struct { c *ConnReq @@ -307,14 +298,6 @@ out: select { case req := <-cm.requests: switch msg := req.(type) { - case registerPending: - connReq := msg.c - cm.connMtx.Lock() - connReq.updateState(ConnPending) - cm.pending[msg.c.ID()] = connReq - cm.connMtx.Unlock() - close(msg.done) - case handleConnected: connReq := msg.c cm.connMtx.Lock() @@ -479,6 +462,15 @@ out: log.Trace("Connection handler done") } +// registerPending registers the provided connection request as a pending +// connection attempt. +// +// This function MUST be called with the connection mutex lock held (writes). +func (cm *ConnManager) registerPending(connReq *ConnReq) { + connReq.updateState(ConnPending) + cm.pending[connReq.ID()] = connReq +} + // newConnReq creates a new connection request and connects to the // corresponding address. func (cm *ConnManager) newConnReq(ctx context.Context) { @@ -490,24 +482,11 @@ func (cm *ConnManager) newConnReq(ctx context.Context) { c := &ConnReq{} c.id.Store(cm.connReqCount.Add(1)) - // Submit a request of a pending connection attempt to the connection - // manager. By registering the id before the connection is even - // established, we'll be able to later cancel the connection via the - // Remove method. - done := make(chan struct{}) - select { - case cm.requests <- registerPending{c, done}: - case <-cm.quit: - return - } - - // Wait for the registration to successfully add the pending conn req to - // the conn manager's internal state. - select { - case <-done: - case <-cm.quit: - return - } + // Register the pending connection attempt so it can be canceled via the + // [ConnManager.Remove] method. + cm.connMtx.Lock() + cm.registerPending(c) + cm.connMtx.Unlock() addr, err := cm.cfg.GetNewAddress() if err != nil { @@ -552,10 +531,9 @@ func (cm *ConnManager) Connect(ctx context.Context, c *ConnReq) { return } - // Assign an ID and register a pending connection attempt with the - // connection manager when an ID has not already been assigned. By - // registering the ID before the connection is established, it can later be - // canceled via the Remove method. + // Assign an ID and register the pending connection attempt when an ID has + // not already been assigned so it can be canceled via the + // [ConnManager.Remove] method. // // Note that the assignment of the ID and the overall request count need to // be synchronized. So long as this is the only place an existing conn @@ -571,24 +549,9 @@ func (cm *ConnManager) Connect(ctx context.Context, c *ConnReq) { } cm.assignIDMtx.Unlock() if doRegisterPending { - // Submit a request of a pending connection attempt to the - // connection manager. By registering the id before the - // connection is even established, we'll be able to later - // cancel the connection via the Remove method. - done := make(chan struct{}) - select { - case cm.requests <- registerPending{c, done}: - case <-cm.quit: - return - } - - // Wait for the registration to successfully add the pending - // conn req to the conn manager's internal state. - select { - case <-done: - case <-cm.quit: - return - } + cm.connMtx.Lock() + cm.registerPending(c) + cm.connMtx.Unlock() } log.Debugf("Attempting to connect to %v", c) From 5e27edd3beb8f63ab2e2f17af74ec351a680b12f Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:26 -0500 Subject: [PATCH 04/11] connmgr: Make cancel pending logic synchronous. This refactors the logic related to canceling a pending connection out of the event handler since it is now independently concurrent safe. This is a part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 71 ++++++++++++++------------------- internal/connmgr/error.go | 4 ++ internal/connmgr/error_test.go | 1 + 3 files changed, 34 insertions(+), 42 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index 0b17a4497..396789625 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -183,12 +183,6 @@ type handleFailed struct { err error } -// handleCancelPending is used to remove failing connections from retries. -type handleCancelPending struct { - addr net.Addr - done chan error -} - // handleForEachConnReq is used to iterate all known connection requests to // include pending ones. type handleForEachConnReq struct { @@ -405,30 +399,6 @@ out: cm.handleFailedConn(ctx, connReq) cm.connMtx.Unlock() - case handleCancelPending: - pendingAddr := msg.addr.String() - var idToRemove uint64 - var connReq *ConnReq - cm.connMtx.Lock() - for id, req := range cm.pending { - if req == nil || req.Addr == nil { - continue - } - if pendingAddr == req.Addr.String() { - idToRemove, connReq = id, req - break - } - } - if connReq != nil { - delete(cm.pending, idToRemove) - connReq.updateState(ConnCanceled) - log.Debugf("Canceled pending connection to %v", msg.addr) - msg.done <- nil - } else { - msg.done <- fmt.Errorf("no pending connection to %v", msg.addr) - } - cm.connMtx.Unlock() - case handleForEachConnReq: var err error cm.connMtx.Lock() @@ -604,26 +574,43 @@ func (cm *ConnManager) Remove(id uint64) { } } +// findPendingByAddr attempts to find and return the pending connection request +// associated with the provided address. It returns nil if no matching request +// is found. +// +// This function MUST be called with the connection mutex held (writes). +func (cm *ConnManager) findPendingByAddr(addr net.Addr) *ConnReq { + pendingAddr := addr.String() + for _, req := range cm.pending { + if req == nil || req.Addr == nil { + continue + } + if pendingAddr == req.Addr.String() { + return req + } + } + return nil +} + // CancelPending removes the connection corresponding to the given address // from the list of pending failed connections. // // Returns an error if the connection manager is stopped or there is no pending // connection for the given address. func (cm *ConnManager) CancelPending(addr net.Addr) error { - done := make(chan error, 1) - select { - case cm.requests <- handleCancelPending{addr, done}: - case <-cm.quit: - } + cm.connMtx.Lock() + defer cm.connMtx.Unlock() - // Wait for the connection to be removed from the conn manager's - // internal state. - select { - case err := <-done: - return err - case <-cm.quit: - return fmt.Errorf("connection manager stopped") + connReq := cm.findPendingByAddr(addr) + if connReq == nil { + str := fmt.Sprintf("no pending connection to %v", addr) + return MakeError(ErrNotFound, str) } + + delete(cm.pending, connReq.ID()) + connReq.updateState(ConnCanceled) + log.Debugf("Canceled pending connection to %v", addr) + return nil } // ForEachConnReq calls the provided function with each connection request known diff --git a/internal/connmgr/error.go b/internal/connmgr/error.go index 5c9ffdfe9..e1ed71f11 100644 --- a/internal/connmgr/error.go +++ b/internal/connmgr/error.go @@ -18,6 +18,10 @@ const ( // cannot both be specified in the configuration. ErrBothDialsFilled = ErrorKind("ErrBothDialsFilled") + // ErrNotFound indicates a specified connection ID or address is unknown to + // the connection manager. + ErrNotFound = ErrorKind("ErrNotFound") + // ErrTorInvalidAddressResponse indicates an invalid address was // returned by the Tor DNS resolver. ErrTorInvalidAddressResponse = ErrorKind("ErrTorInvalidAddressResponse") diff --git a/internal/connmgr/error_test.go b/internal/connmgr/error_test.go index 4841daee2..f1f3bcf05 100644 --- a/internal/connmgr/error_test.go +++ b/internal/connmgr/error_test.go @@ -18,6 +18,7 @@ func TestErrorKindStringer(t *testing.T) { }{ {ErrDialNil, "ErrDialNil"}, {ErrBothDialsFilled, "ErrBothDialsFilled"}, + {ErrNotFound, "ErrNotFound"}, {ErrTorInvalidAddressResponse, "ErrTorInvalidAddressResponse"}, {ErrTorInvalidProxyResponse, "ErrTorInvalidProxyResponse"}, {ErrTorUnrecognizedAuthMethod, "ErrTorUnrecognizedAuthMethod"}, From 2ef577358ef8020fb9dff1ef3a2fc568f032371c Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:26 -0500 Subject: [PATCH 05/11] connmgr: Make conn iteration logic synchronous. This refactors the logic related to iterating active connections out of the event handler since it is now independently concurrent safe. This is a part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 56 +++++++++------------------------ 1 file changed, 15 insertions(+), 41 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index 396789625..e04ecf382 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -183,13 +183,6 @@ type handleFailed struct { err error } -// handleForEachConnReq is used to iterate all known connection requests to -// include pending ones. -type handleForEachConnReq struct { - f func(c *ConnReq) error - done chan error -} - // ConnManager provides a manager to handle network connections. type ConnManager struct { // connReqCount is the number of connection requests that have been made and @@ -398,30 +391,6 @@ out: connReq, msg.err) cm.handleFailedConn(ctx, connReq) cm.connMtx.Unlock() - - case handleForEachConnReq: - var err error - cm.connMtx.Lock() - for _, connReq := range cm.pending { - err = msg.f(connReq) - if err != nil { - break - } - } - cm.connMtx.Unlock() - if err != nil { - msg.done <- err - continue - } - cm.connMtx.Lock() - for _, connReq := range cm.conns { - err = msg.f(connReq) - if err != nil { - break - } - } - cm.connMtx.Unlock() - msg.done <- err } case <-ctx.Done(): @@ -623,18 +592,23 @@ func (cm *ConnManager) CancelPending(addr net.Addr) error { // NOTE: This must not call any other connection manager methods during // iteration or it will result in a deadlock. func (cm *ConnManager) ForEachConnReq(f func(c *ConnReq) error) error { - done := make(chan error, 1) - select { - case cm.requests <- handleForEachConnReq{f, done}: - case <-cm.quit: - } + cm.connMtx.Lock() + defer cm.connMtx.Unlock() - select { - case err := <-done: - return err - case <-cm.quit: - return fmt.Errorf("connection manager stopped") + var err error + for _, connReq := range cm.pending { + err = f(connReq) + if err != nil { + return err + } + } + for _, connReq := range cm.conns { + err = f(connReq) + if err != nil { + return err + } } + return nil } // listenHandler accepts incoming connections on a given listener. It must be From 21ab40dea8c74da63191f34edc2226b6465dea80 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:27 -0500 Subject: [PATCH 06/11] connmgr: Make connected logic synchronous. This refactors the logic related to handled established connections out of the event handler since it is now independently concurrent safe. This is a part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 59 ++++++++++++--------------------- 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index e04ecf382..0753f3f8e 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -165,12 +165,6 @@ type Config struct { Timeout time.Duration } -// handleConnected is used to queue a successful connection. -type handleConnected struct { - c *ConnReq - conn net.Conn -} - // handleDisconnected is used to remove a connection. type handleDisconnected struct { id uint64 @@ -285,34 +279,6 @@ out: select { case req := <-cm.requests: switch msg := req.(type) { - case handleConnected: - connReq := msg.c - cm.connMtx.Lock() - connReqID := connReq.ID() - if _, ok := cm.pending[connReqID]; !ok { - if msg.conn != nil { - msg.conn.Close() - } - cm.connMtx.Unlock() - log.Debugf("Ignoring connection for "+ - "canceled connreq=%v", connReq) - continue - } - - connReq.updateState(ConnEstablished) - connReq.conn = msg.conn - cm.conns[connReqID] = connReq - log.Debugf("Connected to %v", connReq) - connReq.retryCount = 0 - cm.failedAttempts = 0 - - delete(cm.pending, connReqID) - cm.connMtx.Unlock() - - if cm.cfg.OnConnection != nil { - go cm.cfg.OnConnection(connReq, msg.conn) - } - case handleDisconnected: cm.connMtx.Lock() connReq, ok := cm.conns[msg.id] @@ -486,6 +452,7 @@ func (cm *ConnManager) Connect(ctx context.Context, c *ConnReq) { c.id.Store(cm.connReqCount.Add(1)) doRegisterPending = true } + connReqID := c.ID() cm.assignIDMtx.Unlock() if doRegisterPending { cm.connMtx.Lock() @@ -495,6 +462,8 @@ func (cm *ConnManager) Connect(ctx context.Context, c *ConnReq) { log.Debugf("Attempting to connect to %v", c) + // Attempt to establish the connection to the address associated with the + // connection request. Apply a timeout if requested. if cm.cfg.Timeout != 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, cm.cfg.Timeout) @@ -515,9 +484,25 @@ func (cm *ConnManager) Connect(ctx context.Context, c *ConnReq) { return } - select { - case cm.requests <- handleConnected{c, conn}: - case <-cm.quit: + cm.connMtx.Lock() + defer cm.connMtx.Unlock() + + if _, ok := cm.pending[connReqID]; !ok { + conn.Close() + log.Debugf("Ignoring connection for canceled connreq=%v", c) + return + } + + c.updateState(ConnEstablished) + c.conn = conn + cm.conns[connReqID] = c + log.Debugf("Connected to %v", c) + c.retryCount = 0 + cm.failedAttempts = 0 + delete(cm.pending, connReqID) + + if cm.cfg.OnConnection != nil { + go cm.cfg.OnConnection(c, conn) } } From 89f32c8e841b2ecb68246da068cd52e9da1a6053 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:28 -0500 Subject: [PATCH 07/11] connmgr: Make failed conn logic synchronous. This refactors the logic related to handled failed connections out of the event handler since it is now independently concurrent safe. This is a part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 166 +++++++++++++++++--------------- 1 file changed, 89 insertions(+), 77 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index 0753f3f8e..51e53f949 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -171,12 +171,6 @@ type handleDisconnected struct { retry bool } -// handleFailed is used to remove a pending connection. -type handleFailed struct { - c *ConnReq - err error -} - // ConnManager provides a manager to handle network connections. type ConnManager struct { // connReqCount is the number of connection requests that have been made and @@ -221,52 +215,6 @@ type ConnManager struct { conns map[uint64]*ConnReq } -// handleFailedConn handles a connection failed due to a disconnect or any -// other failure. If permanent, it retries the connection after the configured -// retry duration. Otherwise, if required, it makes a new connection request. -// After maxFailedConnectionAttempts new connections will be retried after the -// configured retry duration. -// -// This function MUST be called with the connection lock held (writes). -func (cm *ConnManager) handleFailedConn(ctx context.Context, c *ConnReq) { - // Ignore during shutdown. - if ctx.Err() != nil { - return - } - - if c.Permanent { - c.retryCount++ - d := time.Duration(c.retryCount) * cm.cfg.RetryDuration - if d > maxRetryDuration { - d = maxRetryDuration - } - log.Debugf("Retrying connection to %v in %v", c, d) - go func() { - select { - case <-time.After(d): - cm.Connect(ctx, c) - case <-cm.quit: - } - }() - } else if cm.cfg.GetNewAddress != nil { - cm.failedAttempts++ - if cm.failedAttempts >= maxFailedAttempts { - log.Debugf("Max failed connection attempts reached: [%d] "+ - "-- retrying connection in: %v", maxFailedAttempts, - cm.cfg.RetryDuration) - go func() { - select { - case <-time.After(cm.cfg.RetryDuration): - cm.newConnReq(ctx) - case <-cm.quit: - } - }() - } else { - go cm.newConnReq(ctx) - } - } -} - // connHandler handles all connection related requests. It must be run as a // goroutine. // @@ -342,21 +290,6 @@ out: cm.handleFailedConn(ctx, connReq) } cm.connMtx.Unlock() - - case handleFailed: - connReq := msg.c - cm.connMtx.Lock() - if _, ok := cm.pending[connReq.ID()]; !ok { - log.Debugf("Ignoring connection for "+ - "canceled conn req: %v", connReq) - continue - } - - connReq.updateState(ConnFailed) - log.Debugf("Failed to connect to %v: %v", - connReq, msg.err) - cm.handleFailedConn(ctx, connReq) - cm.connMtx.Unlock() } case <-ctx.Done(): @@ -376,8 +309,8 @@ func (cm *ConnManager) registerPending(connReq *ConnReq) { cm.pending[connReq.ID()] = connReq } -// newConnReq creates a new connection request and connects to the -// corresponding address. +// newConnReq creates a new connection request and connects to the corresponding +// address. func (cm *ConnManager) newConnReq(ctx context.Context) { // Ignore during shutdown. if ctx.Err() != nil { @@ -395,10 +328,9 @@ func (cm *ConnManager) newConnReq(ctx context.Context) { addr, err := cm.cfg.GetNewAddress() if err != nil { - select { - case cm.requests <- handleFailed{c, err}: - case <-cm.quit: - } + cm.connMtx.Lock() + cm.handleFailedPending(ctx, c, err) + cm.connMtx.Unlock() return } @@ -407,6 +339,87 @@ func (cm *ConnManager) newConnReq(ctx context.Context) { cm.Connect(ctx, c) } +// handleFailedConn handles a connection failed due to a disconnect or any other +// failure. Permanent connection requests are retried after the configured +// retry duration. A new connection request is created if required. +// +// In the event there have been [maxFailedAttempts] failed successive attempts, +// new connections will be retried after the configured retry duration. +// +// This function MUST be called with the connection lock held (writes). +func (cm *ConnManager) handleFailedConn(ctx context.Context, c *ConnReq) { + // Ignore during shutdown. + select { + case <-cm.quit: + return + case <-ctx.Done(): + return + default: + } + + // Reconnect to permanent connection requests after a retry timeout with + // an increasing backoff up to a max for repeated failed attempts. + if c.Permanent { + c.retryCount++ + retryWait := time.Duration(c.retryCount) * cm.cfg.RetryDuration + retryWait = min(retryWait, maxRetryDuration) + log.Debugf("Retrying connection to %v in %v", c, retryWait) + go func() { + select { + case <-time.After(retryWait): + cm.Connect(ctx, c) + case <-cm.quit: + case <-ctx.Done(): + } + }() + return + } + + // Nothing more to do when the method to automatically get new addresses + // to connect to isn't configured. + if cm.cfg.GetNewAddress == nil { + return + } + + // Wait to attempt new connections when there are too many successive + // failures. This prevents massive connection spam when no connections can + // be made, such as a network outtage. + cm.failedAttempts++ + if cm.failedAttempts >= maxFailedAttempts { + log.Debugf("Max failed connection attempts reached: [%d] -- retrying "+ + "connection in: %v", maxFailedAttempts, cm.cfg.RetryDuration) + go func() { + select { + case <-time.After(cm.cfg.RetryDuration): + cm.newConnReq(ctx) + case <-cm.quit: + case <-ctx.Done(): + } + }() + return + } + + // Otherwise, attempt a new connection with a new address now. + go cm.newConnReq(ctx) +} + +// handleFailedPending handles failed pending connection requests. Connection +// requests that were canceled are ignored. Otherwise, their state is updated +// to mark it failed and it passed along to [ConnManager.handlFailedConn] to +// possibly retry or be reused for a new connection depending on settings. +// +// This function MUST be called with the connection lock held (writes). +func (cm *ConnManager) handleFailedPending(ctx context.Context, c *ConnReq, failedErr error) { + if _, ok := cm.pending[c.ID()]; !ok { + log.Debugf("Ignoring connection for canceled conn req: %v", c) + return + } + + c.updateState(ConnFailed) + log.Debugf("Failed to connect to %v: %v", c, failedErr) + cm.handleFailedConn(ctx, c) +} + // Connect assigns an id and dials a connection to the address of the connection // request using the provided context and the dial function configured when // initially creating the connection manager. @@ -477,10 +490,9 @@ func (cm *ConnManager) Connect(ctx context.Context, c *ConnReq) { conn, err = cm.cfg.DialAddr(ctx, c.Addr) } if err != nil { - select { - case cm.requests <- handleFailed{c, err}: - case <-cm.quit: - } + cm.connMtx.Lock() + cm.handleFailedPending(ctx, c, err) + cm.connMtx.Unlock() return } From 6de04d11ccfaba10d253c83a935af27280781748 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:28 -0500 Subject: [PATCH 08/11] connmgr: Make disconnect logic synchronous. This refactors the logic related to disconnection out of the event handler since it is now independently concurrent safe. This is a part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 162 ++++++++++++++++---------------- 1 file changed, 79 insertions(+), 83 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index 51e53f949..83650dcfc 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -165,12 +165,6 @@ type Config struct { Timeout time.Duration } -// handleDisconnected is used to remove a connection. -type handleDisconnected struct { - id uint64 - retry bool -} - // ConnManager provides a manager to handle network connections. type ConnManager struct { // connReqCount is the number of connection requests that have been made and @@ -225,73 +219,7 @@ func (cm *ConnManager) connHandler(ctx context.Context) { out: for { select { - case req := <-cm.requests: - switch msg := req.(type) { - case handleDisconnected: - cm.connMtx.Lock() - connReq, ok := cm.conns[msg.id] - if !ok { - connReq, ok = cm.pending[msg.id] - if !ok { - log.Errorf("Unknown connid=%d", - msg.id) - continue - } - - // Pending connection was found, remove - // it from pending map if we should - // ignore a later, successful - // connection. - connReq.updateState(ConnCanceled) - log.Debugf("Canceling: %v", connReq) - delete(cm.pending, msg.id) - cm.connMtx.Unlock() - continue - } - - // An existing connection was located, mark as - // disconnected and execute disconnection - // callback. - log.Debugf("Disconnected from %v", connReq) - delete(cm.conns, msg.id) - - if connReq.conn != nil { - connReq.conn.Close() - } - - if cm.cfg.OnDisconnection != nil { - go cm.cfg.OnDisconnection(connReq) - } - - // All internal state has been cleaned up, if - // this connection is being removed, we will - // make no further attempts with this request. - if !msg.retry { - connReq.updateState(ConnDisconnected) - cm.connMtx.Unlock() - continue - } - - // Otherwise, attempt a reconnection when there are not already - // enough outbound peers to satisfy the target number of - // outbound peers or this is a persistent peer. - numConns := uint32(len(cm.conns)) - if numConns < cm.cfg.TargetOutbound || connReq.Permanent { - // The connection request is reused for persistent peers, so - // add it back to the pending map in that case so that - // subsequent processing of connections and failures do not - // ignore the request. - if connReq.Permanent { - connReq.updateState(ConnPending) - log.Debugf("Reconnecting to %v", connReq) - cm.pending[msg.id] = connReq - } - - cm.handleFailedConn(ctx, connReq) - } - cm.connMtx.Unlock() - } - + case <-cm.requests: case <-ctx.Done(): break out } @@ -518,26 +446,94 @@ func (cm *ConnManager) Connect(ctx context.Context, c *ConnReq) { } } +// handleDisconnected handles a connection that has been disconnected. +// +// This function MUST be called with the connection mutex held (writes). +func (cm *ConnManager) handleDisconnected(id uint64, retry bool) { + // Mark the connection request as canceled and remove it from the pending + // connections when it is still pending. Since the connection attempt is + // taking place asynchronously, this ensures any later successful connection + // is ignored. + connReq, ok := cm.pending[id] + if ok { + connReq.updateState(ConnCanceled) + log.Debugf("Canceling: %v", connReq) + delete(cm.pending, id) + } + + connReq, ok = cm.conns[id] + if !ok { + log.Errorf("Unknown connid=%d", id) + return + } + + // Close the underlying connection and invoke the associated callback (if + // assigned). + log.Debugf("Disconnected from %v", connReq) + delete(cm.conns, id) + if connReq.conn != nil { + connReq.conn.Close() + } + if cm.cfg.OnDisconnection != nil { + go cm.cfg.OnDisconnection(connReq) + } + + // Mark the associated connection request as disconnected and return when no + // further attempts will be made now that all internal state has been + // cleaned up. + if !retry { + connReq.updateState(ConnDisconnected) + return + } + + // Otherwise, attempt a reconnection when the associated connection request + // is marked as permanent or there are not already enough outbound peers to + // satisfy the target number of outbound peers. + numConns := uint32(len(cm.conns)) + if connReq.Permanent || numConns < cm.cfg.TargetOutbound { + // The connection request is reused for permanent ones, so add it back + // to the pending map in that case so that subsequent processing of + // connections and failures do not ignore the request. + if connReq.Permanent { + cm.registerPending(connReq) + log.Debugf("Reconnecting to %v", connReq) + } + + // A background context is the only viable choice here. It is not + // ideal, but it is acceptable, because, ultimately, this context is + // really only used for persistent peers when they retry and persistent + // peers are not tied to a specific context anyway. They are instead + // removed by other means. Due to that, there also is no machinery to + // cancel a given persistent peer from a given context anyway. + // + // Future work ideally should refactor the persistent peer handling to + // have proper full context support. + cm.handleFailedConn(context.Background(), connReq) + } +} + // Disconnect disconnects the connection corresponding to the given connection -// id. If permanent, the connection will be retried with an increasing backoff +// id. Permanent connections will be retried with an increasing backoff // duration. +// +// This function is safe for concurrent access. func (cm *ConnManager) Disconnect(id uint64) { - select { - case cm.requests <- handleDisconnected{id, true}: - case <-cm.quit: - } + cm.connMtx.Lock() + cm.handleDisconnected(id, true) + cm.connMtx.Unlock() } // Remove removes the connection corresponding to the given connection id from // known connections. // -// NOTE: This method can also be used to cancel a lingering connection attempt +// NOTE: This method can also be used to cancel a pending connection attempt // that hasn't yet succeeded. +// +// This function is safe for concurrent access. func (cm *ConnManager) Remove(id uint64) { - select { - case cm.requests <- handleDisconnected{id, false}: - case <-cm.quit: - } + cm.connMtx.Lock() + cm.handleDisconnected(id, false) + cm.connMtx.Unlock() } // findPendingByAddr attempts to find and return the pending connection request From 381653f4dcacdf5e4acc1583b09cb790c002dc16 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:29 -0500 Subject: [PATCH 09/11] connmgr: Remove unused requests chan and handler. This removes the requests message channel and connection handler since they are no longer used. This is a part of the overall effort to convert the code related to handling the various connection manager events to synchronous code. --- internal/connmgr/connmanager.go | 41 ++++----------------------------- 1 file changed, 5 insertions(+), 36 deletions(-) diff --git a/internal/connmgr/connmanager.go b/internal/connmgr/connmanager.go index 83650dcfc..12f44e3b4 100644 --- a/internal/connmgr/connmanager.go +++ b/internal/connmgr/connmanager.go @@ -192,10 +192,6 @@ type ConnManager struct { // outside of it. failedAttempts uint64 - // requests is used internally to interact with the connection handler - // goroutine. - requests chan interface{} - // The following fields are used to track the various connections managed // by the connection manager. They are protected by the associated // connection mutex. @@ -209,25 +205,6 @@ type ConnManager struct { conns map[uint64]*ConnReq } -// connHandler handles all connection related requests. It must be run as a -// goroutine. -// -// The connection handler makes sure that we maintain a pool of active outbound -// connections so that we remain connected to the network. Connection requests -// are processed and mapped by their assigned ids. -func (cm *ConnManager) connHandler(ctx context.Context) { -out: - for { - select { - case <-cm.requests: - case <-ctx.Done(): - break out - } - } - - log.Trace("Connection handler done") -} - // registerPending registers the provided connection request as a pending // connection attempt. // @@ -629,16 +606,9 @@ func (cm *ConnManager) listenHandler(ctx context.Context, listener net.Listener) func (cm *ConnManager) Run(ctx context.Context) { log.Trace("Starting connection manager") - // Start the connection handler goroutine. - var wg sync.WaitGroup - wg.Add(1) - go func() { - cm.connHandler(ctx) - wg.Done() - }() - // Start all the listeners so long as the caller requested them and provided // a callback to be invoked when connections are accepted. + var wg sync.WaitGroup var listeners []net.Listener if cm.cfg.OnAccept != nil { listeners = cm.cfg.Listeners @@ -693,11 +663,10 @@ func New(cfg *Config) (*ConnManager, error) { cfg.TargetOutbound = defaultTargetOutbound } cm := ConnManager{ - cfg: *cfg, // Copy so caller can't mutate - requests: make(chan interface{}), - quit: make(chan struct{}), - pending: make(map[uint64]*ConnReq), - conns: make(map[uint64]*ConnReq, cfg.TargetOutbound), + cfg: *cfg, // Copy so caller can't mutate + quit: make(chan struct{}), + pending: make(map[uint64]*ConnReq), + conns: make(map[uint64]*ConnReq, cfg.TargetOutbound), } return &cm, nil } From 86642b7a645b978a79e3bb0a6f6ffafe3ec71c64 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:30 -0500 Subject: [PATCH 10/11] connmgr: More accurate dialaddr test. This makes TestPassAddrAlongDialAddr more accurate by explicitly detecting the dailed address directly in the provided dialer as opposed to from the conn request object of the mock connection. An invalid address, such as the one that is used in the test, would never result in a valid connection. --- internal/connmgr/connmanager_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/connmgr/connmanager_test.go b/internal/connmgr/connmanager_test.go index 8a499d340..74e3b156f 100644 --- a/internal/connmgr/connmanager_test.go +++ b/internal/connmgr/connmanager_test.go @@ -241,7 +241,11 @@ func TestTargetOutbound(t *testing.T) { // any address object returned by GetNewAddress will be correctly passed along // to DialAddr to be used for connecting to a host. func TestPassAddrAlongDialAddr(t *testing.T) { - connected := make(chan *ConnReq) + dailedAddr := make(chan net.Addr) + detectDialer := func(ctx context.Context, addr net.Addr) (net.Conn, error) { + dailedAddr <- addr + return nil, errors.New("error") + } // targetAddr will be the specific address we'll use to connect. It _could_ // be carrying more info than a standard (tcp/udp) network address, so it @@ -253,13 +257,10 @@ func TestPassAddrAlongDialAddr(t *testing.T) { cmgr, err := New(&Config{ TargetOutbound: 1, - DialAddr: mockDialerAddr, + DialAddr: detectDialer, GetNewAddress: func() (net.Addr, error) { return targetAddr, nil }, - OnConnection: func(c *ConnReq, conn net.Conn) { - connected <- c - }, }) if err != nil { t.Fatalf("New error: %v", err) @@ -267,8 +268,8 @@ func TestPassAddrAlongDialAddr(t *testing.T) { _, shutdown, wg := runConnMgrAsync(context.Background(), cmgr) select { - case c := <-connected: - receivedMock, isMockAddr := c.Addr.(mockAddr) + case addr := <-dailedAddr: + receivedMock, isMockAddr := addr.(mockAddr) if !isMockAddr { t.Fatal("connected to an address that was not a mockAddr") } From 1a4d8a0e85a7b4b35894577170d02a35ac32100f Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Mar 2026 23:07:30 -0500 Subject: [PATCH 11/11] connmgr: Improve TestTargetOutbound robustness. This reworks the TestTargetOutbound test a bit to ensure it fails if the expected number of connections are not made within a certain timeout. The prevents a test timeout in the failure case. --- internal/connmgr/connmanager_test.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/internal/connmgr/connmanager_test.go b/internal/connmgr/connmanager_test.go index 74e3b156f..8b4fb62f2 100644 --- a/internal/connmgr/connmanager_test.go +++ b/internal/connmgr/connmanager_test.go @@ -199,8 +199,10 @@ func TestConnectMode(t *testing.T) { // configuration option by waiting until all connections are established and // ensuring they are the only connections made. func TestTargetOutbound(t *testing.T) { - targetOutbound := uint32(10) - connected := make(chan *ConnReq) + const targetOutbound = 10 + var numConnections atomic.Uint32 + hitTargetConns := make(chan struct{}) + extraConns := make(chan *ConnReq) cmgr, err := New(&Config{ TargetOutbound: targetOutbound, Dial: mockDialer, @@ -211,7 +213,13 @@ func TestTargetOutbound(t *testing.T) { }, nil }, OnConnection: func(c *ConnReq, conn net.Conn) { - connected <- c + totalConnections := numConnections.Add(1) + if totalConnections == targetOutbound { + close(hitTargetConns) + } + if totalConnections > targetOutbound { + extraConns <- c + } }, }) if err != nil { @@ -220,13 +228,15 @@ func TestTargetOutbound(t *testing.T) { _, shutdown, wg := runConnMgrAsync(context.Background(), cmgr) // Wait for the expected number of target outbound conns to be established. - for i := uint32(0); i < targetOutbound; i++ { - <-connected + select { + case <-hitTargetConns: + case <-time.After(20 * time.Millisecond): + t.Fatal("did not reach target number of conns before timeout") } // Ensure no additional connections are made. select { - case c := <-connected: + case c := <-extraConns: t.Fatalf("target outbound: got unexpected connection - %v", c.Addr) case <-time.After(time.Millisecond * 5): break