Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ indexer/secrets.toml
.cache/

# Ignore the build output of the ccv development environment
build/devenv/ccv
build/devenv/ccv

coverage.out
46 changes: 33 additions & 13 deletions integration/pkg/accessors/evm/evm_source_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (

"github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/onramp"
"github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/rmn_remote"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/rmnremotereader"
"github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess"
"github.com/smartcontractkit/chainlink-ccv/protocol"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-evm/pkg/client"
"github.com/smartcontractkit/chainlink-evm/pkg/heads"

"github.com/smartcontractkit/chainlink-ccv/integration/pkg/rmnremotereader"
"github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess"
"github.com/smartcontractkit/chainlink-ccv/protocol"
)

// Compile-time checks to ensure SourceReader implements the SourceReader interface.
Expand All @@ -36,6 +37,7 @@ type SourceReader struct {
chainSelector protocol.ChainSelector
lggr logger.Logger
onRampABI *abi.ABI // Cached ABI to avoid re-parsing
onCriticalInvariant func(context.Context)
}

func NewEVMSourceReader(
Expand All @@ -46,6 +48,7 @@ func NewEVMSourceReader(
ccipMessageSentTopic string,
chainSelector protocol.ChainSelector,
lggr logger.Logger,
onCriticalInvariant func(context.Context),
) (chainaccess.SourceReader, error) {
var errs []error
appendIfNil := func(field any, fieldName string) {
Expand Down Expand Up @@ -88,6 +91,11 @@ func NewEVMSourceReader(
return nil, fmt.Errorf("failed to get OnRamp ABI: %w", err)
}

onInvariant := onCriticalInvariant
if onInvariant == nil {
onInvariant = func(context.Context) {}
}

return &SourceReader{
chainClient: chainClient,
headTracker: headTracker,
Expand All @@ -98,6 +106,7 @@ func NewEVMSourceReader(
chainSelector: chainSelector,
lggr: lggr,
onRampABI: onRampABI,
onCriticalInvariant: onInvariant,
}, nil
}

Expand Down Expand Up @@ -156,6 +165,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to

// Explicitly check for the expected number of topics
if len(log.Topics) < 4 {
r.onCriticalInvariant(ctx)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding it to all errors, but some error logs are promoted to fatals to make them failing loud. The ones upgraded are the ones that would be the result from a malicious ramp. The ones that are still just skipping (but do send a metric!) are wrong payloads but not in a clearly malicious way.

Let me know if everything should be fatal.

r.lggr.Errorw("CCIPMessageSent event has insufficient topics",
"expected", 4,
"actual", len(log.Topics),
Expand All @@ -181,6 +191,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
event.Sender = sender
err = r.onRampABI.UnpackIntoInterface(event, "CCIPMessageSent", log.Data)
if err != nil {
r.onCriticalInvariant(ctx)
r.lggr.Errorw("Failed to unpack CCIPMessageSent event payload", "error", err)
continue // to next message
}
Expand All @@ -194,6 +205,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to

// Check minimum receipt count: at least 1 CCV + executor + network fees = 3 receipts
if len(event.Receipts) < 3 {
r.onCriticalInvariant(ctx)
r.lggr.Errorw("Insufficient receipts. Expected at least 3 (1 CCV + executor + network fees)",
"count", len(event.Receipts),
"messageId", common.Bytes2Hex(event.MessageId[:]))
Expand Down Expand Up @@ -224,6 +236,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
"messageId", common.Bytes2Hex(event.MessageId[:]))
decodedMsg, err := protocol.DecodeMessage(event.EncodedMessage)
if err != nil {
r.onCriticalInvariant(ctx)
r.lggr.Errorw("Failed to decode message", "error", err, "rawMessage", event.EncodedMessage)
continue // to next message
}
Expand All @@ -232,30 +245,37 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to

// Validate that ccvAndExecutorHash is not zero - it's required
if decodedMsg.CcvAndExecutorHash == (protocol.Bytes32{}) {
r.onCriticalInvariant(ctx)
r.lggr.Errorw("ccvAndExecutorHash is zero in decoded message",
"messageID", common.Bytes2Hex(event.MessageId[:]),
"blockNumber", log.BlockNumber)
continue // to next message
}

if !decodedMsg.OnRampAddress.Equal(expectedSourceAddressBytes(r.onRampAddress)) {
r.lggr.Errorw("onRampAddress must match the value configured. This should never happen, if it does something is seriously wrong. Escalate immediately", "messageId", common.Bytes2Hex(event.MessageId[:]))
continue // to next message
r.onCriticalInvariant(ctx)
r.lggr.Fatalw("onRampAddress must match the value configured — critical invariant violated; escalate immediately",
"messageId", common.Bytes2Hex(event.MessageId[:]))
continue // ensure we never process this msg
Comment on lines 255 to +259
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using lggr.Fatalw inside FetchMessageSentEvents is risky because fatal logging typically terminates the process; this would turn a single bad event into a full service outage, and the subsequent continue becomes effectively dead code. Prefer returning an error (so the caller can decide), or logging at Error/Panic with clear context and skipping the message.

Copilot uses AI. Check for mistakes.
}

if !decodedMsg.Sender.Equal(expectedSourceAddressBytes(event.Sender)) {
r.lggr.Errorw("sender must match the value emitted from the on-chain event. This should never happen.", "messageId", common.Bytes2Hex(event.Sender[:]))
continue // to next message
r.onCriticalInvariant(ctx)
r.lggr.Fatalw("sender must match the value emitted from the on-chain event. This should never happen.", "messageId", common.Bytes2Hex(event.Sender[:]))
continue // ensure we never process this msg
}

if decodedMsg.MustMessageID() != event.MessageId {
r.lggr.Errorw("computed messageID must match the value emitted from the on-chain event. This should never happen, if it does escalate immediately.", "messageId", common.Bytes2Hex(event.MessageId[:]))
continue // to the next message
r.onCriticalInvariant(ctx)
r.lggr.Fatalw("computed messageID must match the value emitted from the on-chain event — critical invariant violated; escalate immediately",
"messageId", common.Bytes2Hex(event.MessageId[:]))
continue // ensure we never process this msg
}

if decodedMsg.DestChainSelector != protocol.ChainSelector(event.DestChainSelector) {
r.lggr.Errorw("destination chain selector must match the value emited from the on-chain event. This should never happen", "messageId", common.Bytes2Hex(event.MessageId[:]))
continue // to the next message
r.onCriticalInvariant(ctx)
r.lggr.Fatalw("destination chain selector must match the value emitted from the on-chain event. This should never happen", "messageId", common.Bytes2Hex(event.MessageId[:]))
continue // ensure we never process this msg
}

allReceipts := receiptBlobsFromEvent(event.Receipts, event.VerifierBlobs) // Validate the receipt structure matches expectations
Expand All @@ -269,11 +289,11 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to
}

results = append(results, protocol.MessageSentEvent{
MessageID: protocol.Bytes32(event.MessageId),
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Casts not needed

MessageID: event.MessageId,
Message: *decodedMsg,
Receipts: allReceipts, // Keep original order from OnRamp event
BlockNumber: log.BlockNumber,
TxHash: protocol.ByteSlice(log.TxHash.Bytes()),
TxHash: log.TxHash.Bytes(),
})
Comment on lines 291 to 297
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protocol.MessageSentEvent expects MessageID to be protocol.Bytes32 and TxHash to be protocol.ByteSlice (both defined types), so assigning event.MessageId ([32]byte) and log.TxHash.Bytes() ([]byte) will not compile. Convert both to the expected protocol types when building the struct literal.

Copilot uses AI. Check for mistakes.
}
return results, nil
Expand Down
1 change: 1 addition & 0 deletions integration/pkg/accessors/evm/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (f *factory) GetAccessor(ctx context.Context, chainSelector protocol.ChainS
onramp.OnRampCCIPMessageSent{}.Topic().Hex(),
chainSelector,
f.lggr,
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to create EVM source reader: %w", err)
Expand Down
11 changes: 10 additions & 1 deletion integration/pkg/constructors/committee_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package constructors

import (
"bytes"
"context"
"fmt"
"time"

Expand Down Expand Up @@ -91,6 +92,10 @@ func NewVerificationCoordinator(
continue
}

chainMetrics := verifierMonitoring.Metrics().With(
"chain_selector", fmt.Sprintf("%d", sel),
"chain_name", sel.ChainName(),
)
sourceReader, err := evm.NewEVMSourceReader(
chain.Client(),
chain.HeadTracker(),
Expand All @@ -100,7 +105,11 @@ func NewVerificationCoordinator(
// TODO: does this need to be configurable?
onramp.OnRampCCIPMessageSent{}.Topic().Hex(),
sel,
logger.With(lggr, "component", "SourceReader", "chainID", sel))
logger.With(lggr, "component", "SourceReader", "chainID", sel),
func(ctx context.Context) {
chainMetrics.IncrementCriticalSourceInvariantViolations(ctx)
},
)
if err != nil {
lggr.Errorw("Failed to create source reader.", "error", err, "chainID", sel)
return nil, fmt.Errorf("failed to create source reader: %w", err)
Expand Down
33 changes: 33 additions & 0 deletions verifier/internal/mocks/mock_MetricLabeler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions verifier/pkg/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (m *noopMetricLabeler) RecordTaskVerificationQueueSize(ctx context.Context,
func (m *noopMetricLabeler) RecordStorageWriteQueueSize(ctx context.Context, size int64) {}
func (m *noopMetricLabeler) IncrementStorageWriteErrors(ctx context.Context) {}
func (m *noopMetricLabeler) IncrementTaskVerificationPermanentErrors(ctx context.Context) {}
func (m *noopMetricLabeler) IncrementCriticalSourceInvariantViolations(ctx context.Context) {}
func (m *noopMetricLabeler) RecordSourceChainLatestBlock(ctx context.Context, blockNum int64) {}
func (m *noopMetricLabeler) RecordSourceChainFinalizedBlock(ctx context.Context, blockNum int64) {}
func (m *noopMetricLabeler) RecordSourceChainSafeBlock(ctx context.Context, blockNum int64) {}
Expand Down
15 changes: 15 additions & 0 deletions verifier/pkg/monitoring/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type VerifierMetrics struct {
taskVerificationPermanentErrors metric.Int64Counter
storageWriteErrorsCounter metric.Int64Counter

criticalSourceInvariantViolations metric.Int64Counter

// Heartbeat Tracking
heartbeatsSentCounter metric.Int64Counter
heartbeatsFailedCounter metric.Int64Counter
Expand Down Expand Up @@ -277,6 +279,14 @@ func InitMetrics() (*VerifierMetrics, error) {
return nil, fmt.Errorf("failed to register task verification permanent errors counter: %w", err)
}

vm.criticalSourceInvariantViolations, err = beholder.GetMeter().Int64Counter(
"verifier_critical_source_invariant_violations_total",
metric.WithDescription("Encoded CCIP message disagrees with configured source chain observation (e.g. onRamp address or message id vs log)"),
)
if err != nil {
return nil, fmt.Errorf("failed to register critical source invariant violations counter: %w", err)
}

// HTTP API Metrics
vm.httpActiveRequestsUpDownCounter, err = beholder.GetMeter().Int64UpDownCounter(
"verifier_http_active_requests",
Expand Down Expand Up @@ -433,6 +443,11 @@ func (v *VerifierMetricLabeler) IncrementTaskVerificationPermanentErrors(ctx con
v.vm.taskVerificationPermanentErrors.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (v *VerifierMetricLabeler) IncrementCriticalSourceInvariantViolations(ctx context.Context) {
otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes()
v.vm.criticalSourceInvariantViolations.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (v *VerifierMetricLabeler) IncrementHeartbeatsSent(ctx context.Context) {
otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes()
v.vm.heartbeatsSentCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
Expand Down
2 changes: 2 additions & 0 deletions verifier/pkg/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func (f *FakeVerifierMetricLabeler) IncrementStorageWriteErrors(context.Context)

func (f *FakeVerifierMetricLabeler) IncrementTaskVerificationPermanentErrors(context.Context) {}

func (f *FakeVerifierMetricLabeler) IncrementCriticalSourceInvariantViolations(context.Context) {}

func (f *FakeVerifierMetricLabeler) RecordSourceChainLatestBlock(_ context.Context, blockNum int64) {
f.SourceChainLatestBLock.Store(blockNum)
}
Expand Down
3 changes: 3 additions & 0 deletions verifier/pkg/vtypes/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ type MetricLabeler interface {
// IncrementTaskVerificationPermanentErrors increments the counter for non-retryable verification errors.
IncrementTaskVerificationPermanentErrors(ctx context.Context)

// IncrementCriticalSourceInvariantViolations increments when encoded source-chain data disagrees with configured on-chain facts (e.g. onRamp in message vs observed contract).
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc comment for IncrementCriticalSourceInvariantViolations is narrowly scoped to encoded data disagreeing with configured on-chain facts, but current usage also covers decode/unpack/structural validation failures in the EVM source reader. Either update this comment (and metric description) to reflect all intended triggers, or introduce separate metrics so dashboards/alerts are not ambiguous.

Suggested change
// IncrementCriticalSourceInvariantViolations increments when encoded source-chain data disagrees with configured on-chain facts (e.g. onRamp in message vs observed contract).
// IncrementCriticalSourceInvariantViolations increments the counter for critical source-reader
// validation failures, including when encoded source-chain data disagrees with configured
// or observed on-chain facts, as well as decode, unpack, or structural validation failures
// while interpreting source-chain data.

Copilot uses AI. Check for mistakes.
IncrementCriticalSourceInvariantViolations(ctx context.Context)

// Heartbeat tracking

// IncrementHeartbeatsSent increments the counter for successfully sent heartbeats.
Expand Down
1 change: 1 addition & 0 deletions verifier/testutil/metric_labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (n *NoopMetricLabeler) RecordTaskVerificationQueueSize(_ context.Context, _
func (n *NoopMetricLabeler) RecordStorageWriteQueueSize(_ context.Context, _ int64) {}
func (n *NoopMetricLabeler) IncrementStorageWriteErrors(_ context.Context) {}
func (n *NoopMetricLabeler) IncrementTaskVerificationPermanentErrors(_ context.Context) {}
func (n *NoopMetricLabeler) IncrementCriticalSourceInvariantViolations(_ context.Context) {}
func (n *NoopMetricLabeler) IncrementHeartbeatsSent(_ context.Context) {}
func (n *NoopMetricLabeler) IncrementHeartbeatsFailed(_ context.Context) {}
func (n *NoopMetricLabeler) RecordHeartbeatDuration(_ context.Context, _ time.Duration) {}
Expand Down
Loading