Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions engine/access/rpc/connection/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
lru "github.com/hashicorp/golang-lru/v2"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm attempting to fix this in another PR by completely refactoring the connection cache:
#7859

it needs some more rounds of review though.

my preference is to leave this logic as is, and push that PR forward to fix the tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look 👀.

Would you be ok with adding a delay in the test to temporarily address the issue?

Are you ok with the other two?

"github.com/onflow/crypto"
"github.com/rs/zerolog"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

Expand All @@ -16,20 +15,25 @@ import (

// CachedClient represents a gRPC client connection that is cached for reuse.
//
// Locking discipline (two independent mutexes):
//   - connMu guards conn, which may be replaced while the client sits in the cache.
//   - wgMu guards both closeRequested and additions to wg, so that a request
//     cannot be registered after closure has been requested.
type CachedClient struct {
	// address and cfg are set once at construction and read-only afterwards.
	address string
	cfg     Config

	// cache is the owning Cache; set once at construction.
	cache *Cache

	// connMu protects conn, which is established lazily and may be nil
	// if the initial connection attempt failed.
	connMu sync.RWMutex
	conn   *grpc.ClientConn

	// closeRequested is guarded by wgMu (see CloseRequested / AddRequest / Close).
	closeRequested bool
	// wgMu mutex is needed to protect the wait group from being added to
	// if we are in a closeRequested state.
	wgMu sync.RWMutex
	wg   sync.WaitGroup
}

// ClientConn returns the underlying gRPC client connection.
func (cc *CachedClient) ClientConn() *grpc.ClientConn {
cc.mu.RLock()
defer cc.mu.RUnlock()
cc.connMu.RLock()
defer cc.connMu.RUnlock()
return cc.conn
}

Expand All @@ -40,12 +44,22 @@ func (cc *CachedClient) Address() string {

// CloseRequested returns true if the CachedClient has been marked for closure.
func (cc *CachedClient) CloseRequested() bool {
	cc.wgMu.RLock()
	requested := cc.closeRequested
	cc.wgMu.RUnlock()
	return requested
}

// AddRequest increments the in-flight request counter for the CachedClient.
// It returns a function that should be called when the request completes to decrement the counter.
//
// Holding wgMu for reading makes the closeRequested check and the wg.Add
// atomic with respect to Close, which takes the write lock before setting
// the flag.
func (cc *CachedClient) AddRequest() func() {
	cc.wgMu.RLock()
	closing := cc.closeRequested
	if !closing {
		cc.wg.Add(1)
	}
	cc.wgMu.RUnlock()

	// If close is requested, cc.wg might already be done; hand back a no-op
	// so the caller never touches the wait group.
	if closing {
		return func() {}
	}
	return cc.wg.Done
}
Expand All @@ -61,15 +75,16 @@ func (cc *CachedClient) Invalidate() {
// Close closes the CachedClient connection. It marks the connection for closure and waits asynchronously for ongoing
// requests to complete before closing the connection.
func (cc *CachedClient) Close() {
// Mark the connection for closure
if !cc.closeRequested.CompareAndSwap(false, true) {
return
}
func() {
cc.wgMu.Lock()
defer cc.wgMu.Unlock()
cc.closeRequested = true
}()

// Obtain the lock to ensure that any connection attempts have completed
cc.mu.RLock()
cc.connMu.RLock()
conn := cc.conn
cc.mu.RUnlock()
cc.connMu.RUnlock()

// If the initial connection attempt failed, conn will be nil
if conn == nil {
Expand Down Expand Up @@ -127,10 +142,9 @@ func (c *Cache) GetConnected(
connectFn func(string, Config, crypto.PublicKey, *CachedClient) (*grpc.ClientConn, error),
) (*CachedClient, error) {
client := &CachedClient{
address: address,
cfg: cfg,
closeRequested: atomic.NewBool(false),
cache: c,
address: address,
cfg: cfg,
cache: c,
}

// Note: PeekOrAdd does not "visit" the existing entry, so we need to call Get explicitly
Expand All @@ -145,8 +159,8 @@ func (c *Cache) GetConnected(
c.metrics.ConnectionAddedToPool()
}

client.mu.Lock()
defer client.mu.Unlock()
client.connMu.Lock()
defer client.connMu.Unlock()

// after getting the lock, check if the connection is still active
if client.conn != nil && client.conn.GetState() != connectivity.Shutdown {
Expand Down
32 changes: 13 additions & 19 deletions engine/access/rpc/connection/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,30 @@ import (
func TestCachedClientShutdown(t *testing.T) {
// Test that a completely uninitialized client can be closed without panics
t.Run("uninitialized client", func(t *testing.T) {
client := &CachedClient{
closeRequested: atomic.NewBool(false),
}
client := &CachedClient{}
client.Close()
assert.True(t, client.closeRequested.Load())
assert.True(t, client.CloseRequested())
})

// Test closing a client with no outstanding requests
// Close() should return quickly
t.Run("with no outstanding requests", func(t *testing.T) {
client := &CachedClient{
closeRequested: atomic.NewBool(false),
conn: setupGRPCServer(t),
conn: setupGRPCServer(t),
}

unittest.RequireReturnsBefore(t, func() {
client.Close()
}, 100*time.Millisecond, "client timed out closing connection")

assert.True(t, client.closeRequested.Load())
assert.True(t, client.CloseRequested())
})

// Test closing a client with outstanding requests waits for requests to complete
// Close() should block until the request completes
t.Run("with some outstanding requests", func(t *testing.T) {
client := &CachedClient{
closeRequested: atomic.NewBool(false),
conn: setupGRPCServer(t),
conn: setupGRPCServer(t),
}
done := client.AddRequest()

Expand All @@ -62,17 +58,17 @@ func TestCachedClientShutdown(t *testing.T) {
client.Close()
}, 100*time.Millisecond, "client timed out closing connection")

assert.True(t, client.closeRequested.Load())
assert.True(t, client.CloseRequested())
assert.True(t, doneCalled.Load())
})

// Test closing a client that is already closing does not block
// Close() should return immediately
t.Run("already closing", func(t *testing.T) {
client := &CachedClient{
closeRequested: atomic.NewBool(true), // close already requested
conn: setupGRPCServer(t),
conn: setupGRPCServer(t),
}
client.Close()
done := client.AddRequest()

doneCalled := atomic.NewBool(false)
Expand All @@ -89,23 +85,21 @@ func TestCachedClientShutdown(t *testing.T) {
client.Close()
}, 10*time.Millisecond, "client timed out closing connection")

assert.True(t, client.closeRequested.Load())
assert.True(t, client.CloseRequested())
assert.False(t, doneCalled.Load())
})

// Test closing a client that is locked during connection setup
// Close() should wait for the lock before shutting down
t.Run("connection setting up", func(t *testing.T) {
client := &CachedClient{
closeRequested: atomic.NewBool(false),
}
client := &CachedClient{}

// simulate an in-progress connection setup
client.mu.Lock()
client.connMu.Lock()

go func() {
// unlock after setting up the connection
defer client.mu.Unlock()
defer client.connMu.Unlock()

// pause before setting the connection to cause client.Close() to block
time.Sleep(100 * time.Millisecond)
Expand All @@ -117,7 +111,7 @@ func TestCachedClientShutdown(t *testing.T) {
client.Close()
}, 500*time.Millisecond, "client timed out closing connection")

assert.True(t, client.closeRequested.Load())
assert.True(t, client.CloseRequested())
assert.NotNil(t, client.conn)
})
}
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func TestEvictingCacheClients(t *testing.T) {

// Invalidate marks the connection for closure asynchronously, so give it some time to run
require.Eventually(t, func() bool {
return cachedClient.closeRequested.Load()
return cachedClient.CloseRequested()
}, 100*time.Millisecond, 10*time.Millisecond, "client timed out closing connection")

// Call a gRPC method on the client, requests should be blocked since the connection is invalidated
Expand Down
9 changes: 8 additions & 1 deletion ledger/complete/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,14 @@ Loop:
}
c.logger.Info().Msg("Finished draining trie update channel in compactor on shutdown")

// Don't wait for checkpointing to finish because it might take too long.
// Don't wait for checkpointing to finish if it takes more than 10ms because it might take too long.
// We wait at least a little bit to make the tests a lot more stable.
if !checkpointSem.TryAcquire(1) {
select {
case <-checkpointResultCh:
case <-time.After(10 * time.Millisecond):
}
}
}

// checkpoint creates checkpoint of tries snapshot,
Expand Down
6 changes: 5 additions & 1 deletion network/test/cohort2/unicast_authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,11 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_ReceiverHasNoSu
err = senderCon.Unicast(&libp2pmessage.TestMessage{
Text: string("hello"),
}, u.receiverID.NodeID)
require.NoError(u.T(), err)
if err != nil {
// It can happen that the receiver resets before the sender closes,
// in which case the error will be "stream reset"
require.ErrorContains(u.T(), err, "stream reset", "expected stream-related error when receiver has no subscription")
}
Comment thread
janezpodhostnik marked this conversation as resolved.

// wait for slashing violations consumer mock to invoke run func and close ch if expected method call happens
unittest.RequireCloseBefore(u.T(), u.waitCh, u.channelCloseDuration, "could close ch on time")
Expand Down
Loading