diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 75662764628..b6523b47171 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -678,6 +678,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess opts = append(opts, blob.WithReprovideInterval(-1)) } + opts = append(opts, blob.WithUseBloomCache(builder.BitswapBloomCacheEnabled)) + var err error bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDatastoreManager.Datastore(), opts...) if err != nil { diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index c35a2d42a05..8bd23c17465 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -432,6 +432,8 @@ func (exeNode *ExecutionNode) LoadBlobService( opts = append(opts, blob.WithReprovideInterval(-1)) } + opts = append(opts, blob.WithUseBloomCache(node.BitswapBloomCacheEnabled)) + if exeNode.exeConf.blobstoreRateLimit > 0 && exeNode.exeConf.blobstoreBurstLimit > 0 { opts = append(opts, blob.WithRateLimit(float64(exeNode.exeConf.blobstoreRateLimit), exeNode.exeConf.blobstoreBurstLimit)) } diff --git a/cmd/node_builder.go b/cmd/node_builder.go index bd1626cb627..7cfef4024e8 100644 --- a/cmd/node_builder.go +++ b/cmd/node_builder.go @@ -185,6 +185,13 @@ type BaseConfig struct { // This is only meaningful to Access and Execution nodes. BitswapReprovideEnabled bool + // BitswapBloomCacheEnabled configures whether the Bitswap bloom cache is enabled. + // When disabled, uses a plain blockstore instead of cached blockstore, avoiding + // the CPU cost of building the bloom filter on startup. Pebble's built-in bloom + // filters (persisted in SSTables) are still used for efficient lookups. + // This is only meaningful to Access and Execution nodes. + BitswapBloomCacheEnabled bool + TransactionFeesDisabled bool } @@ -296,12 +303,13 @@ func DefaultBaseConfig() *BaseConfig { Duration: 10 * time.Second, }, - HeroCacheMetricsEnable: false, - SyncCoreConfig: chainsync.DefaultConfig(), - CodecFactory: codecFactory, - ComplianceConfig: compliance.DefaultConfig(), - DhtSystemEnabled: true, - BitswapReprovideEnabled: true, + HeroCacheMetricsEnable: false, + SyncCoreConfig: chainsync.DefaultConfig(), + CodecFactory: codecFactory, + ComplianceConfig: compliance.DefaultConfig(), + DhtSystemEnabled: true, + BitswapReprovideEnabled: true, + BitswapBloomCacheEnabled: true, // default: use cached blockstore TODO leo: change default to false } } diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index a6bee57590c..1823368640a 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -1233,6 +1233,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS ), } + opts = append(opts, blob.WithUseBloomCache(builder.BitswapBloomCacheEnabled)) + var err error bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...) if err != nil { diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 142f2d55c6d..e5008b345f4 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -213,6 +213,10 @@ func (fnb *FlowNodeBuilder) BaseFlags() { "bitswap-reprovide-enabled", defaultConfig.BitswapReprovideEnabled, "[experimental] whether to enable bitswap reproviding. This is an experimental feature. Use with caution.") + fnb.flags.BoolVar(&fnb.BaseConfig.BitswapBloomCacheEnabled, + "bitswap-bloom-cache-enabled", + defaultConfig.BitswapBloomCacheEnabled, + "[experimental] whether to enable bitswap bloom cache. When disabled, uses a plain blockstore instead of cached blockstore, avoiding the CPU cost of building the bloom filter on startup. Pebble's built-in bloom filters (persisted in SSTables) are still used. This is an experimental feature. Use with caution.") // dynamic node startup flags fnb.flags.StringVar(&fnb.BaseConfig.DynamicStartupANPubkey, diff --git a/integration/tests/access/cohort3/execution_state_sync_test.go b/integration/tests/access/cohort3/execution_state_sync_test.go index 08cb0d6c8dc..fc2e46d402d 100644 --- a/integration/tests/access/cohort3/execution_state_sync_test.go +++ b/integration/tests/access/cohort3/execution_state_sync_test.go @@ -196,13 +196,25 @@ func (s *ExecutionStateSyncSuite) executionDataForHeight(ctx context.Context, no BlockId: header.ID[:], EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, }) + if err != nil { + s.log.Info(). + Uint64("height", height). + Hex("block_id", header.ID[:]). + Err(err). + Msg("failed to get execution data") return err } blockED, err = convert.MessageToBlockExecutionData(ed.GetBlockExecutionData(), flow.Localnet.Chain()) s.Require().NoError(err, "could not convert execution data") + s.log.Info(). + Uint64("height", height). + Hex("block_id", header.ID[:]). + Int("chunks", len(blockED.ChunkExecutionDatas)). + Msg("successfully retrieved execution data") + return err }), "could not get execution data for block %d", height) diff --git a/network/p2p/blob/blob_service.go b/network/p2p/blob/blob_service.go index b4a1257b200..abb81c24f83 100644 --- a/network/p2p/blob/blob_service.go +++ b/network/p2p/blob/blob_service.go @@ -55,6 +55,7 @@ var _ component.Component = (*blobService)(nil) type BlobServiceConfig struct { ReprovideInterval time.Duration // the interval at which the DHT provider entries are refreshed BitswapOptions []bitswap.Option // options to pass to the Bitswap service + UseBloomCache bool // if true, use the bloom cache (cached blockstore), otherwise use plain blockstore } // WithReprovideInterval sets the interval at which DHT provider entries are refreshed @@ -98,6 +99,17 @@ func WithRateLimit(r float64, b int) network.BlobServiceOption { } } +// WithUseBloomCache enables or disables the bloom cache. +// When enabled (true), uses a cached blockstore with bloom filter (default behavior). +// When disabled (false), uses a plain blockstore instead, avoiding the CPU cost of building +// the bloom filter on startup by scanning all keys. Pebble's built-in bloom filters +// (persisted in SSTables) are still used for efficient lookups. +func WithUseBloomCache(use bool) network.BlobServiceOption { + return func(bs network.BlobService) { + bs.(*blobService).config.UseBloomCache = use + } +} + // NewBlobService creates a new BlobService. func NewBlobService( host host.Host, @@ -109,26 +121,35 @@ func NewBlobService( opts ...network.BlobServiceOption, ) (*blobService, error) { bsNetwork := bsnet.NewFromIpfsHost(host, r, bsnet.Prefix(protocol.ID(prefix))) - blockStore, err := blockstore.CachedBlockstore( - context.Background(), - blockstore.NewBlockstore(ds), - blockstore.DefaultCacheOpts(), - ) - if err != nil { - return nil, fmt.Errorf("failed to create cached blockstore: %w", err) - } + + blockStore := blockstore.NewBlockstore(ds) + bs := &blobService{ prefix: prefix, config: &BlobServiceConfig{ ReprovideInterval: DefaultReprovideInterval, + UseBloomCache: true, // default: use bloom cache }, blockStore: blockStore, } + // Apply options before creating blockstore, as UseBloomCache affects blockstore creation for _, opt := range opts { opt(bs) } + if bs.config.UseBloomCache { + cachedBlockStore, err := blockstore.CachedBlockstore( + context.Background(), + blockStore, + blockstore.DefaultCacheOpts(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create cached blockstore: %w", err) + } + bs.blockStore = cachedBlockStore + } + cm := component.NewComponentManagerBuilder(). AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { btswp := bitswap.New(ctx, bsNetwork, bs.blockStore, bs.config.BitswapOptions...)