Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,16 +554,46 @@ func (o observerImpl) ObserveLatestOnRampSeqNums(ctx context.Context) []pluginty
return nil
}

supportedSourceChains := mapset.NewSet(allSourceChains...).
oracleSupportedSourceChains := mapset.NewSet(allSourceChains...).
Intersect(supportedChains).ToSlice()

slices.Sort(supportedSourceChains)
slices.Sort(oracleSupportedSourceChains)

// If this oracle supports the destination chain, filter to only enabled source chains. I.e., valid lanes to dest.
// Otherwise, we try all supported source chains and rely on error handling for disabled lanes.
destChain := o.chainSupport.DestChain()
chainsToQuery := oracleSupportedSourceChains

supportsDestChain, err := o.chainSupport.SupportsDestChain(o.oracleID)
if err != nil {
lggr.Warnw("call to SupportsDestChain failed", "err", err)
// Continue with oracleSupportedSourceChains, error handling will filter disabled lanes
} else if supportsDestChain {
// Filter to only include enabled source chains (valid lanes to the destination)
sourceChainsCfg, cfgErr := o.ccipReader.GetOffRampSourceChainsConfig(ctx, oracleSupportedSourceChains)
if cfgErr != nil {
lggr.Warnw("call to GetOffRampSourceChainsConfig failed, falling back to all supported chains", "err", cfgErr)
// Continue with oracleSupportedSourceChains, error handling will filter disabled lanes
} else {
// Build set of enabled chains from the config
enabledChainsSet := mapset.NewSet[cciptypes.ChainSelector]()
for chain, cfg := range sourceChainsCfg {
if cfg.IsEnabled && chain != destChain {
enabledChainsSet.Add(chain)
}
}
// Intersect with supported chains to get the final list
chainsToQuery = mapset.NewSet(oracleSupportedSourceChains...).
Intersect(enabledChainsSet).ToSlice()
slices.Sort(chainsToQuery)
}
}

mu := &sync.Mutex{}
latestOnRampSeqNums := make([]plugintypes.SeqNumChain, 0, len(supportedSourceChains))
latestOnRampSeqNums := make([]plugintypes.SeqNumChain, 0, len(chainsToQuery))

wg := &sync.WaitGroup{}
for _, sourceChain := range supportedSourceChains {
for _, sourceChain := range chainsToQuery {
wg.Go(func() {
latestOnRampSeqNum, err := o.ccipReader.LatestMsgSeqNum(ctx, sourceChain)
if err != nil {
Expand Down Expand Up @@ -747,7 +777,7 @@ func (o observerImpl) ObserveRMNRemoteCfg(ctx context.Context) cciptypes.RemoteC

rmnRemoteCfg, err := o.ccipReader.GetRMNRemoteConfig(ctx)
if err != nil {
if errors.Is(err, readerpkg.ErrContractReaderNotFound) {
if errors.Is(err, readerpkg.ErrContractReaderNotFound) || isNoBindingsError(err) {
// destination chain not supported
return cciptypes.RemoteConfig{}
}
Expand Down
294 changes: 294 additions & 0 deletions commit/merkleroot/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
common_mock "github.com/smartcontractkit/chainlink-ccip/mocks/internal_/plugincommon"
reader_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader"
readerpkg_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader"
"github.com/smartcontractkit/chainlink-ccip/pkg/reader"
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

Expand Down Expand Up @@ -992,3 +993,296 @@ func (a signatureVerifierAlwaysTrue) VerifyReportSignatures(
_ context.Context, _ []cciptypes.RMNECDSASignature, _ cciptypes.RMNReport, _ []cciptypes.UnknownAddress) error {
return nil
}

func Test_ObserveLatestOnRampSeqNums(t *testing.T) {
const nodeID commontypes.OracleID = 1
destChain := cciptypes.ChainSelector(100)
sourceChain1 := cciptypes.ChainSelector(4)
sourceChain2 := cciptypes.ChainSelector(7)
sourceChain3 := cciptypes.ChainSelector(19)
allSourceChains := []cciptypes.ChainSelector{sourceChain1, sourceChain2, sourceChain3}

testCases := []struct {
name string
expResult []plugintypes.SeqNumChain
getDeps func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader)
}{
{
name: "Happy path - all chains enabled, oracle supports dest chain",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(
map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{
sourceChain1: {IsEnabled: true},
sourceChain2: {IsEnabled: true},
sourceChain3: {IsEnabled: true},
}, nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
plugintypes.NewSeqNumChain(sourceChain2, 200),
plugintypes.NewSeqNumChain(sourceChain3, 300),
},
},
{
name: "Some chains disabled - only enabled chains are queried",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
// sourceChain2 is disabled
ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(
map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{
sourceChain1: {IsEnabled: true},
sourceChain2: {IsEnabled: false},
sourceChain3: {IsEnabled: true},
}, nil)
// Only enabled chains should be queried
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
plugintypes.NewSeqNumChain(sourceChain3, 300),
},
},
{
name: "All chains disabled - no chains queried",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(
map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{
sourceChain1: {IsEnabled: false},
sourceChain2: {IsEnabled: false},
sourceChain3: {IsEnabled: false},
}, nil)
// No LatestMsgSeqNum calls expected
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{},
},
{
name: "Oracle does not support dest chain - falls back to all supported chains",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(false, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
// GetOffRampSourceChainsConfig should NOT be called
// All supported chains should be queried
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
plugintypes.NewSeqNumChain(sourceChain2, 200),
plugintypes.NewSeqNumChain(sourceChain3, 300),
},
},
{
name: "SupportsDestChain errors - falls back to all supported chains",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(false, errors.New("some error"))

ccipReader := reader_mock.NewMockCCIPReader(t)
// GetOffRampSourceChainsConfig should NOT be called
// All supported chains should be queried
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
plugintypes.NewSeqNumChain(sourceChain2, 200),
plugintypes.NewSeqNumChain(sourceChain3, 300),
},
},
{
name: "GetOffRampSourceChainsConfig errors - falls back to all supported chains",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(
nil, errors.New("failed to get source chains config"))
// Fallback: all supported chains should be queried
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
plugintypes.NewSeqNumChain(sourceChain2, 200),
plugintypes.NewSeqNumChain(sourceChain3, 300),
},
},
{
name: "Dest chain excluded from enabled chains even if in config",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(
[]cciptypes.ChainSelector{sourceChain1, destChain}, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, destChain), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
// Even though destChain is marked as enabled, it should be excluded
ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(
map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{
sourceChain1: {IsEnabled: true},
destChain: {IsEnabled: true}, // Should be excluded
}, nil)
// Only sourceChain1 should be queried
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
},
},
{
name: "nil is returned when KnownSourceChainsSlice errors",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(nil, errors.New("some error"))
ccipReader := reader_mock.NewMockCCIPReader(t)
return chainSupport, ccipReader
},
expResult: nil,
},
{
name: "nil is returned when SupportedChains errors",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(nil, errors.New("some error"))
ccipReader := reader_mock.NewMockCCIPReader(t)
return chainSupport, ccipReader
},
expResult: nil,
},
{
name: "Enabled chains intersected with oracle supported chains",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
// Oracle only supports sourceChain1 and sourceChain2
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2), nil) // Missing sourceChain3
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
// All chains are enabled in the config
ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(
map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{
sourceChain1: {IsEnabled: true},
sourceChain2: {IsEnabled: true},
sourceChain3: {IsEnabled: true}, // But oracle doesn't support this
}, nil)
// Only sourceChain1 and sourceChain2 should be queried (intersection)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(cciptypes.SeqNum(200), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
plugintypes.NewSeqNumChain(sourceChain2, 200),
},
},
{
name: "LatestMsgSeqNum error for one chain - other chains still returned",
getDeps: func(t *testing.T) (*common_mock.MockChainSupport, *reader_mock.MockCCIPReader) {
chainSupport := common_mock.NewMockChainSupport(t)
chainSupport.EXPECT().KnownSourceChainsSlice().Return(allSourceChains, nil)
chainSupport.EXPECT().SupportedChains(nodeID).Return(
mapset.NewSet(sourceChain1, sourceChain2, sourceChain3), nil)
chainSupport.EXPECT().DestChain().Return(destChain)
chainSupport.EXPECT().SupportsDestChain(nodeID).Return(true, nil)

ccipReader := reader_mock.NewMockCCIPReader(t)
ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(
map[cciptypes.ChainSelector]reader.StaticSourceChainConfig{
sourceChain1: {IsEnabled: true},
sourceChain2: {IsEnabled: true},
sourceChain3: {IsEnabled: true},
}, nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain1).Return(cciptypes.SeqNum(100), nil)
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain2).Return(
cciptypes.SeqNum(0), errors.New("rpc error"))
ccipReader.EXPECT().LatestMsgSeqNum(mock.Anything, sourceChain3).Return(cciptypes.SeqNum(300), nil)
return chainSupport, ccipReader
},
expResult: []plugintypes.SeqNumChain{
plugintypes.NewSeqNumChain(sourceChain1, 100),
plugintypes.NewSeqNumChain(sourceChain3, 300),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := t.Context()

chainSupport, ccipReader := tc.getDeps(t)
defer chainSupport.AssertExpectations(t)
defer ccipReader.AssertExpectations(t)

o := newObserverImpl(
logger.Test(t),
nil,
nodeID,
chainSupport,
ccipReader,
mocks.NewMessageHasher(),
)

result := o.ObserveLatestOnRampSeqNums(ctx)
assert.Equal(t, tc.expResult, result)
})
}
}
2 changes: 1 addition & 1 deletion commit/plugin_roledon_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestPlugin_RoleDonE2E_NoPrevOutcome(t *testing.T) {
OnRamp: clrand.RandomBytes(32),
}
}
deps.ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, s.sourceChains).Return(sourceChainsCfg, nil)
deps.ccipReader.EXPECT().GetOffRampSourceChainsConfig(mock.Anything, mock.Anything).Return(sourceChainsCfg, nil)

deps.priceReader.EXPECT().GetFeeQuoterTokenUpdates(mock.Anything, mock.Anything, s.destChain).Return(nil, nil)

Expand Down
Loading
Loading