Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ indexer/secrets.toml
.cache/

# Ignore the build output of the ccv development environment
build/devenv/ccv
build/devenv/ccv
# Added by code-review-graph
.code-review-graph/
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fmt: ensure-golangci-lint

# Run golangci-lint
lint fix="": ensure-golangci-lint
find . -type f -name go.mod -execdir golangci-lint run {{ if fix != "" { "--fix" } else { "" } }} \;
gomods -c 'golangci-lint run {{ if fix != "" { "--fix" } else { "" } }}'

shellcheck:
@command -v shellcheck >/dev/null 2>&1 || { \
Expand Down
61 changes: 45 additions & 16 deletions cmd/executor/standalone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"maps"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -71,7 +72,7 @@ func main() {
}
lggr = logger.Named(lggr, "executor")

executorConfig, blockchainInfo, err := loadConfiguration(configPath)
executorConfig, blockchainInfo, chainAddresses, err := loadConfiguration(configPath)
if err != nil {
lggr.Errorw("Failed to load configuration", "path", configPath, "error", err)
os.Exit(1)
Expand Down Expand Up @@ -159,9 +160,21 @@ func main() {
destReaders := make(map[protocol.ChainSelector]chainaccess.DestinationReader)
enabledDestChains := make([]protocol.ChainSelector, 0)
rmnReaders := make(map[protocol.ChainSelector]chainaccess.RMNCurseReader)
pk := os.Getenv(privateKeyEnvVar)
if pk == "" {
lggr.Errorf("Environment variable %s is not set", privateKeyEnvVar)
os.Exit(1)
}

for selector, chain := range blockchainInfo.GetAllInfos() {
strSel := strconv.FormatUint(uint64(selector), 10)
chainConfig := executorConfig.ChainConfiguration[strSel]

offRampAddrStr := chainAddresses.OffRampAddresses[strSel]
if offRampAddrStr == "" {
lggr.Warnw("No offramp configured for chain, skipping.", "chainSelector", strSel)
continue
}
rmnAddrStr := chainAddresses.RMNRemoteAddresses[strSel]

chainClient, err := evm.CreateMultiNodeClientFromInfo(ctx, chain, lggr)
if err != nil {
Expand All @@ -173,8 +186,8 @@ func main() {
Lggr: lggr,
ChainSelector: selector,
ChainClient: chainClient,
OfframpAddress: chainConfig.OffRampAddress,
RmnRemoteAddress: chainConfig.RmnAddress,
OfframpAddress: offRampAddrStr,
RmnRemoteAddress: rmnAddrStr,
ExecutionVisabilityWindow: executorConfig.MaxRetryDuration,
Monitoring: executorMonitoring,
})
Expand All @@ -184,19 +197,13 @@ func main() {
continue
}

pk := os.Getenv(privateKeyEnvVar)
if pk == "" {
lggr.Errorf("Environment variable %s is not set", privateKeyEnvVar)
os.Exit(1)
}

ct, err := contracttransmitter.NewEVMContractTransmitterFromRPC(
ctx,
lggr,
selector,
chain.Nodes[0].InternalHTTPUrl,
pk,
common.HexToAddress(chainConfig.OffRampAddress),
common.HexToAddress(offRampAddrStr),
)
if err != nil {
lggr.Errorw("Failed to create contract transmitter", "error", err)
Expand Down Expand Up @@ -346,15 +353,37 @@ func main() {
lggr.Infow("Execution service stopped gracefully")
}

func loadConfiguration(filepath string) (*executor.Configuration, *chainaccess.Infos[evm.Info], error) {
var config executor.ConfigWithBlockchainInfo[evm.Info]
func loadConfiguration(filepath string) (*executor.Configuration, *chainaccess.Infos[evm.Info], chainaccess.GenericConfig, error) {
// CommitteeConfig is added separately because it is not part of executor.Configuration.
// ExecutorConfig is promoted through executor.Configuration, so no separate embed is needed.
var config struct {
executor.ConfigWithBlockchainInfo[evm.Info]
chainaccess.CommitteeConfig
}
if _, err := toml.DecodeFile(filepath, &config); err != nil {
return nil, nil, err
return nil, nil, chainaccess.GenericConfig{}, err
}

normalizedConfig, err := config.GetNormalizedConfig()
if err != nil {
return nil, nil, err
return nil, nil, chainaccess.GenericConfig{}, err
}

// RMN addresses live in per-chain ChainConfiguration for the standalone path.
// Merge them into CommitteeConfig.RMNRemoteAddresses, letting any top-level
// CommitteeConfig values (from an overlay file) take precedence.
rmnRemoteAddresses := make(map[string]string, len(normalizedConfig.ChainConfiguration))
for sel, cc := range normalizedConfig.ChainConfiguration {
rmnRemoteAddresses[sel] = cc.RmnAddress
}
maps.Copy(rmnRemoteAddresses, config.RMNRemoteAddresses)

genericConfig := chainaccess.GenericConfig{
CommitteeConfig: chainaccess.CommitteeConfig{
OnRampAddresses: config.OnRampAddresses,
RMNRemoteAddresses: rmnRemoteAddresses,
},
ExecutorConfig: normalizedConfig.ExecutorConfig,
}
return normalizedConfig, &config.BlockchainInfos, nil
return normalizedConfig, &config.BlockchainInfos, genericConfig, nil
}
21 changes: 15 additions & 6 deletions executor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,21 @@ type Configuration struct {
// This is used to configure the chain-specific configuration for each chain such as addresses, executor pool, and execution interval.
ChainConfiguration map[string]ChainConfiguration `toml:"chain_configuration"`
WorkerCount int `toml:"worker_count"`

// ExecutorConfig holds destination-side contract addresses. Embedding it here places
// off_ramp_addresses and execution_visibility_window at the same top-level TOML location
// as in GenericConfig, making the executor config a valid overlay target.
chainaccess.ExecutorConfig
}

// ChainConfiguration is all the configuration an executor needs to know about a specific chain.
// This is separate from chain-specific RPC information in BlockchainInfos.
// ChainConfiguration is all the executor-specific configuration for a single destination chain.
type ChainConfiguration struct {
// RMN address is the address of the RMN contract to check for curse state.
// RmnAddress is the address of the RMN Remote contract on this destination chain.
RmnAddress string `toml:"rmn_address"`
// OffRamp address is the address of the offramp contract to send messages to.
// OffRampAddress is the address of the OffRamp contract on this destination chain.
// Takes precedence over the top-level off_ramp_addresses map when both are present.
OffRampAddress string `toml:"off_ramp_address"`
// Executor pool is the list of executor IDs used for turn taking. This executor's ID must be in the list.
// ExecutorPool is the list of executor IDs used for turn taking. This executor's ID must be in the list.
ExecutorPool []string `toml:"executor_pool"`
// ExecutionInterval is how long each executor has to process a message before the next executor in the cluster takes over.
ExecutionInterval time.Duration `toml:"execution_interval"`
Expand Down Expand Up @@ -130,7 +135,7 @@ func (c *Configuration) Validate() error {
if chainConfig.RmnAddress == "" {
return fmt.Errorf("rmn_address must be configured for chain %s", chainSel)
}
if chainConfig.OffRampAddress == "" {
if chainConfig.OffRampAddress == "" && c.OffRampAddresses[chainSel] == "" {
return fmt.Errorf("off_ramp_address must be configured for chain %s", chainSel)
}
if chainConfig.DefaultExecutorAddress == "" {
Expand Down Expand Up @@ -174,6 +179,10 @@ func (c *Configuration) GetNormalizedConfig() (*Configuration, error) {
normalized.LookbackWindow = parseOrDefault(c.LookbackWindow, lookbackWindowDefault)
normalized.ReaderCacheExpiry = parseOrDefault(c.ReaderCacheExpiry, readerCacheExpiryDefault)
normalized.MaxRetryDuration = parseOrDefault(c.MaxRetryDuration, maxRetryDurationDefault)
// Default ExecutionVisibilityWindow to MaxRetryDuration when not explicitly set.
if c.ExecutionVisibilityWindow == 0 {
normalized.ExecutionVisibilityWindow = normalized.MaxRetryDuration
}
if c.IndexerQueryLimit == 0 {
normalized.IndexerQueryLimit = IndexerQueryLimitDefault
}
Expand Down
10 changes: 6 additions & 4 deletions executor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess"
)

func validChainConfig() ChainConfiguration {
return ChainConfiguration{
RmnAddress: "0x1234567890abcdef",
OffRampAddress: "0xabcdef1234567890",
DefaultExecutorAddress: "0xdeadbeef12345678",
ExecutorPool: []string{"executor-1", "executor-2"},
}
Expand All @@ -20,6 +21,9 @@ func validConfig() Configuration {
return Configuration{
ExecutorID: "executor-1",
IndexerAddress: []string{"http://indexer1:8100"},
ExecutorConfig: chainaccess.ExecutorConfig{
OffRampAddresses: map[string]string{"1": "0xabcdef1234567890"},
},
ChainConfiguration: map[string]ChainConfiguration{
"1": validChainConfig(),
},
Expand Down Expand Up @@ -177,9 +181,7 @@ func TestConfiguration_Validate(t *testing.T) {
name: "missing_offramp_address_fails",
config: func() Configuration {
c := validConfig()
cc := validChainConfig()
cc.OffRampAddress = ""
c.ChainConfiguration = map[string]ChainConfiguration{"1": cc}
c.OffRampAddresses = nil
return c
}(),
wantErrContains: "off_ramp_address must be configured",
Expand Down
72 changes: 54 additions & 18 deletions integration/pkg/accessors/evm/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,32 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"

chainsel "github.com/smartcontractkit/chain-selectors"
"github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/onramp"
executormonitoring "github.com/smartcontractkit/chainlink-ccv/executor/pkg/monitoring"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/destinationreader"
"github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess"
"github.com/smartcontractkit/chainlink-ccv/protocol"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-evm/pkg/client"
"github.com/smartcontractkit/chainlink-evm/pkg/heads"
)

const defaultExecutionVisibilityWindow = 8 * time.Hour

type factory struct {
lggr logger.Logger
// TODO: put these in a single map.
onRampAddresses map[protocol.ChainSelector]string
rmnRemoteAddresses map[protocol.ChainSelector]string
headTrackers map[protocol.ChainSelector]heads.Tracker
chainClients map[protocol.ChainSelector]client.Client
onRampAddresses map[protocol.ChainSelector]string
rmnRemoteAddresses map[protocol.ChainSelector]string
offRampAddresses map[protocol.ChainSelector]string
executionVisibilityWindow time.Duration
headTrackers map[protocol.ChainSelector]heads.Tracker
chainClients map[protocol.ChainSelector]client.Client
}

// NewFactory creates a new EVM AccessorFactory.
Expand All @@ -31,16 +38,23 @@ type factory struct {
func NewFactory(
lggr logger.Logger,
// TODO: use ethereum address instead of string
onRampAddresses, rmnRemoteAddresses map[protocol.ChainSelector]string,
onRampAddresses, rmnRemoteAddresses, offRampAddresses map[protocol.ChainSelector]string,
executionVisibilityWindow time.Duration,
headTrackers map[protocol.ChainSelector]heads.Tracker,
chainClients map[protocol.ChainSelector]client.Client,
) chainaccess.AccessorFactory {
evw := executionVisibilityWindow
if evw == 0 {
evw = defaultExecutionVisibilityWindow
}
return &factory{
lggr: lggr,
onRampAddresses: onRampAddresses,
rmnRemoteAddresses: rmnRemoteAddresses,
headTrackers: headTrackers,
chainClients: chainClients,
lggr: lggr,
onRampAddresses: onRampAddresses,
rmnRemoteAddresses: rmnRemoteAddresses,
offRampAddresses: offRampAddresses,
executionVisibilityWindow: evw,
headTrackers: headTrackers,
chainClients: chainClients,
}
}

Expand Down Expand Up @@ -103,17 +117,32 @@ func (f *factory) GetAccessor(ctx context.Context, chainSelector protocol.ChainS
return nil, fmt.Errorf("failed to create EVM source reader: %w", err)
}

return newAccessor(evmSourceReader), nil
}
var destReader chainaccess.DestinationReader
if offRampAddr := f.offRampAddresses[chainSelector]; offRampAddr != "" {
dr, err := destinationreader.NewEvmDestinationReader(destinationreader.Params{
Lggr: f.lggr,
ChainSelector: chainSelector,
ChainClient: chainClient,
OfframpAddress: offRampAddr,
RmnRemoteAddress: f.rmnRemoteAddresses[chainSelector],
ExecutionVisabilityWindow: f.executionVisibilityWindow,
Monitoring: executormonitoring.NewNoopExecutorMonitoring(),
})
if err != nil {
return nil, fmt.Errorf("failed to create EVM destination reader for chain %d: %w", chainSelector, err)
}
destReader = dr
}

type accessor struct {
sourceReader chainaccess.SourceReader
return &accessor{
sourceReader: evmSourceReader,
destinationReader: destReader,
}, nil
}

func newAccessor(sourceReader chainaccess.SourceReader) chainaccess.Accessor {
return &accessor{
sourceReader: sourceReader,
}
type accessor struct {
sourceReader chainaccess.SourceReader
destinationReader chainaccess.DestinationReader
}

func (a *accessor) SourceReader() chainaccess.SourceReader {
Expand All @@ -122,3 +151,10 @@ func (a *accessor) SourceReader() chainaccess.SourceReader {
}
return a.sourceReader
}

func (a *accessor) GetDestinationReader() (chainaccess.DestinationReader, bool) {
if a == nil || a.destinationReader == nil {
return nil, false
}
return a.destinationReader, true
}
3 changes: 2 additions & 1 deletion integration/pkg/accessors/evm/factory_constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func CreateAccessorFactory(
// Convert from map[string]string -> map[chainsel]string
onRampInfos := chainaccess.Infos[string](generic.OnRampAddresses).GetAllInfos()
rmnRemoteInfos := chainaccess.Infos[string](generic.RMNRemoteAddresses).GetAllInfos()
offRampInfos := chainaccess.Infos[string](generic.OffRampAddresses).GetAllInfos()

return NewFactory(lggr, onRampInfos, rmnRemoteInfos, headTrackers, chainClients), nil
return NewFactory(lggr, onRampInfos, rmnRemoteInfos, offRampInfos, generic.ExecutionVisibilityWindow, headTrackers, chainClients), nil
}
Loading
Loading