-
Notifications
You must be signed in to change notification settings - Fork 2
Add metric to crit invariants #1037
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,12 +13,13 @@ import ( | |
|
|
||
| "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/onramp" | ||
| "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/rmn_remote" | ||
| "github.com/smartcontractkit/chainlink-ccv/integration/pkg/rmnremotereader" | ||
| "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" | ||
| "github.com/smartcontractkit/chainlink-ccv/protocol" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
| "github.com/smartcontractkit/chainlink-evm/pkg/client" | ||
| "github.com/smartcontractkit/chainlink-evm/pkg/heads" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-ccv/integration/pkg/rmnremotereader" | ||
| "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" | ||
| "github.com/smartcontractkit/chainlink-ccv/protocol" | ||
| ) | ||
|
|
||
| // Compile-time checks to ensure SourceReader implements the SourceReader interface. | ||
|
|
@@ -36,6 +37,7 @@ type SourceReader struct { | |
| chainSelector protocol.ChainSelector | ||
| lggr logger.Logger | ||
| onRampABI *abi.ABI // Cached ABI to avoid re-parsing | ||
| onCriticalInvariant func(context.Context) | ||
| } | ||
|
|
||
| func NewEVMSourceReader( | ||
|
|
@@ -46,6 +48,7 @@ func NewEVMSourceReader( | |
| ccipMessageSentTopic string, | ||
| chainSelector protocol.ChainSelector, | ||
| lggr logger.Logger, | ||
| onCriticalInvariant func(context.Context), | ||
| ) (chainaccess.SourceReader, error) { | ||
| var errs []error | ||
| appendIfNil := func(field any, fieldName string) { | ||
|
|
@@ -88,6 +91,11 @@ func NewEVMSourceReader( | |
| return nil, fmt.Errorf("failed to get OnRamp ABI: %w", err) | ||
| } | ||
|
|
||
| onInvariant := onCriticalInvariant | ||
| if onInvariant == nil { | ||
| onInvariant = func(context.Context) {} | ||
| } | ||
|
|
||
| return &SourceReader{ | ||
| chainClient: chainClient, | ||
| headTracker: headTracker, | ||
|
|
@@ -98,6 +106,7 @@ func NewEVMSourceReader( | |
| chainSelector: chainSelector, | ||
| lggr: lggr, | ||
| onRampABI: onRampABI, | ||
| onCriticalInvariant: onInvariant, | ||
| }, nil | ||
| } | ||
|
|
||
|
|
@@ -156,6 +165,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to | |
|
|
||
| // Explicitly check for the expected number of topics | ||
| if len(log.Topics) < 4 { | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Errorw("CCIPMessageSent event has insufficient topics", | ||
| "expected", 4, | ||
| "actual", len(log.Topics), | ||
|
|
@@ -181,6 +191,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to | |
| event.Sender = sender | ||
| err = r.onRampABI.UnpackIntoInterface(event, "CCIPMessageSent", log.Data) | ||
| if err != nil { | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Errorw("Failed to unpack CCIPMessageSent event payload", "error", err) | ||
| continue // to next message | ||
| } | ||
|
|
@@ -194,6 +205,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to | |
|
|
||
| // Check minimum receipt count: at least 1 CCV + executor + network fees = 3 receipts | ||
| if len(event.Receipts) < 3 { | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Errorw("Insufficient receipts. Expected at least 3 (1 CCV + executor + network fees)", | ||
| "count", len(event.Receipts), | ||
| "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
|
|
@@ -224,6 +236,7 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to | |
| "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
| decodedMsg, err := protocol.DecodeMessage(event.EncodedMessage) | ||
| if err != nil { | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Errorw("Failed to decode message", "error", err, "rawMessage", event.EncodedMessage) | ||
| continue // to next message | ||
| } | ||
|
|
@@ -232,30 +245,37 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to | |
|
|
||
| // Validate that ccvAndExecutorHash is not zero - it's required | ||
| if decodedMsg.CcvAndExecutorHash == (protocol.Bytes32{}) { | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Errorw("ccvAndExecutorHash is zero in decoded message", | ||
| "messageID", common.Bytes2Hex(event.MessageId[:]), | ||
| "blockNumber", log.BlockNumber) | ||
| continue // to next message | ||
| } | ||
|
|
||
| if !decodedMsg.OnRampAddress.Equal(expectedSourceAddressBytes(r.onRampAddress)) { | ||
| r.lggr.Errorw("onRampAddress must match the value configured. This should never happen, if it does something is seriously wrong. Escalate immediately", "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
| continue // to next message | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Fatalw("onRampAddress must match the value configured — critical invariant violated; escalate immediately", | ||
| "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
| continue // ensure we never process this msg | ||
|
Comment on lines
255
to
+259
|
||
| } | ||
|
|
||
| if !decodedMsg.Sender.Equal(expectedSourceAddressBytes(event.Sender)) { | ||
| r.lggr.Errorw("sender must match the value emitted from the on-chain event. This should never happen.", "messageId", common.Bytes2Hex(event.Sender[:])) | ||
| continue // to next message | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Fatalw("sender must match the value emitted from the on-chain event. This should never happen.", "messageId", common.Bytes2Hex(event.Sender[:])) | ||
| continue // ensure we never process this msg | ||
| } | ||
|
|
||
| if decodedMsg.MustMessageID() != event.MessageId { | ||
| r.lggr.Errorw("computed messageID must match the value emitted from the on-chain event. This should never happen, if it does escalate immediately.", "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
| continue // to the next message | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Fatalw("computed messageID must match the value emitted from the on-chain event — critical invariant violated; escalate immediately", | ||
| "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
| continue // ensure we never process this msg | ||
| } | ||
|
|
||
| if decodedMsg.DestChainSelector != protocol.ChainSelector(event.DestChainSelector) { | ||
| r.lggr.Errorw("destination chain selector must match the value emited from the on-chain event. This should never happen", "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
| continue // to the next message | ||
| r.onCriticalInvariant(ctx) | ||
| r.lggr.Fatalw("destination chain selector must match the value emitted from the on-chain event. This should never happen", "messageId", common.Bytes2Hex(event.MessageId[:])) | ||
| continue // ensure we never process this msg | ||
| } | ||
|
|
||
| allReceipts := receiptBlobsFromEvent(event.Receipts, event.VerifierBlobs) // Validate the receipt structure matches expectations | ||
|
|
@@ -269,11 +289,11 @@ func (r *SourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock, to | |
| } | ||
|
|
||
| results = append(results, protocol.MessageSentEvent{ | ||
| MessageID: protocol.Bytes32(event.MessageId), | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Casts not needed |
||
| MessageID: event.MessageId, | ||
| Message: *decodedMsg, | ||
| Receipts: allReceipts, // Keep original order from OnRamp event | ||
| BlockNumber: log.BlockNumber, | ||
| TxHash: protocol.ByteSlice(log.TxHash.Bytes()), | ||
| TxHash: log.TxHash.Bytes(), | ||
| }) | ||
|
Comment on lines
291
to
297
|
||
| } | ||
| return results, nil | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -97,6 +97,9 @@ type MetricLabeler interface { | |||||||||||
| // IncrementTaskVerificationPermanentErrors increments the counter for non-retryable verification errors. | ||||||||||||
| IncrementTaskVerificationPermanentErrors(ctx context.Context) | ||||||||||||
|
|
||||||||||||
| // IncrementCriticalSourceInvariantViolations increments when encoded source-chain data disagrees with configured on-chain facts (e.g. onRamp in message vs observed contract). | ||||||||||||
|
||||||||||||
| // IncrementCriticalSourceInvariantViolations increments when encoded source-chain data disagrees with configured on-chain facts (e.g. onRamp in message vs observed contract). | |
| // IncrementCriticalSourceInvariantViolations increments the counter for critical source-reader | |
| // validation failures, including when encoded source-chain data disagrees with configured | |
| // or observed on-chain facts, as well as decode, unpack, or structural validation failures | |
| // while interpreting source-chain data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding it to all errors, but some error logs are promoted to fatals to make them failing loud. The ones upgraded are the ones that would be the result from a malicious ramp. The ones that are still just skipping (but do send a metric!) are wrong payloads but not in a clearly malicious way.
Let me know if everything should be fatal.