diff --git a/.gitignore b/.gitignore index 86311b421..c0074e733 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,6 @@ indexer/secrets.toml .cache/ # Ignore the build output of the ccv development environment -build/devenv/ccv \ No newline at end of file +build/devenv/ccv +# Added by code-review-graph +.code-review-graph/ diff --git a/Justfile b/Justfile index 1b846f92e..c98ca08de 100644 --- a/Justfile +++ b/Justfile @@ -49,7 +49,7 @@ fmt: ensure-golangci-lint # Run golangci-lint lint fix="": ensure-golangci-lint - find . -type f -name go.mod -execdir golangci-lint run {{ if fix != "" { "--fix" } else { "" } }} \; + gomods -c 'golangci-lint run {{ if fix != "" { "--fix" } else { "" } }}' shellcheck: @command -v shellcheck >/dev/null 2>&1 || { \ diff --git a/cmd/executor/standalone/main.go b/cmd/executor/standalone/main.go index 207e20deb..68005a449 100644 --- a/cmd/executor/standalone/main.go +++ b/cmd/executor/standalone/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "maps" "net/http" "os" "os/signal" @@ -71,7 +72,7 @@ func main() { } lggr = logger.Named(lggr, "executor") - executorConfig, blockchainInfo, err := loadConfiguration(configPath) + executorConfig, blockchainInfo, chainAddresses, err := loadConfiguration(configPath) if err != nil { lggr.Errorw("Failed to load configuration", "path", configPath, "error", err) os.Exit(1) @@ -159,9 +160,21 @@ func main() { destReaders := make(map[protocol.ChainSelector]chainaccess.DestinationReader) enabledDestChains := make([]protocol.ChainSelector, 0) rmnReaders := make(map[protocol.ChainSelector]chainaccess.RMNCurseReader) + pk := os.Getenv(privateKeyEnvVar) + if pk == "" { + lggr.Errorf("Environment variable %s is not set", privateKeyEnvVar) + os.Exit(1) + } + for selector, chain := range blockchainInfo.GetAllInfos() { strSel := strconv.FormatUint(uint64(selector), 10) - chainConfig := executorConfig.ChainConfiguration[strSel] + + offRampAddrStr := chainAddresses.OffRampAddresses[strSel] + if offRampAddrStr == "" { + lggr.Warnw("No offramp configured for chain, skipping.", "chainSelector", strSel) + continue + } + rmnAddrStr := chainAddresses.RMNRemoteAddresses[strSel] chainClient, err := evm.CreateMultiNodeClientFromInfo(ctx, chain, lggr) if err != nil { @@ -173,8 +186,8 @@ func main() { Lggr: lggr, ChainSelector: selector, ChainClient: chainClient, - OfframpAddress: chainConfig.OffRampAddress, - RmnRemoteAddress: chainConfig.RmnAddress, + OfframpAddress: offRampAddrStr, + RmnRemoteAddress: rmnAddrStr, ExecutionVisabilityWindow: executorConfig.MaxRetryDuration, Monitoring: executorMonitoring, }) @@ -184,19 +197,13 @@ func main() { continue } - pk := os.Getenv(privateKeyEnvVar) - if pk == "" { - lggr.Errorf("Environment variable %s is not set", privateKeyEnvVar) - os.Exit(1) - } - ct, err := contracttransmitter.NewEVMContractTransmitterFromRPC( ctx, lggr, selector, chain.Nodes[0].InternalHTTPUrl, pk, - common.HexToAddress(chainConfig.OffRampAddress), + common.HexToAddress(offRampAddrStr), ) if err != nil { lggr.Errorw("Failed to create contract transmitter", "error", err) @@ -346,15 +353,37 @@ func main() { lggr.Infow("Execution service stopped gracefully") } -func loadConfiguration(filepath string) (*executor.Configuration, *chainaccess.Infos[evm.Info], error) { - var config executor.ConfigWithBlockchainInfo[evm.Info] +func loadConfiguration(filepath string) (*executor.Configuration, *chainaccess.Infos[evm.Info], chainaccess.GenericConfig, error) { + // CommitteeConfig is added separately because it is not part of executor.Configuration. + // ExecutorConfig is promoted through executor.Configuration, so no separate embed is needed. + var config struct { + executor.ConfigWithBlockchainInfo[evm.Info] + chainaccess.CommitteeConfig + } if _, err := toml.DecodeFile(filepath, &config); err != nil { - return nil, nil, err + return nil, nil, chainaccess.GenericConfig{}, err } normalizedConfig, err := config.GetNormalizedConfig() if err != nil { - return nil, nil, err + return nil, nil, chainaccess.GenericConfig{}, err + } + + // RMN addresses live in per-chain ChainConfiguration for the standalone path. + // Merge them into CommitteeConfig.RMNRemoteAddresses, letting any top-level + // CommitteeConfig values (from an overlay file) take precedence. + rmnRemoteAddresses := make(map[string]string, len(normalizedConfig.ChainConfiguration)) + for sel, cc := range normalizedConfig.ChainConfiguration { + rmnRemoteAddresses[sel] = cc.RmnAddress + } + maps.Copy(rmnRemoteAddresses, config.RMNRemoteAddresses) + + genericConfig := chainaccess.GenericConfig{ + CommitteeConfig: chainaccess.CommitteeConfig{ + OnRampAddresses: config.OnRampAddresses, + RMNRemoteAddresses: rmnRemoteAddresses, + }, + ExecutorConfig: normalizedConfig.ExecutorConfig, } - return normalizedConfig, &config.BlockchainInfos, nil + return normalizedConfig, &config.BlockchainInfos, genericConfig, nil } diff --git a/executor/config.go b/executor/config.go index 0a8f95338..722cd99e1 100644 --- a/executor/config.go +++ b/executor/config.go @@ -59,16 +59,21 @@ type Configuration struct { // This is used to configure the chain-specific configuration for each chain such as addresses, executor pool, and execution interval. ChainConfiguration map[string]ChainConfiguration `toml:"chain_configuration"` WorkerCount int `toml:"worker_count"` + + // ExecutorConfig holds destination-side contract addresses. Embedding it here places + // off_ramp_addresses and execution_visibility_window at the same top-level TOML location + // as in GenericConfig, making the executor config a valid overlay target. + chainaccess.ExecutorConfig } -// ChainConfiguration is all the configuration an executor needs to know about a specific chain. -// This is separate from chain-specific RPC information in BlockchainInfos. +// ChainConfiguration is all the executor-specific configuration for a single destination chain. type ChainConfiguration struct { - // RMN address is the address of the RMN contract to check for curse state. + // RmnAddress is the address of the RMN Remote contract on this destination chain. RmnAddress string `toml:"rmn_address"` - // OffRamp address is the address of the offramp contract to send messages to. + // OffRampAddress is the address of the OffRamp contract on this destination chain. + // Takes precedence over the top-level off_ramp_addresses map when both are present. OffRampAddress string `toml:"off_ramp_address"` - // Executor pool is the list of executor IDs used for turn taking. This executor's ID must be in the list. + // ExecutorPool is the list of executor IDs used for turn taking. This executor's ID must be in the list. ExecutorPool []string `toml:"executor_pool"` // ExecutionInterval is how long each executor has to process a message before the next executor in the cluster takes over. ExecutionInterval time.Duration `toml:"execution_interval"` @@ -130,7 +135,7 @@ func (c *Configuration) Validate() error { if chainConfig.RmnAddress == "" { return fmt.Errorf("rmn_address must be configured for chain %s", chainSel) } - if chainConfig.OffRampAddress == "" { + if chainConfig.OffRampAddress == "" && c.OffRampAddresses[chainSel] == "" { return fmt.Errorf("off_ramp_address must be configured for chain %s", chainSel) } if chainConfig.DefaultExecutorAddress == "" { @@ -174,6 +179,10 @@ func (c *Configuration) GetNormalizedConfig() (*Configuration, error) { normalized.LookbackWindow = parseOrDefault(c.LookbackWindow, lookbackWindowDefault) normalized.ReaderCacheExpiry = parseOrDefault(c.ReaderCacheExpiry, readerCacheExpiryDefault) normalized.MaxRetryDuration = parseOrDefault(c.MaxRetryDuration, maxRetryDurationDefault) + // Default ExecutionVisibilityWindow to MaxRetryDuration when not explicitly set. + if c.ExecutionVisibilityWindow == 0 { + normalized.ExecutionVisibilityWindow = normalized.MaxRetryDuration + } if c.IndexerQueryLimit == 0 { normalized.IndexerQueryLimit = IndexerQueryLimitDefault } diff --git a/executor/config_test.go b/executor/config_test.go index b59b8bf13..916e9c66a 100644 --- a/executor/config_test.go +++ b/executor/config_test.go @@ -5,12 +5,13 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" ) func validChainConfig() ChainConfiguration { return ChainConfiguration{ RmnAddress: "0x1234567890abcdef", - OffRampAddress: "0xabcdef1234567890", DefaultExecutorAddress: "0xdeadbeef12345678", ExecutorPool: []string{"executor-1", "executor-2"}, } @@ -20,6 +21,9 @@ func validConfig() Configuration { return Configuration{ ExecutorID: "executor-1", IndexerAddress: []string{"http://indexer1:8100"}, + ExecutorConfig: chainaccess.ExecutorConfig{ + OffRampAddresses: map[string]string{"1": "0xabcdef1234567890"}, + }, ChainConfiguration: map[string]ChainConfiguration{ "1": validChainConfig(), }, @@ -177,9 +181,7 @@ func TestConfiguration_Validate(t *testing.T) { name: "missing_offramp_address_fails", config: func() Configuration { c := validConfig() - cc := validChainConfig() - cc.OffRampAddress = "" - c.ChainConfiguration = map[string]ChainConfiguration{"1": cc} + c.OffRampAddresses = nil return c }(), wantErrContains: "off_ramp_address must be configured", diff --git a/integration/pkg/accessors/evm/factory.go b/integration/pkg/accessors/evm/factory.go index 353a5f18a..03bd07289 100644 --- a/integration/pkg/accessors/evm/factory.go +++ b/integration/pkg/accessors/evm/factory.go @@ -4,11 +4,14 @@ import ( "context" "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/common" chainsel "github.com/smartcontractkit/chain-selectors" "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/onramp" + executormonitoring "github.com/smartcontractkit/chainlink-ccv/executor/pkg/monitoring" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/destinationreader" "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" "github.com/smartcontractkit/chainlink-ccv/protocol" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -16,13 +19,17 @@ import ( "github.com/smartcontractkit/chainlink-evm/pkg/heads" ) +const defaultExecutionVisibilityWindow = 8 * time.Hour + type factory struct { lggr logger.Logger // TODO: put these in a single map. - onRampAddresses map[protocol.ChainSelector]string - rmnRemoteAddresses map[protocol.ChainSelector]string - headTrackers map[protocol.ChainSelector]heads.Tracker - chainClients map[protocol.ChainSelector]client.Client + onRampAddresses map[protocol.ChainSelector]string + rmnRemoteAddresses map[protocol.ChainSelector]string + offRampAddresses map[protocol.ChainSelector]string + executionVisibilityWindow time.Duration + headTrackers map[protocol.ChainSelector]heads.Tracker + chainClients map[protocol.ChainSelector]client.Client } // NewFactory creates a new EVM AccessorFactory. @@ -31,16 +38,23 @@ type factory struct { func NewFactory( lggr logger.Logger, // TODO: use ethereum address instead of string - onRampAddresses, rmnRemoteAddresses map[protocol.ChainSelector]string, + onRampAddresses, rmnRemoteAddresses, offRampAddresses map[protocol.ChainSelector]string, + executionVisibilityWindow time.Duration, headTrackers map[protocol.ChainSelector]heads.Tracker, chainClients map[protocol.ChainSelector]client.Client, ) chainaccess.AccessorFactory { + evw := executionVisibilityWindow + if evw == 0 { + evw = defaultExecutionVisibilityWindow + } return &factory{ - lggr: lggr, - onRampAddresses: onRampAddresses, - rmnRemoteAddresses: rmnRemoteAddresses, - headTrackers: headTrackers, - chainClients: chainClients, + lggr: lggr, + onRampAddresses: onRampAddresses, + rmnRemoteAddresses: rmnRemoteAddresses, + offRampAddresses: offRampAddresses, + executionVisibilityWindow: evw, + headTrackers: headTrackers, + chainClients: chainClients, } } @@ -103,17 +117,32 @@ func (f *factory) GetAccessor(ctx context.Context, chainSelector protocol.ChainS return nil, fmt.Errorf("failed to create EVM source reader: %w", err) } - return newAccessor(evmSourceReader), nil -} + var destReader chainaccess.DestinationReader + if offRampAddr := f.offRampAddresses[chainSelector]; offRampAddr != "" { + dr, err := destinationreader.NewEvmDestinationReader(destinationreader.Params{ + Lggr: f.lggr, + ChainSelector: chainSelector, + ChainClient: chainClient, + OfframpAddress: offRampAddr, + RmnRemoteAddress: f.rmnRemoteAddresses[chainSelector], + ExecutionVisabilityWindow: f.executionVisibilityWindow, + Monitoring: executormonitoring.NewNoopExecutorMonitoring(), + }) + if err != nil { + return nil, fmt.Errorf("failed to create EVM destination reader for chain %d: %w", chainSelector, err) + } + destReader = dr + } -type accessor struct { - sourceReader chainaccess.SourceReader + return &accessor{ + sourceReader: evmSourceReader, + destinationReader: destReader, + }, nil } -func newAccessor(sourceReader chainaccess.SourceReader) chainaccess.Accessor { - return &accessor{ - sourceReader: sourceReader, - } +type accessor struct { + sourceReader chainaccess.SourceReader + destinationReader chainaccess.DestinationReader } func (a *accessor) SourceReader() chainaccess.SourceReader { @@ -122,3 +151,10 @@ func (a *accessor) SourceReader() chainaccess.SourceReader { } return a.sourceReader } + +func (a *accessor) GetDestinationReader() (chainaccess.DestinationReader, bool) { + if a == nil || a.destinationReader == nil { + return nil, false + } + return a.destinationReader, true +} diff --git a/integration/pkg/accessors/evm/factory_constructor.go b/integration/pkg/accessors/evm/factory_constructor.go index 0ed7c923a..e094e5ef4 100644 --- a/integration/pkg/accessors/evm/factory_constructor.go +++ b/integration/pkg/accessors/evm/factory_constructor.go @@ -82,6 +82,7 @@ func CreateAccessorFactory( // Convert from map[string]string -> map[chainsel]string onRampInfos := chainaccess.Infos[string](generic.OnRampAddresses).GetAllInfos() rmnRemoteInfos := chainaccess.Infos[string](generic.RMNRemoteAddresses).GetAllInfos() + offRampInfos := chainaccess.Infos[string](generic.OffRampAddresses).GetAllInfos() - return NewFactory(lggr, onRampInfos, rmnRemoteInfos, headTrackers, chainClients), nil + return NewFactory(lggr, onRampInfos, rmnRemoteInfos, offRampInfos, generic.ExecutionVisibilityWindow, headTrackers, chainClients), nil } diff --git a/integration/pkg/constructors/executor.go b/integration/pkg/constructors/executor.go index d10924b5c..4844714cc 100644 --- a/integration/pkg/constructors/executor.go +++ b/integration/pkg/constructors/executor.go @@ -55,8 +55,6 @@ func NewExecutorCoordinator( lggr.Infow("Executor configuration", "config", cfg) - offRampAddresses := make(map[protocol.ChainSelector]protocol.UnknownAddress, len(cfg.ChainConfiguration)) - rmnAddresses := make(map[protocol.ChainSelector]protocol.UnknownAddress, len(cfg.ChainConfiguration)) execPool := make(map[protocol.ChainSelector][]string, len(cfg.ChainConfiguration)) execIntervals := make(map[protocol.ChainSelector]time.Duration, len(cfg.ChainConfiguration)) defaultExecutorAddresses := make(map[protocol.ChainSelector]protocol.UnknownAddress, len(cfg.ChainConfiguration)) @@ -67,14 +65,6 @@ func NewExecutorCoordinator( return nil, fmt.Errorf("failed to parse selector '%s': %w", selStr, err) } sel := protocol.ChainSelector(intSel) - offRampAddresses[sel], err = protocol.NewUnknownAddressFromHex(chainConfig.OffRampAddress) - if err != nil { - return nil, fmt.Errorf("failed to parse offramp address '%s': %w", chainConfig.OffRampAddress, err) - } - rmnAddresses[sel], err = protocol.NewUnknownAddressFromHex(chainConfig.RmnAddress) - if err != nil { - return nil, fmt.Errorf("failed to parse rmn address '%s': %w", chainConfig.RmnAddress, err) - } execPool[sel] = chainConfig.ExecutorPool execIntervals[sel] = chainConfig.ExecutionInterval defaultExecutorAddresses[sel], err = protocol.NewUnknownAddressFromHex(chainConfig.DefaultExecutorAddress) @@ -96,16 +86,26 @@ func NewExecutorCoordinator( rmnReaders := make(map[protocol.ChainSelector]chainaccess.RMNCurseReader) enabledDestChains := make([]protocol.ChainSelector, 0) for sel, chain := range relayers { - if _, ok := offRampAddresses[sel]; !ok { + chainCfg := cfg.ChainConfiguration[sel.String()] + offRampAddrStr := chainCfg.OffRampAddress + if offRampAddrStr == "" { + offRampAddrStr = cfg.OffRampAddresses[sel.String()] + } + if offRampAddrStr == "" { lggr.Warnw("No offramp configured for chain, skipping.", "chainID", sel) continue } + offRampAddr, err := protocol.NewUnknownAddressFromHex(offRampAddrStr) + if err != nil { + return nil, fmt.Errorf("failed to parse offramp address '%s': %w", offRampAddrStr, err) + } + rmnAddrStr := chainCfg.RmnAddress transmitters[sel] = contracttransmitter.NewEVMContractTransmitterFromTxm( logger.With(lggr, "component", "ContractTransmitter"), sel, chain.TxManager(), - common.HexToAddress(offRampAddresses[sel].String()), + common.HexToAddress(offRampAddr.String()), keys[sel], fromAddresses[sel], executorMonitoring, @@ -116,8 +116,8 @@ func NewExecutorCoordinator( Lggr: logger.With(lggr, "component", "DestinationReader"), ChainSelector: sel, ChainClient: chain.Client(), - OfframpAddress: offRampAddresses[sel].String(), // TODO: use UnknownAddress instead of string? - RmnRemoteAddress: rmnAddresses[sel].String(), + OfframpAddress: offRampAddrStr, + RmnRemoteAddress: rmnAddrStr, ExecutionVisabilityWindow: cfg.MaxRetryDuration, Monitoring: executorMonitoring, }) diff --git a/pkg/chainaccess/interfaces.go b/pkg/chainaccess/interfaces.go index 2fd8633f2..d350493d2 100644 --- a/pkg/chainaccess/interfaces.go +++ b/pkg/chainaccess/interfaces.go @@ -71,9 +71,16 @@ type MessageFilter interface { // Accessor provides objects that in turn provide specific kinds of blockchain access. // It is scoped to a particular chain selector. +// +// Capabilities beyond SourceReader are optional: callers must check the bool return +// before using the value. A false return means the underlying factory does not support +// that capability for this chain. type Accessor interface { // SourceReader returns the SourceReader for the chain selector. SourceReader() SourceReader + // GetDestinationReader returns the DestinationReader for the chain selector, if + // the factory that produced this Accessor supports destination-side reads. + GetDestinationReader() (DestinationReader, bool) } // AccessorFactory creates Accessors for specific chain selectors. diff --git a/pkg/chainaccess/registry.go b/pkg/chainaccess/registry.go index 04053f195..4bd3e6234 100644 --- a/pkg/chainaccess/registry.go +++ b/pkg/chainaccess/registry.go @@ -6,6 +6,7 @@ import ( "maps" "reflect" "sync" + "time" "github.com/BurntSushi/toml" @@ -69,6 +70,7 @@ type GenericConfig struct { ChainConfig Infos[any] `toml:"blockchain_infos"` CommitteeConfig + ExecutorConfig } // GetAllConcreteConfig populates target, which must be a pointer to an Infos[T] @@ -127,16 +129,31 @@ func (gc GenericConfig) GetConcreteConfig(selector protocol.ChainSelector, targe return nil } -// CommitteeConfig that is defined as part of the app and required by the SourceReader. +// CommitteeConfig holds source-side contract addresses used by committee verifiers. +// It is embedded in GenericConfig. type CommitteeConfig struct { - // OnRampAddresses is a map the addresses of the on ramps for each chain selector. + // OnRampAddresses maps chain selectors (as decimal strings) to OnRamp contract addresses. OnRampAddresses map[string]string `json:"on_ramp_addresses" toml:"on_ramp_addresses"` - // RMNRemoteAddresses is a map of RMN Remote contract addresses for each chain selector. - // Required for curse detection. + // RMNRemoteAddresses maps chain selectors to RMN Remote contract addresses. + // Used by source readers for curse detection. RMNRemoteAddresses map[string]string `json:"rmn_remote_addresses" toml:"rmn_remote_addresses"` } +// ExecutorConfig holds destination-side contract addresses and executor tuning parameters. +// It is embedded in GenericConfig alongside CommitteeConfig so that a single config +// file covers both verifier and executor needs. +type ExecutorConfig struct { + // OffRampAddresses maps chain selectors (as decimal strings) to OffRamp contract addresses. + // Required by destination readers and contract transmitters. + OffRampAddresses map[string]string `json:"off_ramp_addresses" toml:"off_ramp_addresses"` + + // ExecutionVisibilityWindow is how far back a DestinationReader looks for execution + // attempts when deciding whether an honest attempt has been made. + // Defaults to 8 hours when zero. + ExecutionVisibilityWindow time.Duration `toml:"execution_visibility_window"` +} + // accessorConstructorMapCopy returns a copy of the accessorConstructorMap to avoid holding the lock during // delegate calls. func accessorConstructorMapCopy() map[ChainFamily]AccessorFactoryConstructor { diff --git a/pkg/chainaccess/registry_test.go b/pkg/chainaccess/registry_test.go index 0cf09e29d..8db9f3a64 100644 --- a/pkg/chainaccess/registry_test.go +++ b/pkg/chainaccess/registry_test.go @@ -37,6 +37,10 @@ type testAccessor struct{} func (a *testAccessor) SourceReader() chainaccess.SourceReader { return nil } +func (a *testAccessor) GetDestinationReader() (chainaccess.DestinationReader, bool) { + return nil, false +} + func init() { // Register a test constructor for the "evm" family so that NewRegistry // can build a Registry without real RPC connections.