diff --git a/.changes/changed/2859.md b/.changes/changed/2859.md new file mode 100644 index 00000000000..b4701fc89e5 --- /dev/null +++ b/.changes/changed/2859.md @@ -0,0 +1 @@ +Swap out off-chain worker compression for dedicated compression service in `fuel-core-bin`. diff --git a/.changes/removed/2859.md b/.changes/removed/2859.md new file mode 100644 index 00000000000..ea9b5c2258b --- /dev/null +++ b/.changes/removed/2859.md @@ -0,0 +1 @@ +Removed DA compression from off-chain worker in favor of dedicated compression service in `fuel-core-bin`. diff --git a/Cargo.lock b/Cargo.lock index 794775d8a88..31816412a54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3368,7 +3368,7 @@ dependencies = [ "enum-iterator", "fuel-core", "fuel-core-chain-config", - "fuel-core-compression", + "fuel-core-compression-service", "fuel-core-consensus-module", "fuel-core-database", "fuel-core-executor", @@ -3397,7 +3397,6 @@ dependencies = [ "mockall", "num_cpus", "parking_lot", - "paste", "postcard", "proptest", "rand 0.8.5", @@ -3480,7 +3479,6 @@ dependencies = [ "dotenvy", "fuel-core", "fuel-core-chain-config", - "fuel-core-compression", "fuel-core-metrics", "fuel-core-shared-sequencer", "fuel-core-storage", @@ -3973,6 +3971,7 @@ dependencies = [ "fuel-core-bin", "fuel-core-client", "fuel-core-compression", + "fuel-core-compression-service", "fuel-core-executor", "fuel-core-gas-price-service", "fuel-core-p2p", diff --git a/Cargo.toml b/Cargo.toml index 7d749aa882a..bec263a5f6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,7 @@ fuel-core-keygen-bin = { version = "0.42.0", path = "./bin/keygen" } fuel-core-chain-config = { version = "0.42.0", path = "./crates/chain-config", default-features = false } fuel-core-client = { version = "0.42.0", path = "./crates/client" } fuel-core-compression = { version = "0.42.0", path = "./crates/compression" } +fuel-core-compression-service = { version = "0.42.0", path = "./crates/services/compression" } fuel-core-database = { version = "0.42.0", path = "./crates/database" } fuel-core-metrics = { version = "0.42.0", path = "./crates/metrics" } fuel-core-services = { version = "0.42.0", path = "./crates/services" } diff --git a/benches/benches/block_target_gas.rs b/benches/benches/block_target_gas.rs index e1384e65a81..0a7da601888 100644 --- a/benches/benches/block_target_gas.rs +++ b/benches/benches/block_target_gas.rs @@ -359,6 +359,7 @@ fn service_with_many_contracts( Default::default(), Default::default(), Default::default(), + Default::default(), ), config.clone(), ) diff --git a/bin/fuel-core/Cargo.toml b/bin/fuel-core/Cargo.toml index aadd89f5c52..deaf51fb2e8 100644 --- a/bin/fuel-core/Cargo.toml +++ b/bin/fuel-core/Cargo.toml @@ -27,7 +27,6 @@ dirs = "4.0" dotenvy = { version = "0.15", optional = true } fuel-core = { workspace = true, features = ["wasm-executor"] } fuel-core-chain-config = { workspace = true } -fuel-core-compression = { workspace = true } fuel-core-metrics = { workspace = true } fuel-core-shared-sequencer = { workspace = true, optional = true } fuel-core-types = { workspace = true, features = ["std"] } @@ -84,7 +83,6 @@ production = [ ] parallel-executor = ["fuel-core/parallel-executor"] fault-proving = [ - "fuel-core-compression/fault-proving", "fuel-core-storage/fault-proving", "fuel-core-types/fault-proving", "fuel-core-chain-config/fault-proving", diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index f7400ad0a81..ab7ac35e499 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -22,13 +22,15 @@ use fuel_core::{ CombinedDatabaseConfig, }, fuel_core_graphql_api::{ - worker_service::DaCompressionConfig, Costs, ServiceConfig as GraphQLConfig, }, producer::Config as ProducerConfig, service::{ - config::Trigger, + config::{ + DaCompressionMode, + Trigger, + }, genesis::NotifyCancel, Config, DbType, @@ -526,12 +528,12 @@ impl Command { let gas_price_metrics = metrics.is_enabled(Module::GasPrice); let da_compression = match da_compression { - Some(retention) => { - DaCompressionConfig::Enabled(fuel_core_compression::Config { - temporal_registry_retention: retention.into(), - }) - } - None => DaCompressionConfig::Disabled, + Some(retention_duration) => DaCompressionMode::Enabled( + fuel_core::service::config::DaCompressionConfig { + retention_duration: retention_duration.into(), + }, + ), + None => DaCompressionMode::Disabled, }; let TxPoolArgs { diff --git a/crates/fuel-core/Cargo.toml b/crates/fuel-core/Cargo.toml index 0616da56643..001ddd76e7c 100644 --- a/crates/fuel-core/Cargo.toml +++ b/crates/fuel-core/Cargo.toml @@ -21,7 +21,7 @@ cosmrs = { version = "0.21", optional = true } derive_more = { version = "0.99" } enum-iterator = { workspace = true } fuel-core-chain-config = { workspace = true, features = ["std"] } -fuel-core-compression = { workspace = true } +fuel-core-compression-service = { workspace = true } fuel-core-consensus-module = { workspace = true } fuel-core-database = { workspace = true } fuel-core-executor = { workspace = true, features = ["std"] } @@ -49,7 +49,6 @@ itertools = { workspace = true } mockall = { workspace = true, optional = true } num_cpus = { version = "1.16.0", optional = true } parking_lot = { workspace = true } -paste = { workspace = true } postcard = { workspace = true } rand = { workspace = true } rocksdb = { version = "0.21", default-features = false, features = [ @@ -110,7 +109,6 @@ test-helpers = [ "fuel-core-p2p?/test-helpers", "fuel-core-storage/test-helpers", "fuel-core-chain-config/test-helpers", - "fuel-core-compression/test-helpers", "fuel-core-txpool/test-helpers", "fuel-core-tx-status-manager/test-helpers", "fuel-core-services/test-helpers", @@ -123,7 +121,6 @@ rocksdb-production = ["rocksdb", "rocksdb/jemalloc"] wasm-executor = ["fuel-core-upgradable-executor/wasm-executor"] parallel-executor = ["fuel-core-parallel-executor"] fault-proving = [ - "fuel-core-compression/fault-proving", "fuel-core-types/fault-proving", "fuel-core-executor/fault-proving", "fuel-core-storage/fault-proving", diff --git a/crates/fuel-core/src/combined_database.rs b/crates/fuel-core/src/combined_database.rs index 1a9a328f0a4..dd18bccd14d 100644 --- a/crates/fuel-core/src/combined_database.rs +++ b/crates/fuel-core/src/combined_database.rs @@ -7,6 +7,7 @@ use crate::state::{ use crate::{ database::{ database_description::{ + compression::CompressionDatabase, gas_price::GasPriceDatabase, off_chain::OffChain, on_chain::OnChain, @@ -55,6 +56,7 @@ pub struct CombinedDatabase { off_chain: Database, relayer: Database, gas_price: Database, + compression: Database, } impl CombinedDatabase { @@ -63,12 +65,14 @@ impl CombinedDatabase { off_chain: Database, relayer: Database, gas_price: Database, + compression: Database, ) -> Self { Self { on_chain, off_chain, relayer, gas_price, + compression, } } @@ -78,6 +82,7 @@ impl CombinedDatabase { crate::state::rocks_db::RocksDb::::prune(path)?; crate::state::rocks_db::RocksDb::::prune(path)?; crate::state::rocks_db::RocksDb::::prune(path)?; + crate::state::rocks_db::RocksDb::::prune(path)?; Ok(()) } @@ -118,6 +123,9 @@ impl CombinedDatabase { crate::state::rocks_db::RocksDb::::backup(db_dir, temp_dir) .trace_err("Failed to backup gas-price database")?; + crate::state::rocks_db::RocksDb::::backup(db_dir, temp_dir) + .trace_err("Failed to backup compression database")?; + Ok(()) } @@ -149,22 +157,28 @@ impl CombinedDatabase { temp_restore_dir: &std::path::Path, ) -> crate::database::Result<()> { crate::state::rocks_db::RocksDb::::restore(temp_restore_dir, backup_dir) - .trace_err("Failed to backup on-chain database")?; + .trace_err("Failed to restore on-chain database")?; crate::state::rocks_db::RocksDb::::restore( temp_restore_dir, backup_dir, ) - .trace_err("Failed to backup off-chain database")?; + .trace_err("Failed to restore off-chain database")?; crate::state::rocks_db::RocksDb::::restore(temp_restore_dir, backup_dir) - .trace_err("Failed to backup relayer database")?; + .trace_err("Failed to restore relayer database")?; crate::state::rocks_db::RocksDb::::restore( temp_restore_dir, backup_dir, ) - .trace_err("Failed to backup gas-price database")?; + .trace_err("Failed to restore gas-price database")?; + + crate::state::rocks_db::RocksDb::::restore( + temp_restore_dir, + backup_dir, + ) + .trace_err("Failed to restore compression database")?; Ok(()) } @@ -215,11 +229,20 @@ impl CombinedDatabase { ..database_config }, )?; + let compression = Database::open_rocksdb( + path, + state_rewind_policy, + DatabaseConfig { + max_fds, + ..database_config + }, + )?; Ok(Self { on_chain, off_chain, relayer, gas_price, + compression, }) } @@ -234,6 +257,7 @@ impl CombinedDatabase { off_chain: Database::rocksdb_temp(state_rewind_policy, database_config)?, relayer: Default::default(), gas_price: Default::default(), + compression: Default::default(), }) } @@ -278,6 +302,7 @@ impl CombinedDatabase { Database::in_memory(), Database::in_memory(), Database::in_memory(), + Database::in_memory(), ) } @@ -286,6 +311,7 @@ impl CombinedDatabase { self.off_chain.check_version()?; self.relayer.check_version()?; self.gas_price.check_version()?; + self.compression.check_version()?; Ok(()) } @@ -293,6 +319,10 @@ impl CombinedDatabase { &self.on_chain } + pub fn compression(&self) -> &Database { + &self.compression + } + #[cfg(any(feature = "test-helpers", test))] pub fn on_chain_mut(&mut self) -> &mut Database { &mut self.on_chain diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 8547a6d071f..0b88b5992e3 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -24,6 +24,7 @@ use crate::{ KeyValueView, }, }; +use database_description::compression::CompressionDatabase; use fuel_core_chain_config::TableEntry; pub use fuel_core_database::Error; use fuel_core_gas_price_service::common::fuel_core_storage_adapter::storage::GasPriceMetadata; @@ -446,6 +447,15 @@ impl Modifiable for Database { } } +impl Modifiable for Database { + fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { + commit_changes_with_height_update(self, changes, |iter| { + iter.iter_all_keys::(Some(IterDirection::Reverse)) + .try_collect() + }) + } +} + #[cfg(not(feature = "relayer"))] impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { diff --git a/crates/fuel-core/src/database/database_description.rs b/crates/fuel-core/src/database/database_description.rs index 2c955df91ed..f7eebb96762 100644 --- a/crates/fuel-core/src/database/database_description.rs +++ b/crates/fuel-core/src/database/database_description.rs @@ -7,6 +7,7 @@ use fuel_core_types::{ use std::collections::HashSet; use strum::IntoEnumIterator; +pub mod compression; pub mod gas_price; pub mod off_chain; pub mod on_chain; diff --git a/crates/fuel-core/src/database/database_description/compression.rs b/crates/fuel-core/src/database/database_description/compression.rs new file mode 100644 index 00000000000..3cb94016135 --- /dev/null +++ b/crates/fuel-core/src/database/database_description/compression.rs @@ -0,0 +1,28 @@ +use crate::database::database_description::DatabaseDescription; +use fuel_core_compression_service::storage::column::CompressionColumn; +use fuel_core_storage::merkle::column::MerkleizedColumn; +use fuel_core_types::fuel_types::BlockHeight; + +#[derive(Clone, Copy, Debug)] +pub struct CompressionDatabase; + +impl DatabaseDescription for CompressionDatabase { + type Column = MerkleizedColumn; + type Height = BlockHeight; + + fn version() -> u32 { + 0 + } + + fn name() -> String { + "compression".to_string() + } + + fn metadata_column() -> Self::Column { + Self::Column::MerkleMetadataColumn + } + + fn prefix(_column: &Self::Column) -> Option { + None + } +} diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 0761fda3197..5b6e6375657 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -11,7 +11,6 @@ use std::{ pub mod api_service; pub(crate) mod block_height_subscription; -pub mod da_compression; pub mod database; pub(crate) mod extensions; pub(crate) mod indexation; diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index 25c0f0f7a24..31cd310d148 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -100,6 +100,7 @@ use super::{ block_height_subscription, ports::{ worker, + DatabaseDaCompressedBlocks, OnChainDatabaseAt, }, }; @@ -116,6 +117,8 @@ pub type GasPriceProvider = Box; pub type ChainInfoProvider = Box; +pub type DaCompressionProvider = Box; + #[derive(Clone)] pub struct SharedState { pub bound_address: SocketAddr, @@ -245,6 +248,7 @@ pub fn new_service( chain_state_info_provider: ChainInfoProvider, memory_pool: SharedMemoryPool, block_height_subscriber: block_height_subscription::Subscriber, + da_compression_provider: DaCompressionProvider, ) -> anyhow::Result where OnChain: HistoricalView + 'static, @@ -303,6 +307,7 @@ where .data(gas_price_provider) .data(chain_state_info_provider) .data(memory_pool) + .data(da_compression_provider) .data(block_height_subscriber.clone()) .extension(ValidationExtension::new( max_queries_resolver_recursive_depth, diff --git a/crates/fuel-core/src/graphql_api/da_compression.rs b/crates/fuel-core/src/graphql_api/da_compression.rs deleted file mode 100644 index f04f73a8222..00000000000 --- a/crates/fuel-core/src/graphql_api/da_compression.rs +++ /dev/null @@ -1,577 +0,0 @@ -use crate::fuel_core_graphql_api::{ - ports::worker::OffChainDatabaseTransaction, - storage::da_compression::{ - evictor_cache::MetadataKey, - timestamps::{ - TimestampKey, - TimestampKeyspace, - }, - *, - }, -}; -use fuel_core_compression::{ - compress::compress, - config::Config, - ports::{ - EvictorDb, - HistoryLookup, - TemporalRegistry, - UtxoIdToPointer, - }, -}; -use fuel_core_storage::{ - not_found, - tables::{ - Coins, - FuelBlocks, - Messages, - }, - StorageAsMut, - StorageAsRef, - StorageInspect, -}; -use fuel_core_types::{ - blockchain::block::Block, - fuel_tx::{ - input::PredicateCode, - Address, - AssetId, - ContractId, - ScriptCode, - }, - services::executor::Event, - tai64::Tai64, -}; -use futures::FutureExt; - -/// Performs DA compression for a block and stores it in the database. -pub fn da_compress_block( - config: Config, - block: &Block, - block_events: &[Event], - db_tx: &mut T, -) -> anyhow::Result<()> -where - T: OffChainDatabaseTransaction, -{ - let compressed = compress( - config, - CompressDbTx { - db_tx: DbTx { db_tx }, - block_events, - }, - block, - ) - .now_or_never() - .expect("The current implementation resolved all futures instantly")?; - - db_tx - .storage_as_mut::() - .insert(&block.header().consensus().height, &compressed)?; - - Ok(()) -} - -pub struct DbTx<'a, Tx> { - pub db_tx: &'a mut Tx, -} - -struct CompressDbTx<'a, Tx> { - db_tx: DbTx<'a, Tx>, - block_events: &'a [Event], -} - -pub struct DecompressDbTx<'a, Tx, Onchain> { - pub db_tx: DbTx<'a, Tx>, - pub onchain_db: Onchain, -} - -#[cfg(not(feature = "fault-proving"))] -mod v1_impl_temporal_registry { - use super::*; - - macro_rules! impl_temporal_registry { - ($type:ty) => { paste::paste! { - impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx> - where - Tx: OffChainDatabaseTransaction, - { - fn read_registry( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<$type> { - Ok(self - .db_tx - .storage_as_ref::<[< DaCompressionTemporalRegistry $type >]>() - .get(key)? - .ok_or(not_found!([< DaCompressionTemporalRegistry $type>]))? - .into_owned()) - } - - fn read_timestamp( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result { - Ok(self - .db_tx - .storage_as_ref::<[< DaCompressionTemporalRegistryTimestamps >]>() - .get(&TimestampKey { - keyspace: TimestampKeyspace::$type, - key: *key, - })? - .ok_or(not_found!(DaCompressionTemporalRegistryTimestamps))? - .into_owned()) - } - - fn write_registry( - &mut self, - key: &fuel_core_types::fuel_compression::RegistryKey, - value: &$type, - timestamp: Tai64, - ) -> anyhow::Result<()> { - // Write the actual value - let old_value = self.db_tx - .storage_as_mut::<[< DaCompressionTemporalRegistry $type >]>() - .replace(key, value)?; - - // Remove the overwritten value from index, if any - if let Some(old_value) = old_value { - let old_reverse_key = (&old_value).into(); - self.db_tx - .storage_as_mut::() - .remove(&old_reverse_key)?; - } - - // Add the new value to the index - let reverse_key = value.into(); - self.db_tx - .storage_as_mut::() - .insert(&reverse_key, key)?; - - // Update the timestamp - self.db_tx - .storage_as_mut::() - .insert(&TimestampKey { keyspace: TimestampKeyspace::$type, key: *key }, ×tamp)?; - - Ok(()) - } - - fn registry_index_lookup( - &self, - value: &$type, - ) -> anyhow::Result> - { - let reverse_key = value.into(); - Ok(self - .db_tx - .storage_as_ref::() - .get(&reverse_key)? - .map(|v| v.into_owned())) - } - } - - impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx> - where - Tx: OffChainDatabaseTransaction, - { - fn read_registry( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<$type> { - self.db_tx.read_registry(key) - } - - fn read_timestamp( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result { - <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) - } - - fn write_registry( - &mut self, - key: &fuel_core_types::fuel_compression::RegistryKey, - value: &$type, - timestamp: Tai64, - ) -> anyhow::Result<()> { - self.db_tx.write_registry(key, value, timestamp) - } - - fn registry_index_lookup( - &self, - value: &$type, - ) -> anyhow::Result> - { - self.db_tx.registry_index_lookup(value) - } - } - - impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain> - where - Tx: OffChainDatabaseTransaction, - { - fn read_registry( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<$type> { - self.db_tx.read_registry(key) - } - - fn read_timestamp( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result { - <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) - } - - fn write_registry( - &mut self, - key: &fuel_core_types::fuel_compression::RegistryKey, - value: &$type, - timestamp: Tai64, - ) -> anyhow::Result<()> { - self.db_tx.write_registry(key, value, timestamp) - } - - fn registry_index_lookup( - &self, - value: &$type, - ) -> anyhow::Result> - { - self.db_tx.registry_index_lookup(value) - } - } - - impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx> - where - Tx: OffChainDatabaseTransaction, - { - fn set_latest_assigned_key( - &mut self, - key: fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<()> { - self.db_tx.db_tx - .storage_as_mut::() - .insert(&MetadataKey::$type, &key)?; - Ok(()) - } - - fn get_latest_assigned_key( - &self, - ) -> anyhow::Result> { - Ok(self - .db_tx.db_tx - .storage_as_ref::() - .get(&MetadataKey::$type)? - .map(|v| v.into_owned()) - ) - } - } - - }}; - } - - impl_temporal_registry!(Address); - impl_temporal_registry!(AssetId); - impl_temporal_registry!(ContractId); - impl_temporal_registry!(ScriptCode); - impl_temporal_registry!(PredicateCode); -} - -#[cfg(feature = "fault-proving")] -mod v2_impl_temporal_registry { - use super::*; - - use v2::{ - address::DaCompressionTemporalRegistryAddressV2, - asset_id::DaCompressionTemporalRegistryAssetIdV2, - contract_id::DaCompressionTemporalRegistryContractIdV2, - evictor_cache::DaCompressionTemporalRegistryEvictorCacheV2, - predicate_code::DaCompressionTemporalRegistryPredicateCodeV2, - registry_index::DaCompressionTemporalRegistryIndexV2, - script_code::DaCompressionTemporalRegistryScriptCodeV2, - timestamps::DaCompressionTemporalRegistryTimestampsV2, - }; - - macro_rules! impl_temporal_registry_v2 { - ($type:ty) => { paste::paste! { - impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx> - where - Tx: OffChainDatabaseTransaction, - { - fn read_registry( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<$type> { - Ok(self - .db_tx - .storage_as_ref::<[< DaCompressionTemporalRegistry $type V2>]>() - .get(key)? - .ok_or(not_found!([< DaCompressionTemporalRegistry $type V2>]))? - .into_owned()) - } - - fn read_timestamp( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result { - Ok(self - .db_tx - .storage_as_ref::<[< DaCompressionTemporalRegistryTimestampsV2 >]>() - .get(&TimestampKey { - keyspace: TimestampKeyspace::$type, - key: *key, - })? - .ok_or(not_found!(DaCompressionTemporalRegistryTimestampsV2))? - .into_owned()) - } - - fn write_registry( - &mut self, - key: &fuel_core_types::fuel_compression::RegistryKey, - value: &$type, - timestamp: Tai64, - ) -> anyhow::Result<()> { - // Write the actual value - let old_value = self.db_tx - .storage_as_mut::<[< DaCompressionTemporalRegistry $type V2>]>() - .replace(key, value)?; - - // Remove the overwritten value from index, if any - if let Some(old_value) = old_value { - let old_reverse_key = (&old_value).into(); - self.db_tx - .storage_as_mut::() - .remove(&old_reverse_key)?; - } - - // Add the new value to the index - let reverse_key = value.into(); - self.db_tx - .storage_as_mut::() - .insert(&reverse_key, key)?; - - // Update the timestamp - self.db_tx - .storage_as_mut::() - .insert(&TimestampKey { keyspace: TimestampKeyspace::$type, key: *key }, ×tamp)?; - - Ok(()) - } - - fn registry_index_lookup( - &self, - value: &$type, - ) -> anyhow::Result> - { - let reverse_key = value.into(); - Ok(self - .db_tx - .storage_as_ref::() - .get(&reverse_key)? - .map(|v| v.into_owned())) - } - } - - impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx> - where - Tx: OffChainDatabaseTransaction, - { - fn read_registry( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<$type> { - self.db_tx.read_registry(key) - } - - fn read_timestamp( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result { - <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) - } - - fn write_registry( - &mut self, - key: &fuel_core_types::fuel_compression::RegistryKey, - value: &$type, - timestamp: Tai64, - ) -> anyhow::Result<()> { - self.db_tx.write_registry(key, value, timestamp) - } - - fn registry_index_lookup( - &self, - value: &$type, - ) -> anyhow::Result> - { - self.db_tx.registry_index_lookup(value) - } - } - - impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain> - where - Tx: OffChainDatabaseTransaction, - { - fn read_registry( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<$type> { - self.db_tx.read_registry(key) - } - - fn read_timestamp( - &self, - key: &fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result { - <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) - } - - fn write_registry( - &mut self, - key: &fuel_core_types::fuel_compression::RegistryKey, - value: &$type, - timestamp: Tai64, - ) -> anyhow::Result<()> { - self.db_tx.write_registry(key, value, timestamp) - } - - fn registry_index_lookup( - &self, - value: &$type, - ) -> anyhow::Result> - { - self.db_tx.registry_index_lookup(value) - } - } - - impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx> - where - Tx: OffChainDatabaseTransaction, - { - fn set_latest_assigned_key( - &mut self, - key: fuel_core_types::fuel_compression::RegistryKey, - ) -> anyhow::Result<()> { - self.db_tx.db_tx - .storage_as_mut::() - .insert(&MetadataKey::$type, &key)?; - Ok(()) - } - - fn get_latest_assigned_key( - &self, - ) -> anyhow::Result> { - Ok(self - .db_tx.db_tx - .storage_as_ref::() - .get(&MetadataKey::$type)? - .map(|v| v.into_owned()) - ) - } - } - }}; - } - - impl_temporal_registry_v2!(Address); - impl_temporal_registry_v2!(AssetId); - impl_temporal_registry_v2!(ContractId); - impl_temporal_registry_v2!(ScriptCode); - impl_temporal_registry_v2!(PredicateCode); -} - -impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx> { - fn lookup( - &self, - utxo_id: fuel_core_types::fuel_tx::UtxoId, - ) -> anyhow::Result { - for event in self.block_events { - match event { - Event::CoinCreated(coin) | Event::CoinConsumed(coin) - if coin.utxo_id == utxo_id => - { - let output_index = coin.utxo_id.output_index(); - return Ok(fuel_core_types::fuel_tx::CompressedUtxoId { - tx_pointer: coin.tx_pointer, - output_index, - }); - } - _ => {} - } - } - anyhow::bail!("UtxoId not found in the block events"); - } -} - -impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain> -where - Onchain: StorageInspect - + StorageInspect - + StorageInspect, -{ - fn utxo_id( - &self, - c: fuel_core_types::fuel_tx::CompressedUtxoId, - ) -> anyhow::Result { - #[cfg(feature = "test-helpers")] - if c.tx_pointer.block_height() == 0u32.into() { - // This is a genesis coin, which is handled differently. - // See CoinConfigGenerator::generate which generates the genesis coins. - let tx_id = - fuel_core_chain_config::coin_config_helpers::tx_id(c.output_index); - - let utxo_id = fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index); - - return Ok(utxo_id); - } - - let block_info = self - .onchain_db - .storage_as_ref::() - .get(&c.tx_pointer.block_height())? - .ok_or(not_found!(FuelBlocks))?; - - let tx_id = *block_info - .transactions() - .get(c.tx_pointer.tx_index() as usize) - .ok_or(anyhow::anyhow!( - "Transaction not found in the block: {:?}", - c.tx_pointer - ))?; - - Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index)) - } - - fn coin( - &self, - utxo_id: fuel_core_types::fuel_tx::UtxoId, - ) -> anyhow::Result { - let coin = self - .onchain_db - .storage_as_ref::() - .get(&utxo_id)? - .ok_or(not_found!(fuel_core_storage::tables::Coins))?; - Ok(fuel_core_compression::ports::CoinInfo { - owner: *coin.owner(), - asset_id: *coin.asset_id(), - amount: *coin.amount(), - }) - } - - fn message( - &self, - nonce: fuel_core_types::fuel_types::Nonce, - ) -> anyhow::Result { - let message = self - .onchain_db - .storage_as_ref::() - .get(&nonce)? - .ok_or(not_found!(fuel_core_storage::tables::Messages))?; - Ok(fuel_core_compression::ports::MessageInfo { - sender: *message.sender(), - recipient: *message.recipient(), - amount: message.amount(), - data: message.data().clone(), - }) - } -} diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index 762a9852939..983fac12adb 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -430,10 +430,6 @@ impl ReadView { self.off_chain.block_height(block_id) } - pub fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult> { - self.off_chain.da_compressed_block(height) - } - pub fn tx_status(&self, tx_id: &TxId) -> StorageResult { self.off_chain.tx_status(tx_id) } diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index 8680177179f..58b80f16964 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -87,8 +87,6 @@ pub struct CoinsToSpendIndexIter<'a> { pub trait OffChainDatabase: Send + Sync { fn block_height(&self, block_id: &BlockId) -> StorageResult; - fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult>; - fn tx_status( &self, tx_id: &TxId, @@ -197,11 +195,9 @@ pub trait DatabaseBlocks { } /// Trait that specifies all the getters required for DA compressed blocks. -pub trait DatabaseDaCompressedBlocks { +pub trait DatabaseDaCompressedBlocks: Send + Sync { /// Get a DA compressed block by its height. fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult>; - - fn latest_height(&self) -> StorageResult; } /// Trait that specifies all the getters required for messages. @@ -344,7 +340,6 @@ pub mod worker { MessageBalances, }, coins::CoinsToSpendIndex, - da_compression::*, old::{ OldFuelBlockConsensus, OldFuelBlocks, @@ -423,17 +418,7 @@ pub mod worker { + StorageMutate + StorageMutate + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate + StorageMutate - + MaybeTemporalRegistryV2Bounds { fn record_tx_id_owner( &mut self, @@ -460,75 +445,6 @@ pub mod worker { fn commit(self) -> StorageResult<()>; } - #[cfg(feature = "fault-proving")] - pub mod v2_off_chain_database_tx { - use super::*; - use v2::{ - address::DaCompressionTemporalRegistryAddressV2, - asset_id::DaCompressionTemporalRegistryAssetIdV2, - contract_id::DaCompressionTemporalRegistryContractIdV2, - evictor_cache::DaCompressionTemporalRegistryEvictorCacheV2, - predicate_code::DaCompressionTemporalRegistryPredicateCodeV2, - registry_index::DaCompressionTemporalRegistryIndexV2, - script_code::DaCompressionTemporalRegistryScriptCodeV2, - timestamps::DaCompressionTemporalRegistryTimestampsV2, - }; - - pub trait TemporalRegistryV2Bounds: StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate - + StorageMutate< - DaCompressionTemporalRegistryPredicateCodeV2, - Error = StorageError, - > + StorageMutate - + StorageMutate - + StorageMutate< - DaCompressionTemporalRegistryEvictorCacheV2, - Error = StorageError, - > - { - } - - impl TemporalRegistryV2Bounds for T where - T: StorageMutate< - DaCompressionTemporalRegistryAddressV2, - Error = StorageError, - > + StorageMutate< - DaCompressionTemporalRegistryAssetIdV2, - Error = StorageError, - > + StorageMutate< - DaCompressionTemporalRegistryContractIdV2, - Error = StorageError, - > + StorageMutate< - DaCompressionTemporalRegistryScriptCodeV2, - Error = StorageError, - > + StorageMutate< - DaCompressionTemporalRegistryPredicateCodeV2, - Error = StorageError, - > + StorageMutate - + StorageMutate< - DaCompressionTemporalRegistryTimestampsV2, - Error = StorageError, - > + StorageMutate< - DaCompressionTemporalRegistryEvictorCacheV2, - Error = StorageError, - > - { - } - } - - #[cfg(not(feature = "fault-proving"))] - pub mod not_fault_proving { - pub trait MaybeTemporalRegistryV2Bounds {} - impl MaybeTemporalRegistryV2Bounds for T {} - } - - #[cfg(not(feature = "fault-proving"))] - pub use not_fault_proving::MaybeTemporalRegistryV2Bounds; - #[cfg(feature = "fault-proving")] - pub use v2_off_chain_database_tx::TemporalRegistryV2Bounds as MaybeTemporalRegistryV2Bounds; - pub trait BlockImporter: Send + Sync { /// Returns a stream of imported block. fn block_events(&self) -> BoxStream; diff --git a/crates/fuel-core/src/graphql_api/storage.rs b/crates/fuel-core/src/graphql_api/storage.rs index 7bb311dd5bc..25b89ba68dd 100644 --- a/crates/fuel-core/src/graphql_api/storage.rs +++ b/crates/fuel-core/src/graphql_api/storage.rs @@ -41,7 +41,6 @@ pub mod balances; pub mod blocks; pub mod coins; pub mod contracts; -pub mod da_compression; pub mod messages; pub mod old; pub mod statistic; @@ -96,86 +95,14 @@ pub enum Column { /// Existence of a key in this column means that the message has been spent. /// See [`SpentMessages`](messages::SpentMessages) SpentMessages = 13, - /// DA compression and postcard serialized blocks. - /// See [`DaCompressedBlocks`](da_compression::DaCompressedBlocks) - DaCompressedBlocks = 14, - /// See [`DaCompressionTemporalRegistryIndex`](da_compression::DaCompressionTemporalRegistryIndex) - DaCompressionTemporalRegistryIndex = 15, - /// See [`DaCompressionTemporalRegistryTimestamps`](da_compression::DaCompressionTemporalRegistryTimestamps) - DaCompressionTemporalRegistryTimestamps = 16, - /// See [`DaCompressionTemporalRegistryEvictorCache`](da_compression::DaCompressionTemporalRegistryEvictorCache) - DaCompressionTemporalRegistryEvictorCache = 17, - /// See [`DaCompressionTemporalRegistryAddress`](da_compression::DaCompressionTemporalRegistryAddress) - DaCompressionTemporalRegistryAddress = 18, - /// See [`DaCompressionTemporalRegistryAssetId`](da_compression::DaCompressionTemporalRegistryAssetId) - DaCompressionTemporalRegistryAssetId = 19, - /// See [`DaCompressionTemporalRegistryContractId`](da_compression::DaCompressionTemporalRegistryContractId) - DaCompressionTemporalRegistryContractId = 20, - /// See [`DaCompressionTemporalRegistryScriptCode`](da_compression::DaCompressionTemporalRegistryScriptCode) - DaCompressionTemporalRegistryScriptCode = 21, - /// See [`DaCompressionTemporalRegistryPredicateCode`](da_compression::DaCompressionTemporalRegistryPredicateCode) - DaCompressionTemporalRegistryPredicateCode = 22, /// Coin balances per account and asset. - CoinBalances = 23, + CoinBalances = 14, /// Message balances per account. - MessageBalances = 24, + MessageBalances = 15, /// See [`AssetsInfo`](assets::AssetsInfo) - AssetsInfo = 25, + AssetsInfo = 16, /// Index of the coins that are available to spend. - CoinsToSpend = 26, - /// See [`DaCompressionTemporalRegistryAddressV2`](da_compression::v2::address::DaCompressionTemporalRegistryAddressV2) - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryAddressV2 = 27, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalAddressMerkleData = 28, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalAddressMerkleMetadata = 29, - // See [`DaCompressionTemporalRegistryAssetIdV2`](da_compression::v2::asset_id::DaCompressionTemporalRegistryAssetIdV2) - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryAssetIdV2 = 30, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalAssetIdMerkleData = 31, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalAssetIdMerkleMetadata = 32, - /// See [`DaCompressionTemporalRegistryContractIdV2`](da_compression::v2::contract_id::DaCompressionTemporalRegistryContractIdV2) - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryContractIdV2 = 33, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalContractIdMerkleData = 34, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalContractIdMerkleMetadata = 35, - /// See [`DaCompressionTemporalRegistryScriptCodeV2`](da_compression::v2::script_code::DaCompressionTemporalRegistryScriptCodeV2) - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryScriptCodeV2 = 36, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalScriptCodeMerkleData = 37, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalScriptCodeMerkleMetadata = 38, - /// See [`DaCompressionTemporalRegistryPredicateCodeV2`](da_compression::v2::predicate_code::DaCompressionTemporalRegistryPredicateCodeV2) - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryPredicateCodeV2 = 39, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalPredicateCodeMerkleData = 40, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalPredicateCodeMerkleMetadata = 41, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryIndexV2 = 42, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryIndexMerkleData = 43, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryIndexMerkleMetadata = 44, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryTimestampsV2 = 45, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryTimestampsMerkleData = 46, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryTimestampsMerkleMetadata = 47, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryEvictorCacheV2 = 48, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryEvictorCacheMerkleData = 49, - #[cfg(feature = "fault-proving")] - DaCompressionTemporalRegistryEvictorCacheMerkleMetadata = 50, + CoinsToSpend = 17, } impl Column { diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression.rs b/crates/fuel-core/src/graphql_api/storage/da_compression.rs deleted file mode 100644 index de01736e0eb..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression.rs +++ /dev/null @@ -1,203 +0,0 @@ -use self::{ - evictor_cache::MetadataKey, - predicate_code_codec::PredicateCodeCodec, - reverse_key::ReverseKey, - script_code_codec::ScriptCodeCodec, - timestamps::TimestampKey, -}; -use fuel_core_compression::VersionedCompressedBlock; -use fuel_core_storage::{ - blueprint::plain::Plain, - codec::{ - postcard::Postcard, - primitive::Primitive, - raw::Raw, - }, - structured_storage::TableWithBlueprint, - Mappable, -}; -use fuel_core_types::{ - fuel_compression::RegistryKey, - fuel_tx::{ - input::PredicateCode, - Address, - AssetId, - ContractId, - ScriptCode, - }, - fuel_types::BlockHeight, - tai64::Tai64, -}; - -pub mod evictor_cache; -pub mod predicate_code_codec; -pub mod reverse_key; -pub mod script_code_codec; -pub mod timestamps; - -#[cfg(feature = "fault-proving")] -pub mod v2; - -/// The table for the compressed blocks sent to DA. -pub struct DaCompressedBlocks; - -impl Mappable for DaCompressedBlocks { - type Key = Self::OwnedKey; - type OwnedKey = BlockHeight; - type Value = Self::OwnedValue; - type OwnedValue = VersionedCompressedBlock; -} - -impl TableWithBlueprint for DaCompressedBlocks { - type Blueprint = Plain, Postcard>; - type Column = super::Column; - - fn column() -> Self::Column { - Self::Column::DaCompressedBlocks - } -} - -/// Mapping from the type to the registry key in the temporal registry. -pub struct DaCompressionTemporalRegistryIndex; - -impl Mappable for DaCompressionTemporalRegistryIndex { - type Key = Self::OwnedKey; - type OwnedKey = ReverseKey; - type Value = Self::OwnedValue; - type OwnedValue = RegistryKey; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryIndex { - // TODO: Use Raw codec for value instead of Postcard - type Blueprint = Plain; - type Column = super::Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryIndex - } -} - -/// This table keeps track of last written timestamp for each key, -/// so that we can keep track of expiration. -pub struct DaCompressionTemporalRegistryTimestamps; - -impl Mappable for DaCompressionTemporalRegistryTimestamps { - type Key = Self::OwnedKey; - type OwnedKey = TimestampKey; - type Value = Self::OwnedValue; - type OwnedValue = Tai64; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryTimestamps { - // TODO: Use Raw codec for value instead of Postcard - type Blueprint = Plain; - type Column = super::Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryTimestamps - } -} - -/// This table is used to hold "next key to evict" for each keyspace. -/// In the future we'll likely switch to use LRU or something, in which -/// case this table can be repurposed. -pub struct DaCompressionTemporalRegistryEvictorCache; - -impl Mappable for DaCompressionTemporalRegistryEvictorCache { - type Key = Self::OwnedKey; - type OwnedKey = MetadataKey; - type Value = Self::OwnedValue; - type OwnedValue = RegistryKey; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryEvictorCache { - // TODO: Use Raw codec for value instead of Postcard - type Blueprint = Plain; - type Column = super::Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryEvictorCache - } -} - -macro_rules! temporal_registry { - ($type:ty, $code:ty) => { - paste::paste! { - pub struct [< DaCompressionTemporalRegistry $type >]; - - impl Mappable for [< DaCompressionTemporalRegistry $type >] { - type Key = Self::OwnedKey; - type OwnedKey = RegistryKey; - type Value = Self::OwnedValue; - type OwnedValue = $type; - } - - impl TableWithBlueprint for [< DaCompressionTemporalRegistry $type >] { - // TODO: Use Raw codec for value instead of Postcard - type Blueprint = Plain; - type Column = super::Column; - - fn column() -> Self::Column { - Self::Column::[< DaCompressionTemporalRegistry $type >] - } - } - - - #[cfg(test)] - fuel_core_storage::basic_storage_tests!( - [< DaCompressionTemporalRegistry $type >], - RegistryKey::ZERO, - <[< DaCompressionTemporalRegistry $type >] as Mappable>::Value::default(), - <[< DaCompressionTemporalRegistry $type >] as Mappable>::Value::default(), - tests::generate_key - ); - } - }; -} - -temporal_registry!(Address, Raw); -temporal_registry!(AssetId, Raw); -temporal_registry!(ContractId, Raw); -temporal_registry!(ScriptCode, ScriptCodeCodec); -temporal_registry!(PredicateCode, PredicateCodeCodec); - -#[cfg(test)] -mod tests { - use super::*; - - #[cfg(test)] - fuel_core_storage::basic_storage_tests!( - DaCompressionTemporalRegistryIndex, - ReverseKey::Address(Address::zeroed()), - RegistryKey::ZERO - ); - - #[cfg(test)] - fuel_core_storage::basic_storage_tests!( - DaCompressionTemporalRegistryTimestamps, - TimestampKey { - keyspace: timestamps::TimestampKeyspace::Address, - key: RegistryKey::ZERO - }, - Tai64::UNIX_EPOCH - ); - - #[cfg(test)] - fuel_core_storage::basic_storage_tests!( - DaCompressionTemporalRegistryEvictorCache, - MetadataKey::Address, - RegistryKey::ZERO - ); - - fuel_core_storage::basic_storage_tests!( - DaCompressedBlocks, - ::Key::default(), - ::Value::default() - ); - - #[allow(clippy::arithmetic_side_effects)] // Test code, and also safe - pub fn generate_key(rng: &mut impl rand::Rng) -> RegistryKey { - let raw_key: u32 = rng.gen_range(0..2u32.pow(24) - 2); - RegistryKey::try_from(raw_key).unwrap() - } -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/evictor_cache.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/evictor_cache.rs deleted file mode 100644 index 870d02722f6..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/evictor_cache.rs +++ /dev/null @@ -1,34 +0,0 @@ -/// The metadata key used by `DaCompressionTemporalRegistryEvictorCache` table to -/// store progress of the evictor. -#[derive( - Debug, - Clone, - Copy, - PartialEq, - Eq, - serde::Serialize, - serde::Deserialize, - strum::EnumCount, -)] -pub enum MetadataKey { - Address, - AssetId, - ContractId, - ScriptCode, - PredicateCode, -} - -#[cfg(feature = "test-helpers")] -impl rand::distributions::Distribution for rand::distributions::Standard { - fn sample(&self, rng: &mut R) -> MetadataKey { - use strum::EnumCount; - match rng.next_u32() as usize % MetadataKey::COUNT { - 0 => MetadataKey::Address, - 1 => MetadataKey::AssetId, - 2 => MetadataKey::ContractId, - 3 => MetadataKey::ScriptCode, - 4 => MetadataKey::PredicateCode, - _ => unreachable!("New metadata key is added but not supported here"), - } - } -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/predicate_code_codec.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/predicate_code_codec.rs deleted file mode 100644 index 6c165c09f3a..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/predicate_code_codec.rs +++ /dev/null @@ -1,28 +0,0 @@ -use fuel_core_storage::codec::{ - Decode, - Encode, -}; -use fuel_core_types::fuel_tx::input::PredicateCode; -use std::{ - borrow::Cow, - ops::Deref, -}; - -// TODO: Remove this codec when the `PredicateCode` implements -// `AsRef<[u8]>` and `TryFrom<[u8]>` and use `Raw` codec instead. - -pub struct PredicateCodeCodec; - -impl Encode for PredicateCodeCodec { - type Encoder<'a> = Cow<'a, [u8]>; - - fn encode(t: &PredicateCode) -> Self::Encoder<'_> { - Cow::Borrowed(t.deref()) - } -} - -impl Decode for PredicateCodeCodec { - fn decode(bytes: &[u8]) -> anyhow::Result { - Ok(bytes.to_vec().into()) - } -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/reverse_key.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/reverse_key.rs deleted file mode 100644 index eeb619d7ada..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/reverse_key.rs +++ /dev/null @@ -1,82 +0,0 @@ -use fuel_core_types::{ - fuel_tx::{ - input::PredicateCode, - ScriptCode, - }, - fuel_types::{ - Address, - AssetId, - Bytes32, - ContractId, - }, -}; -use std::ops::Deref; - -#[derive( - Debug, - Clone, - Copy, - PartialEq, - Eq, - serde::Serialize, - serde::Deserialize, - strum::EnumCount, -)] -/// The reverse key for the temporal registry index. -/// By this key we can find the registry key from the temporal registry. -pub enum ReverseKey { - Address(Address), - AssetId(AssetId), - ContractId(ContractId), - /// Hash of the script code. - ScriptCode(Bytes32), - /// Hash of the predicate code. - PredicateCode(Bytes32), -} - -impl From<&Address> for ReverseKey { - fn from(address: &Address) -> Self { - Self::Address(*address) - } -} - -impl From<&AssetId> for ReverseKey { - fn from(asset_id: &AssetId) -> Self { - Self::AssetId(*asset_id) - } -} - -impl From<&ContractId> for ReverseKey { - fn from(contract_id: &ContractId) -> Self { - Self::ContractId(*contract_id) - } -} - -impl From<&ScriptCode> for ReverseKey { - fn from(script_code: &ScriptCode) -> Self { - let hash = fuel_core_types::fuel_crypto::Hasher::hash(script_code.deref()); - ReverseKey::ScriptCode(hash) - } -} - -impl From<&PredicateCode> for ReverseKey { - fn from(predicate_code: &PredicateCode) -> Self { - let hash = fuel_core_types::fuel_crypto::Hasher::hash(predicate_code.deref()); - ReverseKey::PredicateCode(hash) - } -} - -#[cfg(feature = "test-helpers")] -impl rand::distributions::Distribution for rand::distributions::Standard { - fn sample(&self, rng: &mut R) -> ReverseKey { - use strum::EnumCount; - match rng.next_u32() as usize % ReverseKey::COUNT { - 0 => ReverseKey::Address(rng.gen()), - 1 => ReverseKey::AssetId(rng.gen()), - 2 => ReverseKey::ContractId(rng.gen()), - 3 => ReverseKey::ScriptCode(rng.gen()), - 4 => ReverseKey::PredicateCode(rng.gen()), - _ => unreachable!("New reverse key is added but not supported here"), - } - } -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/script_code_codec.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/script_code_codec.rs deleted file mode 100644 index a4d6c8d1ac3..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/script_code_codec.rs +++ /dev/null @@ -1,28 +0,0 @@ -use fuel_core_storage::codec::{ - Decode, - Encode, -}; -use fuel_core_types::fuel_tx::ScriptCode; -use std::{ - borrow::Cow, - ops::Deref, -}; - -// TODO: Remove this codec when the `ScriptCode` implements -// `AsRef<[u8]>` and `TryFrom<[u8]>` and use `Raw` codec instead. - -pub struct ScriptCodeCodec; - -impl Encode for ScriptCodeCodec { - type Encoder<'a> = Cow<'a, [u8]>; - - fn encode(t: &ScriptCode) -> Self::Encoder<'_> { - Cow::Borrowed(t.deref()) - } -} - -impl Decode for ScriptCodeCodec { - fn decode(bytes: &[u8]) -> anyhow::Result { - Ok(bytes.to_vec().into()) - } -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/timestamps.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/timestamps.rs deleted file mode 100644 index dc8f016f50b..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/timestamps.rs +++ /dev/null @@ -1,57 +0,0 @@ -use fuel_core_types::fuel_compression::RegistryKey; - -/// The metadata key used by `DaCompressionTemporalRegistryTimsetamps` table to -/// keep track of when each key was last updated. -#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct TimestampKey { - /// The column where the key is stored. - pub keyspace: TimestampKeyspace, - /// The key itself. - pub key: RegistryKey, -} - -#[derive( - Debug, - Clone, - Copy, - PartialEq, - Eq, - serde::Serialize, - serde::Deserialize, - strum::EnumCount, -)] -pub enum TimestampKeyspace { - Address, - AssetId, - ContractId, - ScriptCode, - PredicateCode, -} - -#[cfg(feature = "test-helpers")] -impl rand::distributions::Distribution for rand::distributions::Standard { - #![allow(clippy::arithmetic_side_effects)] // Test-only code, and also safe - fn sample(&self, rng: &mut R) -> TimestampKey { - TimestampKey { - keyspace: rng.gen(), - key: RegistryKey::try_from(rng.gen_range(0..2u32.pow(24) - 2)).unwrap(), - } - } -} - -#[cfg(feature = "test-helpers")] -impl rand::distributions::Distribution - for rand::distributions::Standard -{ - fn sample(&self, rng: &mut R) -> TimestampKeyspace { - use strum::EnumCount; - match rng.next_u32() as usize % TimestampKeyspace::COUNT { - 0 => TimestampKeyspace::Address, - 1 => TimestampKeyspace::AssetId, - 2 => TimestampKeyspace::ContractId, - 3 => TimestampKeyspace::ScriptCode, - 4 => TimestampKeyspace::PredicateCode, - _ => unreachable!("New metadata key is added but not supported here"), - } - } -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2.rs deleted file mode 100644 index ae8dfe32fc5..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub mod address; -pub mod asset_id; -pub mod contract_id; -pub mod evictor_cache; -pub mod predicate_code; -pub mod registry_index; -pub mod script_code; -pub mod timestamps; diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/address.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/address.rs deleted file mode 100644 index 846d58445d4..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/address.rs +++ /dev/null @@ -1,119 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::RegistryKey, - Column, -}; -use core::borrow::Borrow; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - raw::Raw, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::{ - fuel_merkle::binary, - fuel_tx::{ - Address, - Bytes32, - }, -}; - -pub struct TemporalRegistryAddressMerkleData; - -impl Mappable for TemporalRegistryAddressMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryAddressMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalAddressMerkleData - } -} - -/// The metadata table for [`TemporalRegistryAddressMerkleData`] table. -pub struct TemporalRegistryAddressMerkleMetadata; - -impl Mappable for TemporalRegistryAddressMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryAddressMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalAddressMerkleMetadata - } -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for Address. -pub struct DaCompressionTemporalRegistryAddressV2Encoder; - -impl fuel_core_storage::codec::Encode
- for DaCompressionTemporalRegistryAddressV2Encoder -{ - type Encoder<'a> = [u8; Bytes32::LEN]; - - fn encode(value: &Address) -> Self::Encoder<'_> { - *Borrow::<[u8; Bytes32::LEN]>::borrow(value) - } -} - -/// V2 table for storing Address with Merklized encoding. -pub struct DaCompressionTemporalRegistryAddressV2; - -impl Mappable for DaCompressionTemporalRegistryAddressV2 { - type Key = Self::OwnedKey; - type OwnedKey = RegistryKey; - type Value = Self::OwnedValue; - type OwnedValue = Address; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryAddressV2 { - type Blueprint = Merklized< - Postcard, - Raw, - TemporalRegistryAddressMerkleMetadata, - TemporalRegistryAddressMerkleData, - DaCompressionTemporalRegistryAddressV2Encoder, - >; - type Column = Column; - - fn column() -> Self::Column { - Column::DaCompressionTemporalRegistryAddressV2 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::fuel_core_graphql_api::storage::da_compression::tests::generate_key; - - #[cfg(test)] - fuel_core_storage::basic_merklelized_storage_tests!( - DaCompressionTemporalRegistryAddressV2, - RegistryKey::ZERO, - ::Value::default(), - ::Value::default(), - generate_key - ); -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/asset_id.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/asset_id.rs deleted file mode 100644 index b145a9a6f33..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/asset_id.rs +++ /dev/null @@ -1,117 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::RegistryKey, - Column, -}; -use core::borrow::Borrow; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - raw::Raw, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::{ - fuel_merkle::binary, - fuel_tx::{ - AssetId, - Bytes32, - }, -}; - -pub struct TemporalRegistryAssetIdMerkleData; - -impl Mappable for TemporalRegistryAssetIdMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryAssetIdMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalAssetIdMerkleData - } -} - -/// The metadata table for [`TemporalRegistryAssetIdMerkleData`] table. -pub struct TemporalRegistryAssetIdMerkleMetadata; - -impl Mappable for TemporalRegistryAssetIdMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryAssetIdMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalAssetIdMerkleMetadata - } -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for AssetId. -pub struct DaCompressionTemporalRegistryAssetIdV2Encoder; - -impl fuel_core_storage::codec::Encode - for DaCompressionTemporalRegistryAssetIdV2Encoder -{ - type Encoder<'a> = [u8; Bytes32::LEN]; - fn encode(value: &AssetId) -> Self::Encoder<'_> { - *Borrow::<[u8; Bytes32::LEN]>::borrow(value) - } -} - -/// V2 table for storing AssetId with Merklized encoding. -pub struct DaCompressionTemporalRegistryAssetIdV2; - -impl Mappable for DaCompressionTemporalRegistryAssetIdV2 { - type Key = Self::OwnedKey; - type OwnedKey = RegistryKey; - type Value = Self::OwnedValue; - type OwnedValue = AssetId; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryAssetIdV2 { - type Blueprint = Merklized< - Postcard, - Raw, - TemporalRegistryAssetIdMerkleMetadata, - TemporalRegistryAssetIdMerkleData, - DaCompressionTemporalRegistryAssetIdV2Encoder, - >; - type Column = Column; - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryAssetIdV2 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::fuel_core_graphql_api::storage::da_compression::tests::generate_key; - - #[cfg(test)] - fuel_core_storage::basic_merklelized_storage_tests!( - DaCompressionTemporalRegistryAssetIdV2, - RegistryKey::ZERO, - ::Value::default(), - ::Value::default(), - generate_key - ); -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/contract_id.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/contract_id.rs deleted file mode 100644 index 1c58ac628ee..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/contract_id.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::RegistryKey, - Column, -}; -use core::borrow::Borrow; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - raw::Raw, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::{ - fuel_merkle::binary, - fuel_tx::{ - Bytes32, - ContractId, - }, -}; - -pub struct TemporalRegistryContractIdMerkleData; - -impl Mappable for TemporalRegistryContractIdMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryContractIdMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalContractIdMerkleData - } -} - -/// The metadata table for [`TemporalRegistryContractIdMerkleData`] table. -pub struct TemporalRegistryContractIdMerkleMetadata; - -impl Mappable for TemporalRegistryContractIdMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryContractIdMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalContractIdMerkleMetadata - } -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for ContractId. -pub struct DaCompressionTemporalRegistryContractIdV2Encoder; - -impl fuel_core_storage::codec::Encode - for DaCompressionTemporalRegistryContractIdV2Encoder -{ - type Encoder<'a> = [u8; Bytes32::LEN]; - fn encode(value: &ContractId) -> Self::Encoder<'_> { - *Borrow::<[u8; Bytes32::LEN]>::borrow(value) - } -} - -/// V2 table for storing ContractId with Merklized encoding. -pub struct DaCompressionTemporalRegistryContractIdV2; - -impl Mappable for DaCompressionTemporalRegistryContractIdV2 { - type Key = Self::OwnedKey; - type OwnedKey = RegistryKey; - type Value = Self::OwnedValue; - type OwnedValue = ContractId; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryContractIdV2 { - type Blueprint = Merklized< - Postcard, - Raw, - TemporalRegistryContractIdMerkleMetadata, - TemporalRegistryContractIdMerkleData, - DaCompressionTemporalRegistryContractIdV2Encoder, - >; - type Column = Column; - - fn column() -> Self::Column { - Column::DaCompressionTemporalRegistryContractIdV2 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::fuel_core_graphql_api::storage::da_compression::tests::generate_key; - - #[cfg(test)] - fuel_core_storage::basic_merklelized_storage_tests!( - DaCompressionTemporalRegistryContractIdV2, - RegistryKey::ZERO, - ::Value::default(), - ::Value::default(), - generate_key - ); -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/evictor_cache.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/evictor_cache.rs deleted file mode 100644 index 2ccd073b0cc..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/evictor_cache.rs +++ /dev/null @@ -1,103 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::{ - MetadataKey, - RegistryKey, - }, - Column, -}; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::fuel_merkle::binary; - -pub struct TemporalRegistryEvictorCacheMerkleData; - -impl Mappable for TemporalRegistryEvictorCacheMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryEvictorCacheMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryEvictorCacheMerkleData - } -} - -/// The metadata table for [`TemporalRegistryEvictorCacheMerkleData`] table. -pub struct TemporalRegistryEvictorCacheMerkleMetadata; - -impl Mappable for TemporalRegistryEvictorCacheMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryEvictorCacheMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryEvictorCacheMerkleMetadata - } -} - -impl Mappable for DaCompressionTemporalRegistryEvictorCacheV2 { - type Key = Self::OwnedKey; - type OwnedKey = MetadataKey; - type Value = Self::OwnedValue; - type OwnedValue = RegistryKey; -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for EvictorCache. -pub struct DaCompressionTemporalEvictorCacheV2Encoder; - -impl fuel_core_storage::codec::Encode - for DaCompressionTemporalEvictorCacheV2Encoder -{ - type Encoder<'a> = [u8; RegistryKey::SIZE]; - - fn encode(value: &RegistryKey) -> Self::Encoder<'_> { - let mut bytes = [0u8; RegistryKey::SIZE]; - bytes.copy_from_slice(value.as_ref()); - bytes - } -} - -/// This table is used to hold "next key to evict" for each keyspace. -/// In the future we'll likely switch to use LRU or something, in which -/// case this table can be repurposed. -pub struct DaCompressionTemporalRegistryEvictorCacheV2; - -impl TableWithBlueprint for DaCompressionTemporalRegistryEvictorCacheV2 { - type Blueprint = Merklized< - Postcard, - Postcard, - TemporalRegistryEvictorCacheMerkleMetadata, - TemporalRegistryEvictorCacheMerkleData, - DaCompressionTemporalEvictorCacheV2Encoder, - >; - type Column = Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryEvictorCacheV2 - } -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/predicate_code.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/predicate_code.rs deleted file mode 100644 index f8998c60e65..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/predicate_code.rs +++ /dev/null @@ -1,116 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::{ - PredicateCode, - PredicateCodeCodec, - RegistryKey, - }, - Column, -}; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::fuel_merkle::binary; - -pub struct TemporalRegistryPredicateCodeMerkleData; - -impl Mappable for TemporalRegistryPredicateCodeMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryPredicateCodeMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalPredicateCodeMerkleData - } -} - -/// The metadata table for [`TemporalRegistryPredicateCodeMerkleData`] table. -pub struct TemporalRegistryPredicateCodeMerkleMetadata; - -impl Mappable for TemporalRegistryPredicateCodeMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryPredicateCodeMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalPredicateCodeMerkleMetadata - } -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for PredicateCode. -pub struct DaCompressionTemporalRegistryPredicateCodeV2Encoder; - -impl fuel_core_storage::codec::Encode - for DaCompressionTemporalRegistryPredicateCodeV2Encoder -{ - type Encoder<'a> = std::borrow::Cow<'a, [u8]>; - - fn encode(value: &PredicateCode) -> Self::Encoder<'_> { - let bytes: Vec = value.bytes.clone(); - std::borrow::Cow::Owned(bytes) - } -} - -/// V2 table for storing PredicateCode with Merklized encoding. -pub struct DaCompressionTemporalRegistryPredicateCodeV2; - -impl Mappable for DaCompressionTemporalRegistryPredicateCodeV2 { - type Key = Self::OwnedKey; - type OwnedKey = RegistryKey; - type Value = Self::OwnedValue; - type OwnedValue = PredicateCode; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryPredicateCodeV2 { - type Blueprint = Merklized< - Postcard, - PredicateCodeCodec, - TemporalRegistryPredicateCodeMerkleMetadata, - TemporalRegistryPredicateCodeMerkleData, - DaCompressionTemporalRegistryPredicateCodeV2Encoder, - >; - type Column = Column; - - fn column() -> Self::Column { - Column::DaCompressionTemporalRegistryPredicateCodeV2 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::fuel_core_graphql_api::storage::da_compression::tests::generate_key; - - #[cfg(test)] - fuel_core_storage::basic_merklelized_storage_tests!( - DaCompressionTemporalRegistryPredicateCodeV2, - RegistryKey::ZERO, - ::Value::default(), - ::Value::default(), - generate_key - ); -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/registry_index.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/registry_index.rs deleted file mode 100644 index afdf9553afa..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/registry_index.rs +++ /dev/null @@ -1,114 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::{ - reverse_key::ReverseKey, - RegistryKey, - }, - Column, -}; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::fuel_merkle::binary; - -pub struct TemporalRegistryIndexMerkleData; - -impl Mappable for TemporalRegistryIndexMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryIndexMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalRegistryIndexMerkleData - } -} - -/// The metadata table for [`TemporalRegistryIndexMerkleData`] table. -pub struct TemporalRegistryIndexMerkleMetadata; - -impl Mappable for TemporalRegistryIndexMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryIndexMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalRegistryIndexMerkleMetadata - } -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for RegistryIndex. -pub struct DaCompressionTemporalRegistryIndexV2Encoder; - -impl fuel_core_storage::codec::Encode - for DaCompressionTemporalRegistryIndexV2Encoder -{ - type Encoder<'a> = [u8; RegistryKey::SIZE]; - - fn encode(value: &RegistryKey) -> Self::Encoder<'_> { - let mut bytes = [0u8; RegistryKey::SIZE]; - bytes.copy_from_slice(value.as_ref()); - bytes - } -} - -/// Mapping from the type to the registry key in the temporal registry. -pub struct DaCompressionTemporalRegistryIndexV2; - -impl Mappable for DaCompressionTemporalRegistryIndexV2 { - type Key = Self::OwnedKey; - type OwnedKey = ReverseKey; - type Value = Self::OwnedValue; - type OwnedValue = RegistryKey; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryIndexV2 { - type Blueprint = Merklized< - Postcard, - Postcard, - TemporalRegistryIndexMerkleMetadata, - TemporalRegistryIndexMerkleData, - DaCompressionTemporalRegistryIndexV2Encoder, - >; - type Column = Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryIndexV2 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use fuel_core_types::fuel_tx::Address; - - #[cfg(test)] - fuel_core_storage::basic_merklelized_storage_tests!( - DaCompressionTemporalRegistryIndexV2, - ReverseKey::Address(Address::zeroed()), - RegistryKey::ZERO - ); -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/script_code.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/script_code.rs deleted file mode 100644 index 0ceda0b4d03..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/script_code.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::{ - RegistryKey, - ScriptCodeCodec, - }, - Column, -}; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::{ - fuel_merkle::binary, - fuel_tx::ScriptCode, -}; - -pub struct TemporalRegistryScriptCodeMerkleData; - -impl Mappable for TemporalRegistryScriptCodeMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryScriptCodeMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalScriptCodeMerkleData - } -} - -/// The metadata table for [`TemporalRegistryScriptCodeMerkleData`] table. -pub struct TemporalRegistryScriptCodeMerkleMetadata; - -impl Mappable for TemporalRegistryScriptCodeMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryScriptCodeMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Column { - Column::DaCompressionTemporalScriptCodeMerkleMetadata - } -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for ScriptCode. -pub struct DaCompressionTemporalRegistryScriptCodeV2Encoder; - -impl fuel_core_storage::codec::Encode - for DaCompressionTemporalRegistryScriptCodeV2Encoder -{ - type Encoder<'a> = std::borrow::Cow<'a, [u8]>; - - fn encode(value: &ScriptCode) -> Self::Encoder<'_> { - let bytes: Vec = value.bytes.clone(); - std::borrow::Cow::Owned(bytes) - } -} - -/// V2 table for storing ScriptCode with Merklized encoding. -pub struct DaCompressionTemporalRegistryScriptCodeV2; - -impl Mappable for DaCompressionTemporalRegistryScriptCodeV2 { - type Key = Self::OwnedKey; - type OwnedKey = RegistryKey; - type Value = Self::OwnedValue; - type OwnedValue = ScriptCode; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryScriptCodeV2 { - type Blueprint = Merklized< - Postcard, - ScriptCodeCodec, - TemporalRegistryScriptCodeMerkleMetadata, - TemporalRegistryScriptCodeMerkleData, - DaCompressionTemporalRegistryScriptCodeV2Encoder, - >; - type Column = Column; - - fn column() -> Self::Column { - Column::DaCompressionTemporalRegistryScriptCodeV2 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::fuel_core_graphql_api::storage::da_compression::tests::generate_key; - - #[cfg(test)] - fuel_core_storage::basic_merklelized_storage_tests!( - DaCompressionTemporalRegistryScriptCodeV2, - RegistryKey::ZERO, - ::Value::default(), - ::Value::default(), - generate_key - ); -} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/timestamps.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/v2/timestamps.rs deleted file mode 100644 index e9c96ec848f..00000000000 --- a/crates/fuel-core/src/graphql_api/storage/da_compression/v2/timestamps.rs +++ /dev/null @@ -1,122 +0,0 @@ -use crate::graphql_api::storage::{ - da_compression::timestamps::TimestampKey, - Column, -}; -use fuel_core_storage::{ - blueprint::{ - merklized::Merklized, - plain::Plain, - }, - codec::{ - postcard::Postcard, - primitive::Primitive, - }, - structured_storage::TableWithBlueprint, - tables::merkle::{ - DenseMerkleMetadata, - DenseMetadataKey, - }, - Mappable, -}; -use fuel_core_types::{ - fuel_merkle::binary, - tai64::Tai64, -}; - -pub struct TemporalRegistryTimestampsMerkleData; - -impl Mappable for TemporalRegistryTimestampsMerkleData { - type Key = u64; - type OwnedKey = Self::Key; - type Value = binary::Primitive; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryTimestampsMerkleData { - type Blueprint = Plain, Postcard>; - type Column = Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryTimestampsMerkleData - } -} - -/// The metadata table for [`TemporalRegistryTimestampsMerkleData`] table. -pub struct TemporalRegistryTimestampsMerkleMetadata; - -impl Mappable for TemporalRegistryTimestampsMerkleMetadata { - type Key = DenseMetadataKey; - type OwnedKey = Self::Key; - type Value = DenseMerkleMetadata; - type OwnedValue = Self::Value; -} - -impl TableWithBlueprint for TemporalRegistryTimestampsMerkleMetadata { - type Blueprint = Plain; - type Column = Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryTimestampsMerkleMetadata - } -} - -/// Encoder for the V2 version of the DaCompressionTemporalRegistry for Timestamps. -pub struct DaCompressionTemporalRegistryTimestampsV2Encoder; - -impl fuel_core_storage::codec::Encode - for DaCompressionTemporalRegistryTimestampsV2Encoder -{ - type Encoder<'a> = [u8; Tai64::BYTE_SIZE]; - - fn encode(value: &Tai64) -> Self::Encoder<'_> { - value.to_bytes() - } -} - -/// This table keeps track of last written timestamp for each key, -/// so that we can keep track of expiration. -pub struct DaCompressionTemporalRegistryTimestampsV2; - -impl Mappable for DaCompressionTemporalRegistryTimestampsV2 { - type Key = Self::OwnedKey; - type OwnedKey = TimestampKey; - type Value = Self::OwnedValue; - type OwnedValue = Tai64; -} - -impl TableWithBlueprint for DaCompressionTemporalRegistryTimestampsV2 { - type Blueprint = Merklized< - Postcard, - Postcard, - TemporalRegistryTimestampsMerkleMetadata, - TemporalRegistryTimestampsMerkleData, - DaCompressionTemporalRegistryTimestampsV2Encoder, - >; - type Column = Column; - - fn column() -> Self::Column { - Self::Column::DaCompressionTemporalRegistryTimestampsV2 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::graphql_api::storage::da_compression::{ - timestamps::{ - TimestampKey, - TimestampKeyspace, - }, - RegistryKey, - }; - - #[cfg(test)] - fuel_core_storage::basic_merklelized_storage_tests!( - DaCompressionTemporalRegistryTimestampsV2, - TimestampKey { - keyspace: TimestampKeyspace::Address, - key: RegistryKey::ZERO - }, - Tai64::UNIX_EPOCH - ); -} diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index 58d9991229f..a0272cd6959 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -2,7 +2,6 @@ use self::indexation::error::IndexationError; use super::{ block_height_subscription, - da_compression::da_compress_block, indexation, storage::old::{ OldFuelBlockConsensus, @@ -116,21 +115,13 @@ pub(crate) struct Context<'a, TxStatusManager, BlockImporter, OnChain, OffChain> pub(crate) block_importer: BlockImporter, pub(crate) on_chain_database: OnChain, pub(crate) off_chain_database: OffChain, - pub(crate) da_compression_config: DaCompressionConfig, pub(crate) continue_on_error: bool, pub(crate) consensus_parameters: &'a ConsensusParameters, } -#[derive(Debug, Clone)] -pub enum DaCompressionConfig { - Disabled, - Enabled(fuel_core_compression::config::Config), -} - /// The initialization task recovers the state of the GraphQL service database on startup. pub struct InitializeTask { chain_id: ChainId, - da_compression_config: DaCompressionConfig, continue_on_error: bool, tx_status_manager: TxStatusManager, blocks_events: BoxStream, @@ -148,7 +139,6 @@ pub struct Task { block_importer: BoxStream, database: D, chain_id: ChainId, - da_compression_config: DaCompressionConfig, continue_on_error: bool, balances_indexation_enabled: bool, coins_to_spend_indexation_enabled: bool, @@ -196,13 +186,6 @@ where &self.base_asset_id, )?; - match self.da_compression_config { - DaCompressionConfig::Disabled => {} - DaCompressionConfig::Enabled(config) => { - da_compress_block(config, block, &result.events, &mut transaction)?; - } - } - transaction.commit()?; for status in result.tx_status.iter() { @@ -607,7 +590,6 @@ where let InitializeTask { chain_id, - da_compression_config, tx_status_manager, block_importer, blocks_events, @@ -623,7 +605,6 @@ where block_importer: blocks_events, database: off_chain_database, chain_id, - da_compression_config, continue_on_error, balances_indexation_enabled, coins_to_spend_indexation_enabled, @@ -756,7 +737,6 @@ where block_importer, on_chain_database, off_chain_database, - da_compression_config, continue_on_error, consensus_parameters, } = context; @@ -770,7 +750,6 @@ where on_chain_database, off_chain_database, chain_id: consensus_parameters.chain_id(), - da_compression_config, continue_on_error, base_asset_id: *consensus_parameters.base_asset_id(), block_height_subscription_handler: block_height_subscription::Handler::new( diff --git a/crates/fuel-core/src/graphql_api/worker_service/tests.rs b/crates/fuel-core/src/graphql_api/worker_service/tests.rs index c34f69a8867..38c6b7e2a6f 100644 --- a/crates/fuel-core/src/graphql_api/worker_service/tests.rs +++ b/crates/fuel-core/src/graphql_api/worker_service/tests.rs @@ -81,7 +81,6 @@ fn worker_task_with_block_importer_and_db( block_importer, database, chain_id, - da_compression_config: DaCompressionConfig::Disabled, continue_on_error: false, balances_indexation_enabled: true, coins_to_spend_indexation_enabled: true, diff --git a/crates/fuel-core/src/query.rs b/crates/fuel-core/src/query.rs index 3b0005967c7..ea7608486dd 100644 --- a/crates/fuel-core/src/query.rs +++ b/crates/fuel-core/src/query.rs @@ -9,8 +9,6 @@ mod subscriptions; mod tx; mod upgrades; -pub mod da_compressed; - // TODO: Remove reexporting of everything pub use balance::*; pub use message::*; diff --git a/crates/fuel-core/src/query/da_compressed.rs b/crates/fuel-core/src/query/da_compressed.rs deleted file mode 100644 index 669e55d584e..00000000000 --- a/crates/fuel-core/src/query/da_compressed.rs +++ /dev/null @@ -1,16 +0,0 @@ -use crate::graphql_api::ports::DatabaseDaCompressedBlocks; -use fuel_core_storage::Result as StorageResult; -use fuel_core_types::fuel_types::BlockHeight; - -pub trait DaCompressedBlockData: Send + Sync { - fn da_compressed_block(&self, id: &BlockHeight) -> StorageResult>; -} - -impl DaCompressedBlockData for D -where - D: DatabaseDaCompressedBlocks + ?Sized + Send + Sync, -{ - fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult> { - self.da_compressed_block(height) - } -} diff --git a/crates/fuel-core/src/schema/da_compressed.rs b/crates/fuel-core/src/schema/da_compressed.rs index 2f2939be83f..994fd60bd59 100644 --- a/crates/fuel-core/src/schema/da_compressed.rs +++ b/crates/fuel-core/src/schema/da_compressed.rs @@ -1,12 +1,10 @@ -use super::{ - scalars::HexString, - ReadViewProvider, -}; +use super::scalars::HexString; use crate::{ fuel_core_graphql_api::{ query_costs, IntoApiResult, }, + graphql_api::api_service::DaCompressionProvider, schema::scalars::U32, }; use async_graphql::{ @@ -42,8 +40,8 @@ impl DaCompressedBlockQuery { ctx: &Context<'_>, #[graphql(desc = "Height of the block")] height: U32, ) -> async_graphql::Result> { - let query = ctx.read_view()?; - query + let da_compression_provider = ctx.data_unchecked::(); + da_compression_provider .da_compressed_block(&height.0.into()) .into_api_result() } diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 4ef0df9cf9a..b18a1dc311e 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -91,6 +91,8 @@ pub struct SharedState { pub executor: ExecutorAdapter, /// The config of the service. pub config: Config, + /// The compression service shared data. + pub compression: Option, } pub struct FuelService { @@ -186,6 +188,7 @@ impl FuelService { Default::default(), Default::default(), Default::default(), + Default::default(), ); Self::from_combined_database(combined_database, config).await } @@ -226,6 +229,14 @@ impl FuelService { Ok(()) } + /// Wait for the compression service to be in sync with L2 height + pub async fn await_compression_synced(&self) -> anyhow::Result<()> { + if let Some(sync_observer) = &self.runner.shared.compression { + sync_observer.await_synced().await?; + } + Ok(()) + } + fn make_database_compatible_with_config( combined_database: &mut CombinedDatabase, config: &Config, diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index 7074610d41b..5ea99db0533 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -66,6 +66,7 @@ use crate::{ pub mod block_importer; pub mod chain_state_info_provider; +pub mod compression_adapters; pub mod consensus_module; pub mod executor; pub mod fuel_gas_price_provider; @@ -75,6 +76,7 @@ pub mod import_result_provider; #[cfg(feature = "p2p")] pub mod p2p; pub mod producer; +pub mod ready_signal; #[cfg(feature = "relayer")] pub mod relayer; #[cfg(feature = "shared-sequencer")] @@ -84,8 +86,6 @@ pub mod sync; pub mod tx_status_manager; pub mod txpool; -pub mod ready_signal; - #[derive(Debug, Clone)] pub struct ChainStateInfoProvider { shared_state: chain_state_info_provider::SharedState, diff --git a/crates/fuel-core/src/service/adapters/compression_adapters.rs b/crates/fuel-core/src/service/adapters/compression_adapters.rs new file mode 100644 index 00000000000..e3f8776910d --- /dev/null +++ b/crates/fuel-core/src/service/adapters/compression_adapters.rs @@ -0,0 +1,52 @@ +use crate::{ + database::{ + database_description::compression::CompressionDatabase, + Database, + }, + service::adapters::BlockImporterAdapter, +}; +use fuel_core_compression_service::{ + config, + ports::{ + block_source, + configuration, + }, +}; +use fuel_core_services::stream::IntoBoxStream; + +impl block_source::BlockSource for BlockImporterAdapter { + fn subscribe( + &self, + ) -> fuel_core_services::stream::BoxStream { + use futures::StreamExt; + self.events_shared_result() + .map(|result| { + let sealed_block = result.sealed_block.clone(); + let events = result.events.clone(); + block_source::BlockWithMetadata::new(sealed_block.entity, events) + }) + .into_boxed() + } +} + +impl configuration::CompressionConfigProvider + for crate::service::config::DaCompressionConfig +{ + fn config(&self) -> config::CompressionConfig { + config::CompressionConfig::new(self.retention_duration) + } +} + +pub struct CompressionServiceAdapter { + db: Database, +} + +impl CompressionServiceAdapter { + pub fn new(db: Database) -> Self { + Self { db } + } + + pub fn storage(&self) -> &Database { + &self.db + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index 0f7d187eab5..771c4a4e123 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -1,4 +1,5 @@ use super::{ + compression_adapters::CompressionServiceAdapter, BlockImporterAdapter, BlockProducerAdapter, ChainStateInfoProvider, @@ -7,7 +8,11 @@ use super::{ TxStatusManagerAdapter, }; use crate::{ - database::OnChainIterableKeyValueView, + database::{ + database_description::compression::CompressionDatabase, + Database, + OnChainIterableKeyValueView, + }, fuel_core_graphql_api::ports::{ worker::{ self, @@ -21,6 +26,7 @@ use crate::{ TxPoolPort, }, graphql_api::ports::{ + DatabaseDaCompressedBlocks, MemoryPool, TxStatusManager, }, @@ -34,8 +40,15 @@ use crate::{ }, }; use async_trait::async_trait; +use fuel_core_compression_service::storage::CompressedBlocks; use fuel_core_services::stream::BoxStream; -use fuel_core_storage::Result as StorageResult; +use fuel_core_storage::{ + blueprint::BlueprintInspect, + kv_store::KeyValueInspect, + not_found, + structured_storage::TableWithBlueprint, + Result as StorageResult, +}; use fuel_core_tx_status_manager::TxStatusMessage; use fuel_core_txpool::TxPoolStats; use fuel_core_types::{ @@ -256,3 +269,20 @@ impl MemoryPool for SharedMemoryPool { self.memory_pool.take_raw().await } } + +impl DatabaseDaCompressedBlocks for CompressionServiceAdapter { + fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult> { + use fuel_core_storage::codec::Encode; + + let encoded_height = + <::Blueprint as BlueprintInspect< + CompressedBlocks, + Database, /* in the future it would be nice to use a dummy impl, but it's not worth the effort rn */ + >>::KeyCodec::encode(height); + let column = ::column(); + self.storage() + .get(&encoded_height, column)? + .ok_or_else(|| not_found!(CompressedBlocks)) + .map(|block| block.to_vec()) + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs index a280451faf2..d1d57bc1664 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs @@ -16,7 +16,6 @@ use crate::{ }, storage::{ contracts::ContractsInfo, - da_compression::DaCompressedBlocks, relayed_transactions::RelayedTransactionStatuses, transactions::OwnedTransactionIndexCursor, }, @@ -46,17 +45,13 @@ use crate::{ }, }; use fuel_core_storage::{ - blueprint::BlueprintInspect, - codec::Encode, iter::{ BoxedIter, IntoBoxedIter, IterDirection, IteratorOverTable, }, - kv_store::KeyValueInspect, not_found, - structured_storage::TableWithBlueprint, transactional::{ IntoTransaction, StorageTransaction, @@ -97,19 +92,6 @@ impl OffChainDatabase for OffChainIterableKeyValueView { .and_then(|height| height.ok_or(not_found!("BlockHeight"))) } - fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult> { - let column = ::column(); - let encoder = - <::Blueprint as BlueprintInspect< - DaCompressedBlocks, - Self, - >>::KeyCodec::encode(height); - - self.get(encoder.as_ref(), column)? - .ok_or_else(|| not_found!(DaCompressedBlocks)) - .map(|value| value.to_vec()) - } - fn tx_status( &self, tx_id: &TxId, diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 36efae52689..bfe051ad498 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -35,10 +35,7 @@ use fuel_core_types::{ use crate::{ combined_database::CombinedDatabaseConfig, - graphql_api::{ - worker_service::DaCompressionConfig, - ServiceConfig as GraphQLConfig, - }, + graphql_api::ServiceConfig as GraphQLConfig, }; use fuel_core_types::fuel_types::AssetId; @@ -72,7 +69,7 @@ pub struct Config { pub tx_status_manager: TxStatusManagerConfig, pub block_producer: fuel_core_producer::Config, pub gas_price_config: GasPriceConfig, - pub da_compression: DaCompressionConfig, + pub da_compression: DaCompressionMode, pub block_importer: fuel_core_importer::Config, #[cfg(feature = "relayer")] pub relayer: Option, @@ -195,7 +192,7 @@ impl Config { block_producer: fuel_core_producer::Config { ..Default::default() }, - da_compression: DaCompressionConfig::Disabled, + da_compression: DaCompressionMode::Disabled, gas_price_config, block_importer, #[cfg(feature = "relayer")] @@ -324,3 +321,14 @@ impl GasPriceConfig { } } } + +#[derive(Debug, Clone)] +pub struct DaCompressionConfig { + pub retention_duration: Duration, +} + +#[derive(Debug, Clone)] +pub enum DaCompressionMode { + Disabled, + Enabled(DaCompressionConfig), +} diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index a5860b144b8..5e94db342ab 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -27,6 +27,8 @@ use fuel_core_storage::{ use fuel_core_types::blockchain::primitives::DaBlockHeight; use fuel_core_types::signer::SignMode; +use fuel_core_compression_service::service::new_service as new_compression_service; + #[cfg(feature = "relayer")] use crate::relayer::Config as RelayerConfig; @@ -43,10 +45,12 @@ use crate::service::adapters::consensus_module::poa::pre_confirmation_signature: use super::{ adapters::{ + compression_adapters::CompressionServiceAdapter, FuelBlockSigner, P2PAdapter, TxStatusManagerAdapter, }, + config::DaCompressionMode, genesis::create_genesis_block, DbType, }; @@ -57,7 +61,9 @@ use crate::{ self, Config as GraphQLConfig, }, - graphql_api::worker_service, + graphql_api::worker_service::{ + self, + }, schema::build_schema, service::{ adapters::{ @@ -377,6 +383,23 @@ pub fn init_sub_services( config.sync, )?; + // we allow the consumers of the database access even + // when the compression service is disabled + let compression_service_adapter = + CompressionServiceAdapter::new(database.compression().clone()); + + let compression_service = match &config.da_compression { + DaCompressionMode::Disabled => None, + DaCompressionMode::Enabled(cfg) => Some( + new_compression_service( + importer_adapter.clone(), + database.compression().clone(), + cfg.clone(), + ) + .map_err(|e| anyhow::anyhow!(e))?, + ), + }; + let schema = crate::schema::dap::init(build_schema(), config.debug) .data(database.on_chain().clone()); @@ -387,7 +410,6 @@ pub fn init_sub_services( block_importer: graphql_block_importer, on_chain_database: database.on_chain().clone(), off_chain_database: database.off_chain().clone(), - da_compression_config: config.da_compression.clone(), continue_on_error: config.continue_on_error, consensus_parameters: &chain_config.consensus_parameters, }; @@ -423,6 +445,7 @@ pub fn init_sub_services( Box::new(chain_state_info_provider), SharedMemoryPool::new(config.memory_pool_size), graphql_block_height_subscription_handle, + Box::new(compression_service_adapter), )?; let shared = SharedState { @@ -438,6 +461,7 @@ pub fn init_sub_services( executor, config: config.clone(), tx_status_manager: tx_status_manager_adapter, + compression: compression_service.as_ref().map(|c| c.shared.clone()), }; #[allow(unused_mut)] @@ -467,6 +491,10 @@ pub fn init_sub_services( services.push(Box::new(graphql_worker)); services.push(Box::new(tx_status_manager)); + if let Some(compression_service) = compression_service { + services.push(Box::new(compression_service)); + } + // always make sure that the block producer is inserted last if let Some(poa) = poa { services.push(Box::new(poa)); diff --git a/crates/services/compression/src/errors.rs b/crates/services/compression/src/errors.rs index 27b43e82e59..327fc5166b3 100644 --- a/crates/services/compression/src/errors.rs +++ b/crates/services/compression/src/errors.rs @@ -32,4 +32,7 @@ pub enum CompressionError { /// Failed to handle new block #[error("failed to handle new block: `{0}`")] FailedToHandleNewBlock(String), + /// Failed to get the sync status of the storages + #[error("failed to get sync status")] + FailedToGetSyncStatus, } diff --git a/crates/services/compression/src/lib.rs b/crates/services/compression/src/lib.rs index 369234b914b..2614c702530 100644 --- a/crates/services/compression/src/lib.rs +++ b/crates/services/compression/src/lib.rs @@ -16,6 +16,8 @@ pub mod ports; pub mod service; /// Storage traits for the compression service pub mod storage; +/// Sync state for the compression service +pub mod sync_state; /// Temporal Registry implementations pub mod temporal_registry; diff --git a/crates/services/compression/src/ports/block_source.rs b/crates/services/compression/src/ports/block_source.rs index 221aa9af068..453ba2e582b 100644 --- a/crates/services/compression/src/ports/block_source.rs +++ b/crates/services/compression/src/ports/block_source.rs @@ -9,6 +9,8 @@ pub struct BlockWithMetadata { events: Vec, } +pub(crate) type BlockHeight = u32; + impl BlockWithMetadata { /// Create a new block with metadata pub fn new( @@ -18,7 +20,7 @@ impl BlockWithMetadata { Self { block, events } } - pub(crate) fn height(&self) -> u32 { + pub(crate) fn height(&self) -> BlockHeight { (*self.block.header().height()).into() } diff --git a/crates/services/compression/src/service.rs b/crates/services/compression/src/service.rs index ec976816699..30702a22fa6 100644 --- a/crates/services/compression/src/service.rs +++ b/crates/services/compression/src/service.rs @@ -8,6 +8,11 @@ use crate::{ }, configuration::CompressionConfigProvider, }, + sync_state::{ + new_sync_state_channel, + SyncStateNotifier, + SyncStateObserver, + }, temporal_registry::{ CompressionContext, CompressionStorageWrapper, @@ -15,7 +20,6 @@ use crate::{ }; use fuel_core_compression::compress::compress; use fuel_core_services::{ - EmptyShared, RunnableService, RunnableTask, ServiceRunner, @@ -35,6 +39,8 @@ pub struct CompressionService { storage: S, /// The compression config. config: CompressionConfig, + /// The sync notifier + sync_notifier: SyncStateNotifier, } use fuel_core_storage::transactional::WriteTransaction; @@ -48,10 +54,13 @@ where storage: S, config: CompressionConfig, ) -> Self { + let (sync_notifier, _) = new_sync_state_channel(); + Self { block_stream, storage, config, + sync_notifier, } } } @@ -96,9 +105,51 @@ where &mut self, block_with_metadata: &crate::ports::block_source::BlockWithMetadata, ) -> crate::Result<()> { + // set the status to not synced + if let Err(err) = self + .sync_notifier + .send(crate::sync_state::SyncState::NotSynced) + { + tracing::error!("Failed to set sync status to not synced: {:?}", err); + } // compress the block self.compress_block(block_with_metadata)?; - // get registry root (?) and push to shared state + // set the status to synced + if let Err(err) = self + .sync_notifier + .send(crate::sync_state::SyncState::Synced( + block_with_metadata.height(), + )) + { + tracing::error!("Failed to set sync status to synced: {:?}", err); + } + Ok(()) + } +} + +/// Shared data for the compression service. +#[derive(Debug, Clone)] +pub struct SharedData { + /// Allows to observe the sync state. + sync_observer: SyncStateObserver, +} + +impl SharedData { + /// Waits until the compression service has synced + /// with current l2 block height + pub async fn await_synced(&self) -> crate::Result<()> { + let mut observer = self.sync_observer.clone(); + loop { + if observer.borrow_and_update().is_synced() { + break; + } + + observer + .changed() + .await + .map_err(|_| crate::errors::CompressionError::FailedToGetSyncStatus)?; + } + Ok(()) } } @@ -110,11 +161,13 @@ where { const NAME: &'static str = "CompressionService"; type Task = Self; - type SharedData = EmptyShared; + type SharedData = SharedData; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { - EmptyShared + SharedData { + sync_observer: self.sync_notifier.subscribe(), + } } async fn into_task( @@ -315,11 +368,13 @@ mod tests { storage, config_provider.config(), ); + let sync_observer = service.shared_data(); // when let _ = service.run(&mut StateWatcher::started()).await; // then + sync_observer.await_synced().await.unwrap(); let maybe_block = service .storage .storage_as_ref::() diff --git a/crates/services/compression/src/sync_state.rs b/crates/services/compression/src/sync_state.rs new file mode 100644 index 00000000000..8f0046b49a5 --- /dev/null +++ b/crates/services/compression/src/sync_state.rs @@ -0,0 +1,27 @@ +//! This module contains the `SyncState` enum, which represents the state of the synchronization process between storages. + +use crate::ports::block_source::BlockHeight; + +/// The synchronization state between storages. +#[derive(Debug, Clone)] +pub enum SyncState { + /// The storages are not synchronized. + NotSynced, + /// The storages are synchronized to the given height. + Synced(BlockHeight), +} + +impl SyncState { + pub(crate) fn is_synced(&self) -> bool { + matches!(self, Self::Synced(_)) + } +} + +/// A receiver for the synchronization state. +pub type SyncStateObserver = tokio::sync::watch::Receiver; +/// A sender for the synchronization state. +pub(crate) type SyncStateNotifier = tokio::sync::watch::Sender; + +pub(crate) fn new_sync_state_channel() -> (SyncStateNotifier, SyncStateObserver) { + tokio::sync::watch::channel(SyncState::NotSynced) +} diff --git a/tests/Cargo.toml b/tests/Cargo.toml index f11175776e8..07685bcf4cc 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -38,6 +38,9 @@ fuel-core-benches = { path = "../benches" } fuel-core-bin = { path = "../bin/fuel-core", features = ["parquet", "p2p"] } fuel-core-client = { path = "../crates/client", features = ["test-helpers"] } fuel-core-compression = { path = "../crates/compression" } +fuel-core-compression-service = { path = "../crates/services/compression", features = [ + "test-helpers", +] } fuel-core-gas-price-service = { path = "../crates/services/gas_price_service", features = [ "test-helpers", ] } diff --git a/tests/tests/da_compression.rs b/tests/tests/da_compression.rs index 039db6b1a3b..b2fc05a4020 100644 --- a/tests/tests/da_compression.rs +++ b/tests/tests/da_compression.rs @@ -1,15 +1,12 @@ use core::time::Duration; use fuel_core::{ chain_config::TESTNET_WALLET_SECRETS, - fuel_core_graphql_api::{ - da_compression::{ - DbTx, - DecompressDbTx, - }, - worker_service::DaCompressionConfig, - }, p2p_test_helpers::*, service::{ + config::{ + DaCompressionConfig, + DaCompressionMode, + }, Config, FuelService, }, @@ -22,6 +19,10 @@ use fuel_core_compression::{ decompress::decompress, VersionedCompressedBlock, }; +use fuel_core_compression_service::temporal_registry::{ + CompressionStorageWrapper, + DecompressionContext, +}; use fuel_core_storage::transactional::{ AtomicView, HistoricalView, @@ -60,10 +61,10 @@ async fn can_fetch_da_compressed_block_from_graphql() { let mut config = config_with_fee(); config.consensus_signer = SignMode::Key(Secret::new(poa_secret.into())); - let compression_config = fuel_core_compression::Config { - temporal_registry_retention: Duration::from_secs(3600), + let compression_config = DaCompressionConfig { + retention_duration: Duration::from_secs(3600), }; - config.da_compression = DaCompressionConfig::Enabled(compression_config); + config.da_compression = DaCompressionMode::Enabled(compression_config.clone()); let chain_id = config .snapshot_reader .chain_config() @@ -90,6 +91,7 @@ async fn can_fetch_da_compressed_block_from_graphql() { panic!("unexpected result {other:?}") } }; + srv.await_compression_synced().await.unwrap(); let block = client .da_compressed_block(block_height) @@ -102,14 +104,22 @@ async fn can_fetch_da_compressed_block_from_graphql() { let db = &srv.shared.database; let on_chain_before_execution = db.on_chain().view_at(&0u32.into()).unwrap(); - let mut tx_inner = db.off_chain().clone().into_transaction(); - let db_tx = DecompressDbTx { - db_tx: DbTx { - db_tx: &mut tx_inner, + let mut tx_inner = db.compression().clone().into_transaction(); + let db_tx = DecompressionContext { + compression_storage: CompressionStorageWrapper { + storage_tx: &mut tx_inner, }, onchain_db: on_chain_before_execution, }; - let decompressed = decompress(compression_config, db_tx, block).await.unwrap(); + let decompressed = decompress( + fuel_core_compression::Config { + temporal_registry_retention: compression_config.retention_duration, + }, + db_tx, + block, + ) + .await + .unwrap(); let block_from_on_chain_db = db .on_chain() @@ -140,8 +150,8 @@ async fn da_compressed_blocks_are_available_from_non_block_producing_nodes() { let pub_key = Input::owner(&secret.public_key()); let mut config = Config::local_node(); - config.da_compression = DaCompressionConfig::Enabled(fuel_core_compression::Config { - temporal_registry_retention: Duration::from_secs(3600), + config.da_compression = DaCompressionMode::Enabled(DaCompressionConfig { + retention_duration: Duration::from_secs(3600), }); let Nodes { @@ -166,13 +176,14 @@ async fn da_compressed_blocks_are_available_from_non_block_producing_nodes() { // Insert some txs let expected = producer.insert_txs().await; validator.consistency_20s(&expected).await; + validator.node.await_compression_synced().await.unwrap(); let block_height = 1u32.into(); - let block = v_client + let compressed_block = v_client .da_compressed_block(block_height) .await .unwrap() .expect("Compressed block not available from validator"); - let _: VersionedCompressedBlock = postcard::from_bytes(&block).unwrap(); + let _: VersionedCompressedBlock = postcard::from_bytes(&compressed_block).unwrap(); }