diff --git a/commit/merkleroot/observation.go b/commit/merkleroot/observation.go index 5b4009368e..5b302d89a3 100644 --- a/commit/merkleroot/observation.go +++ b/commit/merkleroot/observation.go @@ -554,16 +554,46 @@ func (o observerImpl) ObserveLatestOnRampSeqNums(ctx context.Context) []pluginty return nil } - supportedSourceChains := mapset.NewSet(allSourceChains...). + oracleSupportedSourceChains := mapset.NewSet(allSourceChains...). Intersect(supportedChains).ToSlice() - slices.Sort(supportedSourceChains) + slices.Sort(oracleSupportedSourceChains) + + // If this oracle supports the destination chain, filter to only enabled source chains. I.e., valid lanes to dest. + // Otherwise, we try all supported source chains and rely on error handling for disabled lanes. + destChain := o.chainSupport.DestChain() + chainsToQuery := oracleSupportedSourceChains + + supportsDestChain, err := o.chainSupport.SupportsDestChain(o.oracleID) + if err != nil { + lggr.Warnw("call to SupportsDestChain failed", "err", err) + // Continue with oracleSupportedSourceChains, error handling will filter disabled lanes + } else if supportsDestChain { + // Filter to only include enabled source chains (valid lanes to the destination) + sourceChainsCfg, cfgErr := o.ccipReader.GetOffRampSourceChainsConfig(ctx, oracleSupportedSourceChains) + if cfgErr != nil { + lggr.Warnw("call to GetOffRampSourceChainsConfig failed, falling back to all supported chains", "err", cfgErr) + // Continue with oracleSupportedSourceChains, error handling will filter disabled lanes + } else { + // Build set of enabled chains from the config + enabledChainsSet := mapset.NewSet[cciptypes.ChainSelector]() + for chain, cfg := range sourceChainsCfg { + if cfg.IsEnabled && chain != destChain { + enabledChainsSet.Add(chain) + } + } + // Intersect with supported chains to get the final list + chainsToQuery = mapset.NewSet(oracleSupportedSourceChains...). + Intersect(enabledChainsSet).ToSlice() + slices.Sort(chainsToQuery) + } + } mu := &sync.Mutex{} - latestOnRampSeqNums := make([]plugintypes.SeqNumChain, 0, len(supportedSourceChains)) + latestOnRampSeqNums := make([]plugintypes.SeqNumChain, 0, len(chainsToQuery)) wg := &sync.WaitGroup{} - for _, sourceChain := range supportedSourceChains { + for _, sourceChain := range chainsToQuery { wg.Go(func() { latestOnRampSeqNum, err := o.ccipReader.LatestMsgSeqNum(ctx, sourceChain) if err != nil { @@ -747,7 +777,7 @@ func (o observerImpl) ObserveRMNRemoteCfg(ctx context.Context) cciptypes.RemoteC rmnRemoteCfg, err := o.ccipReader.GetRMNRemoteConfig(ctx) if err != nil { - if errors.Is(err, readerpkg.ErrContractReaderNotFound) { + if errors.Is(err, readerpkg.ErrContractReaderNotFound) || isNoBindingsError(err) { // destination chain not supported return cciptypes.RemoteConfig{} } diff --git a/commit/merkleroot/observation_test.go b/commit/merkleroot/observation_test.go index 7ce11d505a..c43f54e475 100644 --- a/commit/merkleroot/observation_test.go +++ b/commit/merkleroot/observation_test.go @@ -37,6 +37,7 @@ import ( common_mock "github.com/smartcontractkit/chainlink-ccip/mocks/internal_/plugincommon" reader_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader" readerpkg_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader" + "github.com/smartcontractkit/chainlink-ccip/pkg/reader" "github.com/smartcontractkit/chainlink-ccip/pluginconfig" ) @@ -992,3 +993,296 @@ func (a signatureVerifierAlwaysTrue) VerifyReportSignatures( _ context.Context, _ []cciptypes.RMNECDSASignature, _ cciptypes.RMNReport, _ []cciptypes.UnknownAddress) error { return nil } + +func Test_ObserveLatestOnRampSeqNums(t *testing.T) { + const nodeID commontypes.OracleID = 1 + destChain := cciptypes.ChainSelector(100) + sourceChain1 := cciptypes.ChainSelector(4) + sourceChain2 := cciptypes.ChainSelector(7) + sourceChain3 := cciptypes.ChainSelector(19) + allSourceChains := []cciptypes.ChainSelector{sourceChain1, sourceChain2, sourceChain3} + + testCases := []struct { + name string + expResult []plugintypes.SeqNumChain + getDeps func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) + }{ + { + name: "Happy path - all chains enabled, oracle supports dest chain", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return( + map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{ + sourceChain1: {IsEnabled: true}, + sourceChain2: {IsEnabled: true}, + sourceChain3: {IsEnabled: true}, + }, nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + plugintypes.NewSeqNumChain(sourceChain2, 200), + plugintypes.NewSeqNumChain(sourceChain3, 300), + }, + }, + { + name: "Some chains disabled - only enabled chains are queried", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + // sourceChain2 is disabled + ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return( + map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{ + sourceChain1: {IsEnabled: true}, + sourceChain2: {IsEnabled: false}, + sourceChain3: {IsEnabled: true}, + }, nil) + // Only enabled chains should be queried + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + plugintypes.NewSeqNumChain(sourceChain3, 300), + }, + }, + { + name: "All chains disabled - no chains queried", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return( + map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{ + sourceChain1: {IsEnabled: false}, + sourceChain2: {IsEnabled: false}, + sourceChain3: {IsEnabled: false}, + }, nil) + // No LatestMsgSeqNum calls expected + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{}, + }, + { + name: "Oracle does not support dest chain - falls back to all supported chains", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(false, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + // GetOffRampSourceChainsConfig should NOT be called + // All supported chains should be queried + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + plugintypes.NewSeqNumChain(sourceChain2, 200), + plugintypes.NewSeqNumChain(sourceChain3, 300), + }, + }, + { + name: "SupportsDestChain errors - falls back to all supported chains", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(false, errors.New("some error")) + + ccipReader := reader_mock.NewMockCCIPReader(t) + // GetOffRampSourceChainsConfig should NOT be called + // All supported chains should be queried + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + plugintypes.NewSeqNumChain(sourceChain2, 200), + plugintypes.NewSeqNumChain(sourceChain3, 300), + }, + }, + { + name: "GetOffRampSourceChainsConfig errors - falls back to all supported chains", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return( + nil, errors.New("failed to get source chains config")) + // Fallback: all supported chains should be queried + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + plugintypes.NewSeqNumChain(sourceChain2, 200), + plugintypes.NewSeqNumChain(sourceChain3, 300), + }, + }, + { + name: "Dest chain excluded from enabled chains even if in config", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return( + []cciptypes.ChainSelector{sourceChain1, destChain}, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, destChain), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + // Even though destChain is marked as enabled, it should be excluded + ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return( + map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{ + sourceChain1: {IsEnabled: true}, + destChain: {IsEnabled: true}, // Should be excluded + }, nil) + // Only sourceChain1 should be queried + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + }, + }, + { + name: "nil is returned when KnownSourceChainsSlice errors", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(nil, errors.New("some error")) + ccipReader := reader_mock.NewMockCCIPReader(t) + return chainSupport, ccipReader + }, + expResult: nil, + }, + { + name: "nil is returned when SupportedChains errors", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return(nil, errors.New("some error")) + ccipReader := reader_mock.NewMockCCIPReader(t) + return chainSupport, ccipReader + }, + expResult: nil, + }, + { + name: "Enabled chains intersected with oracle supported chains", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + // Oracle only supports sourceChain1 and sourceChain2 + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2), nil) // Missing sourceChain3 + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + // All chains are enabled in the config + ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return( + map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{ + sourceChain1: {IsEnabled: true}, + sourceChain2: {IsEnabled: true}, + sourceChain3: {IsEnabled: true}, // But oracle doesn't support this + }, nil) + // Only sourceChain1 and sourceChain2 should be queried (intersection) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + plugintypes.NewSeqNumChain(sourceChain2, 200), + }, + }, + { + name: "LatestMsgSeqNum error for one chain - other chains still returned", + getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) { + chainSupport := common_mock.NewMockChainSupport(t) + chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil) + chainSupport.EXPECT().SupportedChains(nodeID).Return( + mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil) + chainSupport.EXPECT().DestChain().Return(destChain) + chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil) + + ccipReader := reader_mock.NewMockCCIPReader(t) + ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return( + map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{ + sourceChain1: {IsEnabled: true}, + sourceChain2: {IsEnabled: true}, + sourceChain3: {IsEnabled: true}, + }, nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return( + cciptypes.SeqNum(0), errors.New("rpc error")) + ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil) + return chainSupport, ccipReader + }, + expResult: []plugintypes.SeqNumChain{ + plugintypes.NewSeqNumChain(sourceChain1, 100), + plugintypes.NewSeqNumChain(sourceChain3, 300), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := t.Context() + + chainSupport, ccipReader := tc.getDeps(t) + defer chainSupport.AssertExpectations(t) + defer ccipReader.AssertExpectations(t) + + o := newObserverImpl( + logger.Test(t), + nil, + nodeID, + chainSupport, + ccipReader, + mocks.NewMessageHasher(), + ) + + result := o.ObserveLatestOnRampSeqNums(ctx) + assert.Equal(t, tc.expResult, result) + }) + } +} diff --git a/commit/plugin_roledon_e2e_test.go b/commit/plugin_roledon_e2e_test.go index 1af4e60fbb..10362a65f1 100644 --- a/commit/plugin_roledon_e2e_test.go +++ b/commit/plugin_roledon_e2e_test.go @@ -128,7 +128,7 @@ func TestPlugin_RoleDonE2E_NoPrevOutcome(t *testing.T) { OnRamp: clrand.RandomBytes(32), } } - deps.ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, s.sourceChains).Return(sourceChainsCfg, nil) + deps.ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(sourceChainsCfg, nil) deps.priceReader.EXPECT().GetFeeQuoterTokenUpdates(mock.Anything, mock.Anything, s.destChain).Return(nil, nil) diff --git a/pkg/chainaccessor/default_accessor.go b/pkg/chainaccessor/default_accessor.go index ba7e62b34d..0d5ecbac12 100644 --- a/pkg/chainaccessor/default_accessor.go +++ b/pkg/chainaccessor/default_accessor.go @@ -84,7 +84,7 @@ func (l *DefaultAccessor) GetAllConfigsLegacy( lggr := logger.With( l.lggr, "destChainSelector", destChainSelector, - "sourceChainSelector", l.chainSelector, + "sourceChainSelectors", sourceChainSelectors, ) var configRequests contractreader.ExtendedBatchGetLatestValuesRequest @@ -360,7 +360,6 @@ func (l *DefaultAccessor) MsgsBetweenSeqNums( lggr.Infow("queried messages between sequence numbers", "numMsgs", len(seq), - "sourceChainSelector", l.chainSelector, "seqNumRange", seqNumRange.String(), ) @@ -400,7 +399,6 @@ func (l *DefaultAccessor) MsgsBetweenSeqNums( "seqNum.MsgID", slicelib.Map(msgsWithoutDataField, func(m cciptypes.Message) string { return fmt.Sprintf("%d.%d", m.Header.SequenceNumber, m.Header.MessageID) }), - "sourceChainSelector", l.chainSelector, "seqNumRange", seqNumRange.String(), ) @@ -440,7 +438,6 @@ func (l *DefaultAccessor) LatestMessageTo( lggr.Debugw("queried latest message from source", "numMsgs", len(seq), - "sourceChainSelector", l.chainSelector, "destChainSelector", destChainSelector, ) if len(seq) > 1 { diff --git a/pkg/reader/observed_ccip.go b/pkg/reader/observed_ccip.go index 0600aeccc5..41e0896ba3 100644 --- a/pkg/reader/observed_ccip.go +++ b/pkg/reader/observed_ccip.go @@ -46,6 +46,10 @@ var ( float64(time.Second), float64(2 * time.Second), float64(5 * time.Second), + float64(10 * time.Second), + float64(30 * time.Second), + float64(60 * time.Second), + float64(120 * time.Second), }, }, []string{"chainFamily", "chainID", "query"},