diff --git a/.gitignore b/.gitignore index 86311b421..c0d5f334b 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,6 @@ indexer/secrets.toml .cache/ # Ignore the build output of the ccv development environment -build/devenv/ccv \ No newline at end of file +build/devenv/ccv + +coverage.out \ No newline at end of file diff --git a/integration/pkg/accessors/evm/evm_source_reader.go b/integration/pkg/accessors/evm/evm_source_reader.go index d9492250a..2fe21ca86 100644 --- a/integration/pkg/accessors/evm/evm_source_reader.go +++ b/integration/pkg/accessors/evm/evm_source_reader.go @@ -13,12 +13,13 @@ import ( "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/onramp" "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/rmn_remote" - "github.com/smartcontractkit/chainlink-ccv/integration/pkg/rmnremotereader" - "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" - "github.com/smartcontractkit/chainlink-ccv/protocol" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-evm/pkg/client" "github.com/smartcontractkit/chainlink-evm/pkg/heads" + + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/rmnremotereader" + "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" + "github.com/smartcontractkit/chainlink-ccv/protocol" ) // Compile-time checks to ensure SourceReader implements the SourceReader interface. 
@@ -36,6 +37,7 @@ type SourceReader struct {
 	chainSelector        protocol.ChainSelector
 	lggr                 logger.Logger
 	onRampABI            *abi.ABI // Cached ABI to avoid re-parsing
+	onCriticalInvariant  func(context.Context)
 }
 
 func NewEVMSourceReader(
@@ -46,6 +48,7 @@ func NewEVMSourceReader(
 	ccipMessageSentTopic string,
 	chainSelector protocol.ChainSelector,
 	lggr logger.Logger,
+	onCriticalInvariant func(context.Context),
 ) (chainaccess.SourceReader, error) {
 	var errs []error
 	appendIfNil := func(field any, fieldName string) {
@@ -88,6 +91,11 @@ func NewEVMSourceReader(
 		return nil, fmt.Errorf("failed to get OnRamp ABI: %w", err)
 	}
 
+	onInvariant := onCriticalInvariant
+	if onInvariant == nil {
+		onInvariant = func(context.Context) {}
+	}
+
 	return &SourceReader{
 		chainClient:          chainClient,
 		headTracker:          headTracker,
@@ -98,6 +106,7 @@ func NewEVMSourceReader(
 		chainSelector:        chainSelector,
 		lggr:                 lggr,
 		onRampABI:            onRampABI,
+		onCriticalInvariant:  onInvariant,
 	}, nil
 }
 
@@ -156,6 +165,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
 
 		// Explicitly check for the expected number of topics
 		if len(log.Topics) < 4 {
+			r.onCriticalInvariant(ctx)
 			r.lggr.Errorw("CCIPMessageSent event has insufficient topics",
 				"expected", 4,
 				"actual", len(log.Topics),
@@ -181,6 +191,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
 		event.Sender = sender
 		err = r.onRampABI.UnpackIntoInterface(event, "CCIPMessageSent", log.Data)
 		if err != nil {
+			r.onCriticalInvariant(ctx)
 			r.lggr.Errorw("Failed to unpack CCIPMessageSent event payload", "error", err)
 			continue // to next message
 		}
@@ -194,6 +205,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
 
 		// Check minimum receipt count: at least 1 CCV + executor + network fees = 3 receipts
 		if len(event.Receipts) < 3 {
+			r.onCriticalInvariant(ctx)
 			r.lggr.Errorw("Insufficient receipts. Expected at least 3 (1 CCV + executor + network fees)",
 				"count", len(event.Receipts),
 				"messageId", common.Bytes2Hex(event.MessageId[:]))
@@ -224,6 +236,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
 			"messageId", common.Bytes2Hex(event.MessageId[:]))
 		decodedMsg, err := protocol.DecodeMessage(event.EncodedMessage)
 		if err != nil {
+			r.onCriticalInvariant(ctx)
 			r.lggr.Errorw("Failed to decode message", "error", err, "rawMessage", event.EncodedMessage)
 			continue // to next message
 		}
@@ -232,6 +245,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
 
 		// Validate that ccvAndExecutorHash is not zero - it's required
 		if decodedMsg.CcvAndExecutorHash == (protocol.Bytes32{}) {
+			r.onCriticalInvariant(ctx)
 			r.lggr.Errorw("ccvAndExecutorHash is zero in decoded message",
 				"messageID", common.Bytes2Hex(event.MessageId[:]),
 				"blockNumber", log.BlockNumber)
@@ -239,23 +253,32 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
 		}
 
 		if !decodedMsg.OnRampAddress.Equal(expectedSourceAddressBytes(r.onRampAddress)) {
-			r.lggr.Errorw("onRampAddress must match the value configured. This should never happen, if it does something is seriously wrong. Escalate immediately", "messageId", common.Bytes2Hex(event.MessageId[:]))
-			continue // to next message
+			r.onCriticalInvariant(ctx)
+			// Log at error level and skip instead of Fatalw: a fatal log normally
+			// terminates the process, which would make the continue below unreachable
+			// and risk losing the metric recorded above before it is exported.
+			r.lggr.Errorw("onRampAddress must match the value configured — critical invariant violated; escalate immediately",
+				"messageId", common.Bytes2Hex(event.MessageId[:]))
+			continue // ensure we never process this msg
 		}
 
 		if !decodedMsg.Sender.Equal(expectedSourceAddressBytes(event.Sender)) {
-			r.lggr.Errorw("sender must match the value emitted from the on-chain event. This should never happen.", "messageId", common.Bytes2Hex(event.Sender[:]))
-			continue // to next message
+			r.onCriticalInvariant(ctx)
+			r.lggr.Errorw("sender must match the value emitted from the on-chain event. This should never happen.", "messageId", common.Bytes2Hex(event.MessageId[:]), "sender", common.Bytes2Hex(event.Sender[:]))
+			continue // ensure we never process this msg
 		}
 
 		if decodedMsg.MustMessageID() != event.MessageId {
-			r.lggr.Errorw("computed messageID must match the value emitted from the on-chain event. This should never happen, if it does escalate immediately.", "messageId", common.Bytes2Hex(event.MessageId[:]))
-			continue // to the next message
+			r.onCriticalInvariant(ctx)
+			r.lggr.Errorw("computed messageID must match the value emitted from the on-chain event — critical invariant violated; escalate immediately",
+				"messageId", common.Bytes2Hex(event.MessageId[:]))
+			continue // ensure we never process this msg
 		}
 
 		if decodedMsg.DestChainSelector != protocol.ChainSelector(event.DestChainSelector) {
-			r.lggr.Errorw("destination chain selector must match the value emited from the on-chain event. This should never happen", "messageId", common.Bytes2Hex(event.MessageId[:]))
-			continue // to the next message
+			r.onCriticalInvariant(ctx)
+			r.lggr.Errorw("destination chain selector must match the value emitted from the on-chain event. This should never happen", "messageId", common.Bytes2Hex(event.MessageId[:]))
+			continue // ensure we never process this msg
 		}
 		allReceipts := receiptBlobsFromEvent(event.Receipts, event.VerifierBlobs)
 		// Validate the receipt structure matches expectations
@@ -269,11 +292,11 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
 		}
 
 		results = append(results, protocol.MessageSentEvent{
-			MessageID:   protocol.Bytes32(event.MessageId),
+			MessageID:   event.MessageId,
 			Message:     *decodedMsg,
 			Receipts:    allReceipts, // Keep original order from OnRamp event
 			BlockNumber: log.BlockNumber,
-			TxHash:      protocol.ByteSlice(log.TxHash.Bytes()),
+			TxHash:      log.TxHash.Bytes(),
 		})
 	}
 	return results, nil
diff --git a/integration/pkg/accessors/evm/factory.go b/integration/pkg/accessors/evm/factory.go
index 0d669dd86..30d5bc59c 100644
--- a/integration/pkg/accessors/evm/factory.go
+++ b/integration/pkg/accessors/evm/factory.go
@@ -125,6 +125,7 @@ func (f *factory) GetAccessor(ctx context.Context, chainSelector protocol.ChainS
 		onramp.OnRampCCIPMessageSent{}.Topic().Hex(),
 		chainSelector,
 		f.lggr,
+		nil, // onCriticalInvariant: no hook wired here; NewEVMSourceReader substitutes a no-op
 	)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create EVM source reader: %w", err)
diff --git a/integration/pkg/constructors/committee_verifier.go b/integration/pkg/constructors/committee_verifier.go
index dd4edea8c..1486254e0 100644
--- a/integration/pkg/constructors/committee_verifier.go
+++ b/integration/pkg/constructors/committee_verifier.go
@@ -2,6 +2,7 @@ package constructors
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"time"
 
@@ -91,6 +92,10 @@ func NewVerificationCoordinator(
 			continue
 		}
 
+		chainMetrics := verifierMonitoring.Metrics().With(
+			"chain_selector", fmt.Sprintf("%d", sel),
+			"chain_name", sel.ChainName(),
+		)
 		sourceReader, err := evm.NewEVMSourceReader(
 			chain.Client(),
 			chain.HeadTracker(),
@@ -100,7 +105,11 @@ func NewVerificationCoordinator(
 			// TODO: does this need to be configurable?
 			onramp.OnRampCCIPMessageSent{}.Topic().Hex(),
 			sel,
-			logger.With(lggr, "component", "SourceReader", "chainID", sel))
+			logger.With(lggr, "component", "SourceReader", "chainID", sel),
+			func(ctx context.Context) {
+				chainMetrics.IncrementCriticalSourceInvariantViolations(ctx)
+			},
+		)
 		if err != nil {
 			lggr.Errorw("Failed to create source reader.", "error", err, "chainID", sel)
 			return nil, fmt.Errorf("failed to create source reader: %w", err)
diff --git a/verifier/internal/mocks/mock_MetricLabeler.go b/verifier/internal/mocks/mock_MetricLabeler.go
index 2380469b1..5615efa70 100644
--- a/verifier/internal/mocks/mock_MetricLabeler.go
+++ b/verifier/internal/mocks/mock_MetricLabeler.go
@@ -92,6 +92,39 @@ func (_c *MockMetricLabeler_IncrementActiveRequestsCounter_Call) RunAndReturn(ru
 	return _c
 }
 
+// IncrementCriticalSourceInvariantViolations provides a mock function with given fields: ctx
+func (_m *MockMetricLabeler) IncrementCriticalSourceInvariantViolations(ctx context.Context) {
+	_m.Called(ctx)
+}
+
+// MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementCriticalSourceInvariantViolations'
+type MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call struct {
+	*mock.Call
+}
+
+// IncrementCriticalSourceInvariantViolations is a helper method to define mock.On call
+//   - ctx context.Context
+func (_e *MockMetricLabeler_Expecter) IncrementCriticalSourceInvariantViolations(ctx interface{}) *MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call {
+	return &MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call{Call: _e.mock.On("IncrementCriticalSourceInvariantViolations", ctx)}
+}
+
+func (_c *MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call) Run(run func(ctx context.Context)) *MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context))
+	})
+	return _c
+}
+
+func (_c *MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call) Return() *MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call) RunAndReturn(run func(context.Context)) *MockMetricLabeler_IncrementCriticalSourceInvariantViolations_Call {
+	_c.Run(run)
+	return _c
+}
+
 // IncrementHTTPRequestCounter provides a mock function with given fields: ctx
 func (_m *MockMetricLabeler) IncrementHTTPRequestCounter(ctx context.Context) {
 	_m.Called(ctx)
diff --git a/verifier/pkg/helpers_test.go b/verifier/pkg/helpers_test.go
index bb0f2df1c..fb0f117cd 100644
--- a/verifier/pkg/helpers_test.go
+++ b/verifier/pkg/helpers_test.go
@@ -126,6 +126,7 @@ func (m *noopMetricLabeler) RecordTaskVerificationQueueSize(ctx context.Context,
 func (m *noopMetricLabeler) RecordStorageWriteQueueSize(ctx context.Context, size int64) {}
 func (m *noopMetricLabeler) IncrementStorageWriteErrors(ctx context.Context) {}
 func (m *noopMetricLabeler) IncrementTaskVerificationPermanentErrors(ctx context.Context) {}
+func (m *noopMetricLabeler) IncrementCriticalSourceInvariantViolations(ctx context.Context) {}
 func (m *noopMetricLabeler) RecordSourceChainLatestBlock(ctx context.Context, blockNum int64) {}
 func (m *noopMetricLabeler) RecordSourceChainFinalizedBlock(ctx context.Context, blockNum int64) {}
 func (m *noopMetricLabeler) RecordSourceChainSafeBlock(ctx context.Context, blockNum int64) {}
diff --git a/verifier/pkg/monitoring/metrics.go b/verifier/pkg/monitoring/metrics.go
index c2f2cb9ec..6fe39ed96 100644
--- a/verifier/pkg/monitoring/metrics.go
+++ b/verifier/pkg/monitoring/metrics.go
@@ -38,6 +38,8 @@ type VerifierMetrics struct {
 	taskVerificationPermanentErrors metric.Int64Counter
 	storageWriteErrorsCounter       metric.Int64Counter
 
+	criticalSourceInvariantViolations metric.Int64Counter
+
 	// Heartbeat Tracking
 	heartbeatsSentCounter   metric.Int64Counter
 	heartbeatsFailedCounter metric.Int64Counter
@@ -277,6 +279,14 @@ func InitMetrics() (*VerifierMetrics, error) {
 		return nil, fmt.Errorf("failed to register task verification permanent errors counter: %w", err)
 	}
 
+	vm.criticalSourceInvariantViolations, err = beholder.GetMeter().Int64Counter(
+		"verifier_critical_source_invariant_violations_total",
+		metric.WithDescription("CCIPMessageSent events rejected because they are malformed or the encoded message disagrees with the configured/observed source chain data (e.g. onRamp address or message id vs log)"),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to register critical source invariant violations counter: %w", err)
+	}
+
 	// HTTP API Metrics
 	vm.httpActiveRequestsUpDownCounter, err = beholder.GetMeter().Int64UpDownCounter(
 		"verifier_http_active_requests",
@@ -433,6 +443,11 @@ func (v *VerifierMetricLabeler) IncrementTaskVerificationPermanentErrors(ctx con
 	v.vm.taskVerificationPermanentErrors.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
+func (v *VerifierMetricLabeler) IncrementCriticalSourceInvariantViolations(ctx context.Context) {
+	otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes()
+	v.vm.criticalSourceInvariantViolations.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
 func (v *VerifierMetricLabeler) IncrementHeartbeatsSent(ctx context.Context) {
 	otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes()
 	v.vm.heartbeatsSentCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
diff --git a/verifier/pkg/monitoring/monitoring.go b/verifier/pkg/monitoring/monitoring.go
index 91ecf52db..4f381068d 100644
--- a/verifier/pkg/monitoring/monitoring.go
+++ b/verifier/pkg/monitoring/monitoring.go
@@ -144,6 +144,8 @@ func (f *FakeVerifierMetricLabeler) IncrementStorageWriteErrors(context.Context)
 
 func (f *FakeVerifierMetricLabeler) IncrementTaskVerificationPermanentErrors(context.Context) {}
 
+func (f *FakeVerifierMetricLabeler) IncrementCriticalSourceInvariantViolations(context.Context) {}
+
 func (f *FakeVerifierMetricLabeler) RecordSourceChainLatestBlock(_ context.Context, blockNum int64) {
 	f.SourceChainLatestBLock.Store(blockNum)
 }
diff --git a/verifier/pkg/vtypes/interfaces.go b/verifier/pkg/vtypes/interfaces.go
index 429f2e78a..0358063d8 100644
--- a/verifier/pkg/vtypes/interfaces.go
+++ b/verifier/pkg/vtypes/interfaces.go
@@ -97,6 +97,9 @@ type MetricLabeler interface {
 	// IncrementTaskVerificationPermanentErrors increments the counter for non-retryable verification errors.
 	IncrementTaskVerificationPermanentErrors(ctx context.Context)
 
+	// IncrementCriticalSourceInvariantViolations increments the counter of source-chain events rejected as malformed or as disagreeing with configured/observed on-chain facts (e.g. onRamp in message vs observed contract).
+	IncrementCriticalSourceInvariantViolations(ctx context.Context)
+
 	// Heartbeat tracking
 
 	// IncrementHeartbeatsSent increments the counter for successfully sent heartbeats.
diff --git a/verifier/testutil/metric_labeler.go b/verifier/testutil/metric_labeler.go
index 27362d5f3..451e7b481 100644
--- a/verifier/testutil/metric_labeler.go
+++ b/verifier/testutil/metric_labeler.go
@@ -22,6 +22,7 @@ func (n *NoopMetricLabeler) RecordTaskVerificationQueueSize(_ context.Context, _
 func (n *NoopMetricLabeler) RecordStorageWriteQueueSize(_ context.Context, _ int64) {}
 func (n *NoopMetricLabeler) IncrementStorageWriteErrors(_ context.Context) {}
 func (n *NoopMetricLabeler) IncrementTaskVerificationPermanentErrors(_ context.Context) {}
+func (n *NoopMetricLabeler) IncrementCriticalSourceInvariantViolations(_ context.Context) {}
 func (n *NoopMetricLabeler) IncrementHeartbeatsSent(_ context.Context) {}
 func (n *NoopMetricLabeler) IncrementHeartbeatsFailed(_ context.Context) {}
 func (n *NoopMetricLabeler) RecordHeartbeatDuration(_ context.Context, _ time.Duration) {}