diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 373fc7fea54..34123189dc2 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -7,4 +7,17 @@ ignore = [ "RUSTSEC-2026-0020", # wasmtime advisory, pending upgrade "RUSTSEC-2026-0021", # wasmtime advisory, pending upgrade "RUSTSEC-2026-0049", # temporary CI ignore; fix in master + "RUSTSEC-2026-0085", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0086", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0087", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0088", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0089", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0091", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0092", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0093", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0094", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0095", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0096", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0098", # rustls-webpki via legacy transitive dependencies + "RUSTSEC-2026-0099", # rustls-webpki via legacy transitive dependencies ] diff --git a/.changes/changed/3138.md b/.changes/changed/3138.md deleted file mode 100644 index 981c7d1d63a..00000000000 --- a/.changes/changed/3138.md +++ /dev/null @@ -1 +0,0 @@ -Migrate CI from BuildJet to WarpBuild runners, update GitHub Actions to latest versions, and use pre-built binaries for cargo-nextest and cargo-audit. \ No newline at end of file diff --git a/.changes/changed/3203.md b/.changes/changed/3203.md deleted file mode 100644 index b1c1afc2af0..00000000000 --- a/.changes/changed/3203.md +++ /dev/null @@ -1 +0,0 @@ -Add lease port for PoA adapter to allow multiple producers to be live but only one leader. \ No newline at end of file diff --git a/.changes/changed/3225.md b/.changes/changed/3225.md deleted file mode 100644 index 9b7e7cbb882..00000000000 --- a/.changes/changed/3225.md +++ /dev/null @@ -1 +0,0 @@ -PoA quorum and HA failover fixes: Redis leader lease adapter improvements, write_block.lua HEIGHT_EXISTS check, sub-quorum block repair, Prometheus metrics, and chaos test harness. diff --git a/.changes/fixed/3124.md b/.changes/fixed/3124.md deleted file mode 100644 index 88a33bab305..00000000000 --- a/.changes/fixed/3124.md +++ /dev/null @@ -1 +0,0 @@ -Using Debian Bookworm as the runtime base image for Docker builds. This is the same base image as the Rust builder images. Keeping the images in-sync will help prevent runtime dependency mismatch issues. diff --git a/.changes/fixed/3271.md b/.changes/fixed/3271.md new file mode 100644 index 00000000000..b4d662ce787 --- /dev/null +++ b/.changes/fixed/3271.md @@ -0,0 +1 @@ +Fix PoA reconciliation deadlock when the same block exists on all Redis nodes but with different epochs. `unreconciled_blocks` now groups votes by `block_id` only (tracking max epoch as tiebreaker), so identical blocks written during re-promotion storms count toward quorum. diff --git a/.changes/fixed/3273.md b/.changes/fixed/3273.md new file mode 100644 index 00000000000..89dd2032571 --- /dev/null +++ b/.changes/fixed/3273.md @@ -0,0 +1 @@ +Improve performance of redis block publish by making more parallel and optimizing the lua code \ No newline at end of file diff --git a/.changes/fixed/3274.md b/.changes/fixed/3274.md new file mode 100644 index 00000000000..4629dfdbbae --- /dev/null +++ b/.changes/fixed/3274.md @@ -0,0 +1 @@ +Fix PoA leader deadlock after reconciliation import where `ensure_synced()` blocked forever because `execute_and_commit` marked reconciliation blocks as `Source::Network`, causing the SyncTask to transition to `NotSynced`. diff --git a/CHANGELOG.md b/CHANGELOG.md index a07a9f91c18..cf2df05c001 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased (see .changes folder)] +## [Version 0.47.4] + +### Changed +- [3138](https://github.com/FuelLabs/fuel-core/pull/3138): Migrate CI from BuildJet to WarpBuild runners, update GitHub Actions to latest versions, and use pre-built binaries for cargo-nextest and cargo-audit. +- [3203](https://github.com/FuelLabs/fuel-core/pull/3203): Add lease port for PoA adapter to allow multiple producers to be live but only one leader. +- [3225](https://github.com/FuelLabs/fuel-core/pull/3225): PoA quorum and HA failover fixes: Redis leader lease adapter improvements, write_block.lua HEIGHT_EXISTS check, sub-quorum block repair, Prometheus metrics, and chaos test harness. + +### Fixed +- [3124](https://github.com/FuelLabs/fuel-core/pull/3124): Using Debian Bookworm as the runtime base image for Docker builds. This is the same base image as the Rust builder images. Keeping the images in-sync will help prevent runtime dependency mismatch issues. +- [3264](https://github.com/FuelLabs/fuel-core/pull/3264): Rollback stale preconfirmations in the mempool when the canonical block at that height omits the preconfirmed transactions, restoring spent inputs and removing dependent transactions. + ## [Version 0.47.1] ### Fixed diff --git a/Cargo.lock b/Cargo.lock index b09f9804d96..e00419b6d15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3529,7 +3529,7 @@ dependencies = [ [[package]] name = "fuel-core" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "assert_matches", @@ -3563,7 +3563,7 @@ dependencies = [ "fuel-core-trace", "fuel-core-tx-status-manager", "fuel-core-txpool", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-core-upgradable-executor", "futures", "hex", @@ -3620,7 +3620,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-sync", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "hex", "itertools 0.12.1", @@ -3643,11 +3643,11 @@ dependencies = [ [[package]] name = "fuel-core-bft" -version = "0.47.3" +version = "0.47.4" [[package]] name = "fuel-core-bin" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "aws-config", @@ -3662,7 +3662,7 @@ dependencies = [ "fuel-core-poa", "fuel-core-shared-sequencer", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "hex", "humantime", "itertools 0.12.1", @@ -3685,7 +3685,7 @@ dependencies = [ [[package]] name = "fuel-core-block-aggregator-api" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3693,7 +3693,7 @@ dependencies = [ "enum-iterator", "fuel-core-services", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "num_enum", "postcard", @@ -3710,7 +3710,7 @@ dependencies = [ [[package]] name = "fuel-core-chain-config" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "bech32", @@ -3718,7 +3718,7 @@ dependencies = [ "educe", "fuel-core-chain-config", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "insta", "itertools 0.12.1", "parquet", @@ -3736,7 +3736,7 @@ dependencies = [ [[package]] name = "fuel-core-chaos-test" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "clap", @@ -3744,7 +3744,7 @@ dependencies = [ "fuel-core-chain-config", "fuel-core-poa", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "humantime", "rand 0.8.5", @@ -3758,14 +3758,14 @@ dependencies = [ [[package]] name = "fuel-core-client" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "base64 0.22.1", "cynic", "derive_more 0.99.20", "eventsource-client", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "hex", "hyper-rustls 0.24.2", @@ -3783,23 +3783,23 @@ dependencies = [ [[package]] name = "fuel-core-client-bin" -version = "0.47.3" +version = "0.47.4" dependencies = [ "clap", "fuel-core-client", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "serde_json", "tokio", ] [[package]] name = "fuel-core-compression" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "enum_dispatch", "fuel-core-compression", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "paste", "postcard", "proptest", @@ -3812,7 +3812,7 @@ dependencies = [ [[package]] name = "fuel-core-compression-service" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3823,7 +3823,7 @@ dependencies = [ "fuel-core-metrics", "fuel-core-services", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "paste", "rand 0.8.5", @@ -3838,30 +3838,30 @@ dependencies = [ [[package]] name = "fuel-core-consensus-module" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "fuel-core-chain-config", "fuel-core-poa", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "test-case", ] [[package]] name = "fuel-core-database" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", ] [[package]] name = "fuel-core-e2e-client" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "assert_cmd", @@ -3869,7 +3869,7 @@ dependencies = [ "fuel-core-chain-config", "fuel-core-client", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "hex", "humantime-serde", @@ -3887,13 +3887,13 @@ dependencies = [ [[package]] name = "fuel-core-executor" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "fuel-core-storage", "fuel-core-syscall", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "parking_lot", "serde", "sha2 0.10.9", @@ -3902,7 +3902,7 @@ dependencies = [ [[package]] name = "fuel-core-gas-price-service" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3910,7 +3910,7 @@ dependencies = [ "fuel-core-metrics", "fuel-core-services", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-gas-price-algorithm", "futures", "mockito", @@ -3930,14 +3930,14 @@ dependencies = [ [[package]] name = "fuel-core-importer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "fuel-core-metrics", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "mockall", "rayon", "test-case", @@ -3947,18 +3947,18 @@ dependencies = [ [[package]] name = "fuel-core-keygen" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "clap", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "libp2p-identity", "serde", ] [[package]] name = "fuel-core-keygen-bin" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "atty", @@ -3971,7 +3971,7 @@ dependencies = [ [[package]] name = "fuel-core-metrics" -version = "0.47.3" +version = "0.47.4" dependencies = [ "once_cell", "parking_lot", @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "fuel-core-p2p" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3995,7 +3995,7 @@ dependencies = [ "fuel-core-services", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "hex", "hickory-resolver", @@ -4019,12 +4019,12 @@ dependencies = [ [[package]] name = "fuel-core-parallel-executor" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "fuel-core-executor", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-core-upgradable-executor", "futures", "rand 0.8.5", @@ -4033,7 +4033,7 @@ dependencies = [ [[package]] name = "fuel-core-poa" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4041,7 +4041,7 @@ dependencies = [ "fuel-core-services", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "mockall", "rand 0.8.5", "serde", @@ -4055,7 +4055,7 @@ dependencies = [ [[package]] name = "fuel-core-producer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4063,7 +4063,7 @@ dependencies = [ "fuel-core-producer", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "mockall", "proptest", "rand 0.8.5", @@ -4074,7 +4074,7 @@ dependencies = [ [[package]] name = "fuel-core-relayer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4088,7 +4088,7 @@ dependencies = [ "fuel-core-services", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "mockall", "once_cell", @@ -4106,7 +4106,7 @@ dependencies = [ [[package]] name = "fuel-core-services" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4122,7 +4122,7 @@ dependencies = [ [[package]] name = "fuel-core-shared-sequencer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4130,7 +4130,7 @@ dependencies = [ "cosmos-sdk-proto", "cosmrs", "fuel-core-services", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-sequencer-proto", "futures", "postcard", @@ -4146,13 +4146,13 @@ dependencies = [ [[package]] name = "fuel-core-storage" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "enum-iterator", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-vm 0.65.0", "impl-tools", "itertools 0.12.1", @@ -4170,13 +4170,13 @@ dependencies = [ [[package]] name = "fuel-core-sync" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", "fuel-core-services", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "mockall", "rand 0.8.5", @@ -4189,9 +4189,9 @@ dependencies = [ [[package]] name = "fuel-core-syscall" -version = "0.47.3" +version = "0.47.4" dependencies = [ - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "parking_lot", "tracing", ] @@ -4221,7 +4221,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-trace", "fuel-core-txpool", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-core-upgradable-executor", "futures", "hex", @@ -4250,7 +4250,7 @@ dependencies = [ [[package]] name = "fuel-core-trace" -version = "0.47.3" +version = "0.47.4" dependencies = [ "ctor", "fork", @@ -4266,13 +4266,13 @@ dependencies = [ [[package]] name = "fuel-core-tx-status-manager" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", "fuel-core-metrics", "fuel-core-services", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "mockall", "parking_lot", @@ -4287,7 +4287,7 @@ dependencies = [ [[package]] name = "fuel-core-txpool" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4297,7 +4297,8 @@ dependencies = [ "fuel-core-storage", "fuel-core-syscall", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-txpool", + "fuel-core-types 0.47.4", "futures", "lru 0.13.0", "mockall", @@ -4328,7 +4329,7 @@ dependencies = [ [[package]] name = "fuel-core-types" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "aws-config", @@ -4352,13 +4353,13 @@ dependencies = [ [[package]] name = "fuel-core-upgradable-executor" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "fuel-core-executor", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-core-wasm-executor", "futures", "parking_lot", @@ -4369,13 +4370,13 @@ dependencies = [ [[package]] name = "fuel-core-wasm-executor" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "fuel-core-executor", "fuel-core-storage", "fuel-core-types 0.35.0", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "postcard", "proptest", @@ -4446,7 +4447,7 @@ dependencies = [ [[package]] name = "fuel-gas-price-algorithm" -version = "0.47.3" +version = "0.47.4" dependencies = [ "proptest", "rand 0.8.5", @@ -10101,7 +10102,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-trace", "fuel-core-txpool", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "itertools 0.12.1", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 8f99ba359ef..5f73e704dd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ keywords = ["blockchain", "cryptocurrencies", "fuel-vm", "vm"] license = "BUSL-1.1" repository = "https://github.com/FuelLabs/fuel-core" rust-version = "1.90.0" -version = "0.47.3" +version = "0.47.4" [workspace.dependencies] @@ -80,36 +80,36 @@ educe = { version = "0.6", default-features = false, features = ["Eq", "PartialE enum-iterator = "1.2" enum_dispatch = "0.3.13" # Workspace members -fuel-core = { version = "0.47.3", path = "./crates/fuel-core", default-features = false } -fuel-core-bin = { version = "0.47.3", path = "./bin/fuel-core" } -fuel-core-chain-config = { version = "0.47.3", path = "./crates/chain-config", default-features = false } -fuel-core-client = { version = "0.47.3", path = "./crates/client" } -fuel-core-compression = { version = "0.47.3", path = "./crates/compression" } -fuel-core-compression-service = { version = "0.47.3", path = "./crates/services/compression" } -fuel-core-consensus-module = { version = "0.47.3", path = "./crates/services/consensus_module" } -fuel-core-database = { version = "0.47.3", path = "./crates/database" } -fuel-core-executor = { version = "0.47.3", path = "./crates/services/executor", default-features = false } -fuel-core-gas-price-service = { version = "0.47.3", path = "crates/services/gas_price_service" } -fuel-core-importer = { version = "0.47.3", path = "./crates/services/importer" } -fuel-core-keygen = { version = "0.47.3", path = "./crates/keygen" } -fuel-core-metrics = { version = "0.47.3", path = "./crates/metrics" } -fuel-core-p2p = { version = "0.47.3", path = "./crates/services/p2p" } -fuel-core-parallel-executor = { version = "0.47.3", path = "./crates/services/parallel-executor" } -fuel-core-poa = { version = "0.47.3", path = "./crates/services/consensus_module/poa" } -fuel-core-producer = { version = "0.47.3", path = "./crates/services/producer" } -fuel-core-relayer = { version = "0.47.3", path = "./crates/services/relayer" } -fuel-core-services = { version = "0.47.3", path = "./crates/services" } -fuel-core-shared-sequencer = { version = "0.47.3", path = "crates/services/shared-sequencer" } -fuel-core-storage = { version = "0.47.3", path = "./crates/storage", default-features = false } -fuel-core-sync = { version = "0.47.3", path = "./crates/services/sync" } -fuel-core-syscall = { version = "0.47.3", path = "./crates/syscall", default-features = false } -fuel-core-trace = { version = "0.47.3", path = "./crates/trace" } -fuel-core-tx-status-manager = { version = "0.47.3", path = "./crates/services/tx_status_manager" } -fuel-core-txpool = { version = "0.47.3", path = "./crates/services/txpool_v2" } -fuel-core-types = { version = "0.47.3", path = "./crates/types", default-features = false } -fuel-core-upgradable-executor = { version = "0.47.3", path = "./crates/services/upgradable-executor" } -fuel-core-wasm-executor = { version = "0.47.3", path = "./crates/services/upgradable-executor/wasm-executor", default-features = false } -fuel-gas-price-algorithm = { version = "0.47.3", path = "crates/fuel-gas-price-algorithm" } +fuel-core = { version = "0.47.4", path = "./crates/fuel-core", default-features = false } +fuel-core-bin = { version = "0.47.4", path = "./bin/fuel-core" } +fuel-core-chain-config = { version = "0.47.4", path = "./crates/chain-config", default-features = false } +fuel-core-client = { version = "0.47.4", path = "./crates/client" } +fuel-core-compression = { version = "0.47.4", path = "./crates/compression" } +fuel-core-compression-service = { version = "0.47.4", path = "./crates/services/compression" } +fuel-core-consensus-module = { version = "0.47.4", path = "./crates/services/consensus_module" } +fuel-core-database = { version = "0.47.4", path = "./crates/database" } +fuel-core-executor = { version = "0.47.4", path = "./crates/services/executor", default-features = false } +fuel-core-gas-price-service = { version = "0.47.4", path = "crates/services/gas_price_service" } +fuel-core-importer = { version = "0.47.4", path = "./crates/services/importer" } +fuel-core-keygen = { version = "0.47.4", path = "./crates/keygen" } +fuel-core-metrics = { version = "0.47.4", path = "./crates/metrics" } +fuel-core-p2p = { version = "0.47.4", path = "./crates/services/p2p" } +fuel-core-parallel-executor = { version = "0.47.4", path = "./crates/services/parallel-executor" } +fuel-core-poa = { version = "0.47.4", path = "./crates/services/consensus_module/poa" } +fuel-core-producer = { version = "0.47.4", path = "./crates/services/producer" } +fuel-core-relayer = { version = "0.47.4", path = "./crates/services/relayer" } +fuel-core-services = { version = "0.47.4", path = "./crates/services" } +fuel-core-shared-sequencer = { version = "0.47.4", path = "crates/services/shared-sequencer" } +fuel-core-storage = { version = "0.47.4", path = "./crates/storage", default-features = false } +fuel-core-sync = { version = "0.47.4", path = "./crates/services/sync" } +fuel-core-syscall = { version = "0.47.4", path = "./crates/syscall", default-features = false } +fuel-core-trace = { version = "0.47.4", path = "./crates/trace" } +fuel-core-tx-status-manager = { version = "0.47.4", path = "./crates/services/tx_status_manager" } +fuel-core-txpool = { version = "0.47.4", path = "./crates/services/txpool_v2" } +fuel-core-types = { version = "0.47.4", path = "./crates/types", default-features = false } +fuel-core-upgradable-executor = { version = "0.47.4", path = "./crates/services/upgradable-executor" } +fuel-core-wasm-executor = { version = "0.47.4", path = "./crates/services/upgradable-executor/wasm-executor", default-features = false } +fuel-gas-price-algorithm = { version = "0.47.4", path = "crates/fuel-gas-price-algorithm" } # Fuel dependencies fuel-vm-private = { version = "0.65.0", package = "fuel-vm", default-features = false } diff --git a/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua b/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua index fe70fc8f6c3..4b4febdb0aa 100644 --- a/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua +++ b/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua @@ -39,16 +39,28 @@ end -- the new leader to fail on the orphan's node, but it can still reach -- quorum on the remaining nodes. If the orphan blocks quorum entirely, -- the leader must reconcile via the read path instead. +local posted_height = tonumber(ARGV[3]) local existing = redis.call("XREVRANGE", KEYS[1], "+", "-") +local stop_scan = false for _, entry in ipairs(existing) do local fields = entry[2] for i = 1, #fields, 2 do - if fields[i] == "height" and fields[i + 1] == ARGV[3] then - return redis.error_reply( - "HEIGHT_EXISTS: Block at height " .. ARGV[3] .. " already in stream" - ) + if fields[i] == "height" then + local entry_height = tonumber(fields[i + 1]) + if entry_height == posted_height then + return redis.error_reply( + "HEIGHT_EXISTS: Block at height " .. ARGV[3] .. " already in stream" + ) + end + if entry_height ~= nil and entry_height < posted_height then + stop_scan = true + break + end end end + if stop_scan then + break + end end -- 5) Persist block entry. diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index 71438b52558..9589599639d 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -797,20 +797,29 @@ impl RedisLeaderLeaseAdapter { break; } + // Group votes by block_id only (not epoch). The same block can + // be written to different nodes with different epochs during + // re-promotion storms — but if the block_id matches, it's the + // same block and all copies count toward quorum. We track the + // max epoch per block_id as the tiebreaker for fork resolution + // when block_ids genuinely differ. let votes = blocks_by_node .iter() .filter_map(|blocks_by_height| blocks_by_height.get(¤t_height)) .flat_map(|blocks_by_epoch| blocks_by_epoch.iter()) .fold( - HashMap::<(u64, BlockId), (usize, SealedBlock)>::new(), + HashMap::::new(), |mut votes, (epoch, block)| { - let vote_key = (*epoch, block.entity.id()); + let vote_key = block.entity.id(); match votes.get_mut(&vote_key) { - Some((count, _)) => { + Some((max_epoch, count, _)) => { *count = count.saturating_add(1); + if *epoch > *max_epoch { + *max_epoch = *epoch; + } } None => { - votes.insert(vote_key, (1, block.clone())); + votes.insert(vote_key, (*epoch, 1, block.clone())); } } votes @@ -819,8 +828,8 @@ impl RedisLeaderLeaseAdapter { let winner = votes .into_iter() - .max_by_key(|((epoch, _), _)| *epoch) - .map(|(_, (count, block))| (count, block)); + .max_by_key(|(_, (max_epoch, _, _))| *max_epoch) + .map(|(_, (_, count, block))| (count, block)); if let Some((count, block)) = winner { if self.quorum_reached(count) { @@ -982,6 +991,33 @@ impl RedisLeaderLeaseAdapter { } } + fn publish_block_on_all_nodes( + &self, + epoch: u64, + block: &SealedBlock, + block_data: &[u8], + ) -> Vec> { + std::thread::scope(|scope| { + let handles = self + .redis_nodes + .iter() + .map(|redis_node| { + scope.spawn(move || { + self.publish_block_on_node(redis_node, epoch, block, block_data) + }) + }) + .collect::>(); + + handles + .into_iter() + .map(|handle| match handle.join() { + Ok(result) => result, + Err(_) => Err(anyhow!("Redis publish worker panicked")), + }) + .collect() + }) + } + /// Repropose a sub-quorum block to all Redis nodes to reach quorum. /// Called during reconciliation when a block exists on some nodes but /// below quorum — possibly from a leader that published and committed @@ -1020,8 +1056,8 @@ impl RedisLeaderLeaseAdapter { // block at this height, but it might be a different block from // a competing leader's partial write. let mut total_with_block = pre_existing_count; - for redis_node in &self.redis_nodes { - match self.publish_block_on_node(redis_node, epoch, block, &block_data) { + for result in self.publish_block_on_all_nodes(epoch, block, &block_data) { + match result { Ok(WriteBlockResult::Written) => { total_with_block = total_with_block.saturating_add(1); } @@ -1211,16 +1247,14 @@ impl BlockReconciliationWritePort for RedisLeaderLeaseAdapter { }; let block_data = postcard::to_allocvec(block)?; let successes = self - .redis_nodes - .iter() - .map(|redis_node| { - match self.publish_block_on_node(redis_node, epoch, block, &block_data) { - Ok(WriteBlockResult::Written) => true, - Ok(_) => false, - Err(err) => { - tracing::debug!("Redis publish on node failed: {err}"); - false - } + .publish_block_on_all_nodes(epoch, block, &block_data) + .into_iter() + .map(|result| match result { + Ok(WriteBlockResult::Written) => true, + Ok(_) => false, + Err(err) => { + tracing::debug!("Redis publish on node failed: {err}"); + false } }) .filter(|success| *success) @@ -1565,6 +1599,81 @@ mod tests { ); } + /// Reproduces the devnet deadlock from April 17, 2026. + /// + /// The same block was written to all 3 nodes during re-promotion storms, + /// so each node has the same block_id but with different epoch metadata. + /// The old `(epoch, block_id)` vote grouping fragmented these into + /// separate vote groups, with the max-epoch group having a count below + /// quorum. Repair then failed because every node returned HEIGHT_EXISTS. + /// + /// With the fix (grouping by block_id only), all copies of the same + /// block count toward quorum regardless of epoch metadata — so this + /// state resolves without repair. + #[tokio::test(flavor = "multi_thread")] + async fn leader_state__when_same_block_has_different_epochs_across_nodes_then_reconciles_without_repair() + { + // given: same block on all 3 nodes, but with different epochs + // (as happens when re-promotion writes race during production) + let redis_a = RedisTestServer::spawn(); + let redis_b = RedisTestServer::spawn(); + let redis_c = RedisTestServer::spawn(); + let lease_key = "poa:test:same-block-different-epochs".to_string(); + let stream_key = format!("{lease_key}:block:stream"); + let adapter = new_test_adapter( + vec![ + redis_a.redis_url(), + redis_b.redis_url(), + redis_c.redis_url(), + ], + lease_key, + ); + assert!( + adapter + .acquire_lease_if_free() + .await + .expect("acquire should succeed"), + "adapter should acquire lease" + ); + + // Same block (same data, same block_id) on all 3 nodes, but each + // with a different epoch. This simulates what happens when the + // original leader was re-promoted repeatedly during a race, + // writing the same block content each time with a bumped epoch. + let block = poa_block_at_time(1, 10); + let block_data = postcard::to_allocvec(&block).expect("should serialize"); + + let redis_a_client = + redis::Client::open(redis_a.redis_url()).expect("redis a client"); + let redis_b_client = + redis::Client::open(redis_b.redis_url()).expect("redis b client"); + let redis_c_client = + redis::Client::open(redis_c.redis_url()).expect("redis c client"); + let mut conn_a = redis_a_client.get_connection().expect("redis a conn"); + let mut conn_b = redis_b_client.get_connection().expect("redis b conn"); + let mut conn_c = redis_c_client.get_connection().expect("redis c conn"); + + // Same block_id, three different epochs + append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 5); + append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 7); + append_stream_block(&mut conn_c, &stream_key, 1, &block_data, 9); + + // when: leader reconciles + let leader_state = adapter + .leader_state(1.into()) + .await + .expect("leader_state should succeed"); + + // then: the block is reconciled directly (no repair needed). Without + // the fix, the old logic would have split the 3 copies into 3 vote + // groups and tried to repair the max-epoch group (count=1), which + // would deadlock because every node returns HEIGHT_EXISTS. + assert!( + matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1), + "Expected block to be reconciled from quorum across mixed epochs, got {leader_state:?}" + ); + } + #[tokio::test(flavor = "multi_thread")] async fn leader_state__when_same_height_entry_exists_on_less_than_quorum_nodes_then_repairs_it() { diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 210408c1b12..ac929ffcd06 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -143,6 +143,9 @@ pub struct MainTask { /// externally controlled start of block production block_production_ready_signal: BlockProductionReadySignal, reconciliation_port: RP, + /// Shared with SyncTask — blocks at heights <= this watermark were + /// imported via reconciliation and should not trigger NotSynced. + reconciliation_watermark: Arc, } impl MainTask @@ -183,12 +186,15 @@ where .. } = config; + let reconciliation_watermark = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let sync_task = SyncTask::new( peer_connections_stream, min_connected_reserved_peers, time_until_synced, block_stream, last_block, + Arc::clone(&reconciliation_watermark), ); let sync_task_handle = ServiceRunner::new(sync_task); @@ -213,6 +219,7 @@ where production_timeout, block_production_ready_signal, reconciliation_port, + reconciliation_watermark, } } @@ -628,6 +635,17 @@ where continue; } + // Set watermark to this block's height so SyncTask + // doesn't transition to NotSynced when it sees the + // broadcast. execute_and_commit marks blocks as + // Source::Network, which would otherwise cause a + // Synced → NotSynced transition and deadlock + // ensure_synced(). + self.reconciliation_watermark.fetch_max( + u32::from(block_height), + std::sync::atomic::Ordering::Release, + ); + match self.block_importer.execute_and_commit(block).await { Ok(()) => { self.last_height = block_height; diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 23db1e2c49e..8b62bb32f03 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -47,9 +47,12 @@ use fuel_core_types::{ fuel_tx::*, fuel_types::BlockHeight, secrecy::Secret, - services::executor::{ - ExecutionResult, - UncommittedResult, + services::{ + block_importer::BlockImportInfo, + executor::{ + ExecutionResult, + UncommittedResult, + }, }, signer::SignMode, tai64::{ @@ -940,3 +943,144 @@ async fn consensus_service__run__will_produce_blocks_with_ready_signal() { let produced_block = block_receiver.recv().await.unwrap(); assert!(matches!(produced_block, FakeProducedBlock::New(_, _))); } + +/// Reproduces the deadlock from the April 9, 2026 testnet outage. +/// +/// After a FENCING_ERROR, reconciliation imports a block via +/// `execute_and_commit` which marks it as `Source::Network`. The SyncTask +/// sees this non-local block and transitions from Synced → NotSynced. +/// On the next `run()` iteration, `ensure_synced()` blocks forever +/// because the leader can't produce locally-sourced blocks while blocked. +/// +/// This test uses a `FakeReconciliationPort` that returns +/// `UnreconciledBlocks` on the first call (simulating reconciliation after +/// fencing error), then switches to `ReconciledLeader`. The +/// `MockBlockImporter::execute_and_commit` broadcasts a `Source::Network` +/// block into the block_stream, triggering the SyncTask's NotSynced +/// transition. Without the watermark fix, the service deadlocks and +/// never produces a block. +#[tokio::test] +async fn main_task__reconciliation_import_does_not_deadlock_leader() { + // given: a PoA service with Trigger::Interval + let config = Config { + trigger: Trigger::Interval { + block_time: Duration::from_millis(10), + }, + signer: SignMode::Key(test_signing_key()), + metrics: false, + min_connected_reserved_peers: 0, + time_until_synced: Duration::ZERO, + ..Default::default() + }; + + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + + // Use an mpsc channel to feed both execute_and_commit results and + // the SyncTask's block_stream. This simulates what the real importer + // does when `execute_and_commit` commits a block and broadcasts it. + let (block_import_sender, block_import_receiver) = + tokio::sync::mpsc::channel::(16); + + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + + // When execute_and_commit is called for the reconciliation block, + // send it as Source::Network — this is what the real importer + // does (ImportResult::new_from_network at importer.rs:585). + let sender_for_import = block_import_sender.clone(); + block_importer + .expect_execute_and_commit() + .returning(move |block| { + let header = block.entity.header().clone(); + let _ = sender_for_import.try_send(BlockImportInfo::new_from_network(header)); + Ok(()) + }); + + // The block_stream feeds the SyncTask — wrap the mpsc receiver. + // Use Option+Mutex to allow moving out of the FnMut closure. + let receiver_cell = Arc::new(StdMutex::new(Some(block_import_receiver))); + block_importer.expect_block_stream().returning(move || { + let rx = receiver_cell + .lock() + .unwrap() + .take() + .expect("block_stream called more than once"); + Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)) + }); + + block_importer + .expect_latest_block_height() + .returning(|| Ok(Some(BlockHeight::from(0u32)))); + + let txpool = MockTransactionPool::no_tx_updates(); + let p2p_port = generate_p2p_port(); + let predefined_blocks = InMemoryPredefinedBlocks::new(HashMap::new()); + let time = TestTime::at_unix_epoch(); + let watch = time.watch(); + + // Create a reconciliation port that returns UnreconciledBlocks once, + // then switches to ReconciledLeader for subsequent calls. + let block = block_for_height(2); + let consensus = FakeBlockSigner { succeeds: true } + .seal_block(&block) + .await + .unwrap(); + let unreconciled = LeaderState::UnreconciledBlocks(vec![SealedBlock { + entity: block, + consensus, + }]); + + let reconciliation_port = FakeReconciliationPort::with_state(Ok(unreconciled)); + let reconciliation_state = reconciliation_port.state.clone(); + + let task = MainTask::new( + &BlockHeader::new_block(BlockHeight::from(1u32), watch.now()), + config, + txpool, + block_producer, + block_importer, + p2p_port, + FakeBlockSigner { succeeds: true }.into(), + predefined_blocks, + watch, + FakeBlockProductionReadySignal, + reconciliation_port, + ); + + // when: start the service + let service = ServiceRunner::new(task); + service.start_and_await().await.unwrap(); + + // Give time for the reconciliation block to be imported. + // After import, switch to ReconciledLeader so the service can + // attempt normal block production. + tokio::task::yield_now().await; + time::advance(Duration::from_millis(20)).await; + tokio::task::yield_now().await; + + // Switch reconciliation port to ReconciledLeader + { + let mut state = reconciliation_state.lock().unwrap(); + *state = Ok(LeaderState::ReconciledLeader); + } + + // then: try to receive a produced block within a timeout. + // Without the fix, ensure_synced() deadlocks and no block is produced. + let receive_timeout = tokio::spawn(async move { + time::timeout(Duration::from_millis(500), block_receiver.recv()).await + }); + time::advance(Duration::from_millis(501)).await; + tokio::task::yield_now().await; + let receive_result = receive_timeout.await.unwrap(); + + let _ = service.stop_and_await().await; + + // This assertion fails without the watermark fix — the service + // deadlocks in ensure_synced() and never produces a block. + assert!( + receive_result.is_ok(), + "Expected block production after reconciliation, but the service \ + deadlocked in ensure_synced() — this is the bug from the \ + April 9, 2026 testnet outage" + ); +} diff --git a/crates/services/consensus_module/poa/src/sync.rs b/crates/services/consensus_module/poa/src/sync.rs index bd66f794628..5c3364038d5 100644 --- a/crates/services/consensus_module/poa/src/sync.rs +++ b/crates/services/consensus_module/poa/src/sync.rs @@ -1,5 +1,11 @@ use std::{ - sync::Arc, + sync::{ + Arc, + atomic::{ + AtomicU32, + Ordering, + }, + }, time::Duration, }; @@ -52,6 +58,10 @@ pub struct SyncTask { state_receiver: watch::Receiver, inner_state: InnerSyncState, timer: Option, + /// Blocks at heights <= this watermark were imported via reconciliation + /// by the leader and should not trigger Synced → NotSynced transitions. + /// Set by MainTask via `fetch_max`, monotonically increasing, never cleared. + reconciliation_watermark: Arc, } impl SyncTask { @@ -61,6 +71,7 @@ impl SyncTask { time_until_synced: Duration, block_stream: BoxStream, block_header: &BlockHeader, + reconciliation_watermark: Arc, ) -> Self { let inner_state = InnerSyncState::from_config( min_connected_reserved_peers, @@ -92,6 +103,7 @@ impl SyncTask { state_receiver, inner_state, timer, + reconciliation_watermark, } } @@ -184,7 +196,11 @@ impl RunnableTask for SyncTask { self.restart_timer(); } InnerSyncState::Synced { block_header, has_sufficient_peers } if new_block_height > block_header.height() => { - if block_info.is_locally_produced() { + let watermark = self.reconciliation_watermark.load(Ordering::Acquire); + let is_reconciliation = watermark > 0 + && u32::from(*new_block_height) <= watermark; + + if block_info.is_locally_produced() || is_reconciliation { self.inner_state = InnerSyncState::Synced { block_header: block_info.block_header.clone(), has_sufficient_peers: *has_sufficient_peers @@ -278,6 +294,7 @@ impl InnerSyncState { } #[allow(clippy::arithmetic_side_effects)] +#[allow(non_snake_case)] #[cfg(test)] mod tests { use super::*; @@ -359,6 +376,7 @@ mod tests { time_until_synced, block_stream, &Default::default(), + Arc::new(AtomicU32::new(0)), ); (sync_task, watcher, tx) @@ -598,4 +616,124 @@ mod tests { )); matches!(*sync_task.state_receiver.borrow(), SyncState::Synced(_)); } + + /// Reproduces the deadlock root cause: a network-sourced block (from + /// reconciliation via `execute_and_commit`) arrives while the SyncTask + /// is in Synced state. Without the watermark fix, this transitions + /// the SyncTask to NotSynced, which deadlocks the leader's + /// `ensure_synced()` call. + #[tokio::test] + async fn sync_task__network_block_at_reconciliation_height_causes_not_synced_without_watermark() + { + // given: a SyncTask that starts in Synced state (min_peers=0, time=ZERO) + let connections_stream = MockStream::::new(vec![]).into_boxed(); + let block_stream = MockStream::::new(vec![]).into_boxed(); + + let (tx, shutdown) = + tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher: StateWatcher = shutdown.into(); + + // Watermark is 0 (not set) — simulates the pre-fix state + let watermark = Arc::new(AtomicU32::new(0)); + + let mut sync_task = SyncTask::new( + connections_stream, + 0, // min_connected_reserved_peers + Duration::ZERO, + block_stream, + &BlockHeader::new_block(5u32.into(), Tai64::now()), + watermark, + ); + + // Verify we start in Synced state + assert!( + matches!(*sync_task.state_receiver.borrow(), SyncState::Synced(_)), + "SyncTask should start Synced with min_peers=0 and time_until_synced=ZERO" + ); + + // when: a Source::Network block arrives at height 6 (> current height 5) + // This is what happens when reconciliation imports a block via + // execute_and_commit, which always uses ImportResult::new_from_network + let network_block_stream = + MockStream::new(vec![BlockHeader::new_block(6u32.into(), Tai64::now())]) + .map(BlockImportInfo::new_from_network) + .into_boxed(); + sync_task.block_stream = network_block_stream; + + let _ = sync_task.run(&mut watcher).await; + + // then: SyncTask transitions to NotSynced — THIS IS THE BUG + // The leader's ensure_synced() will now block forever because + // it can't produce locally-produced blocks while blocked. + assert_eq!( + SyncState::NotSynced, + *sync_task.state_receiver.borrow(), + "Without watermark fix, a network-sourced reconciliation block \ + causes NotSynced — this deadlocks the leader" + ); + + drop(tx); + } + + /// Verifies the watermark fix: when the reconciliation watermark covers + /// the block height, a network-sourced block should NOT trigger NotSynced. + #[tokio::test] + async fn sync_task__network_block_within_watermark_stays_synced() { + // given: a SyncTask in Synced state with watermark set to height 6 + let connections_stream = MockStream::::new(vec![]).into_boxed(); + let block_stream = MockStream::::new(vec![]).into_boxed(); + + let (tx, shutdown) = + tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher: StateWatcher = shutdown.into(); + + let watermark = Arc::new(AtomicU32::new(6)); + + let mut sync_task = SyncTask::new( + connections_stream, + 0, + Duration::ZERO, + block_stream, + &BlockHeader::new_block(5u32.into(), Tai64::now()), + watermark, + ); + + assert!(matches!( + *sync_task.state_receiver.borrow(), + SyncState::Synced(_) + )); + + // when: a Source::Network block at height 6 (within watermark) + let network_block_stream = + MockStream::new(vec![BlockHeader::new_block(6u32.into(), Tai64::now())]) + .map(BlockImportInfo::new_from_network) + .into_boxed(); + sync_task.block_stream = network_block_stream; + + let _ = sync_task.run(&mut watcher).await; + + // then: should stay Synced because watermark covers height 6 + assert!( + matches!(*sync_task.state_receiver.borrow(), SyncState::Synced(_)), + "With watermark=6, a network block at height 6 should NOT trigger NotSynced" + ); + + // when: a Source::Network block at height 7 (ABOVE watermark) + let network_block_stream = + MockStream::new(vec![BlockHeader::new_block(7u32.into(), Tai64::now())]) + .map(BlockImportInfo::new_from_network) + .into_boxed(); + sync_task.block_stream = network_block_stream; + + let _ = sync_task.run(&mut watcher).await; + + // then: should transition to NotSynced (watermark doesn't protect above its value) + assert_eq!( + SyncState::NotSynced, + *sync_task.state_receiver.borrow(), + "A network block above the watermark should still trigger NotSynced" + ); + + drop(tx); + } } diff --git a/crates/services/txpool_v2/Cargo.toml b/crates/services/txpool_v2/Cargo.toml index 24a5f567bb1..2d59c12081c 100644 --- a/crates/services/txpool_v2/Cargo.toml +++ b/crates/services/txpool_v2/Cargo.toml @@ -34,6 +34,7 @@ tracing = { workspace = true } [dev-dependencies] fuel-core-storage = { workspace = true, features = ["std", "test-helpers"] } fuel-core-trace = { path = "../../trace" } +fuel-core-txpool = { path = ".", features = ["test-helpers"] } mockall = { workspace = true } rand = { workspace = true } tokio = { workspace = true, features = ["sync", "test-util"] } diff --git a/crates/services/txpool_v2/src/collision_manager/basic.rs b/crates/services/txpool_v2/src/collision_manager/basic.rs index 686271980db..3c31d881e32 100644 --- a/crates/services/txpool_v2/src/collision_manager/basic.rs +++ b/crates/services/txpool_v2/src/collision_manager/basic.rs @@ -21,6 +21,7 @@ use fuel_core_types::{ CoinPredicate, CoinSigned, }, + contract::Contract as ContractInput, message::{ MessageCoinPredicate, MessageCoinSigned, @@ -57,6 +58,9 @@ pub struct BasicCollisionManager { coins_spenders: BTreeMap, /// Contract -> Transaction that currently create the contract contracts_creators: HashMap, + /// Contract -> Transactions (by TxId) that currently use the contract as an input. + /// Symmetric to `contracts_creators`; used to evict dependents during rollback. + contract_users: HashMap>, /// Blob -> Transaction that currently create the blob blobs_users: HashMap, } @@ -67,6 +71,7 @@ impl BasicCollisionManager { messages_spenders: HashMap::new(), coins_spenders: BTreeMap::new(), contracts_creators: HashMap::new(), + contract_users: HashMap::new(), blobs_users: HashMap::new(), } } @@ -76,6 +81,7 @@ impl BasicCollisionManager { self.messages_spenders.is_empty() && self.coins_spenders.is_empty() && self.contracts_creators.is_empty() + && self.contract_users.is_empty() && self.blobs_users.is_empty() } @@ -88,6 +94,7 @@ impl BasicCollisionManager { let mut message_spenders = HashMap::new(); let mut coins_spenders = BTreeMap::new(); let mut contracts_creators = HashMap::new(); + let mut contract_users: HashMap> = HashMap::new(); let mut blobs_users = HashMap::new(); for tx in expected_txs { if let PoolTransaction::Blob(checked_tx, _) = tx.deref() { @@ -110,7 +117,12 @@ impl BasicCollisionManager { }) => { message_spenders.insert(*nonce, tx.id()); } - Input::Contract { .. } => {} + Input::Contract(ContractInput { contract_id, .. }) => { + contract_users + .entry(*contract_id) + .or_default() + .push(tx.id()); + } } } for output in tx.outputs() { @@ -152,6 +164,26 @@ impl BasicCollisionManager { "Some contract creators are missing from the collision manager: {:?}", contracts_creators ); + for (contract_id, users) in &self.contract_users { + let expected = contract_users.remove(contract_id).unwrap_or_else(|| panic!( + "A contract ({}) user list is present on the collision manager that shouldn't be there.", + contract_id + )); + let mut actual_sorted = users.clone(); + actual_sorted.sort(); + let mut expected_sorted = expected; + expected_sorted.sort(); + assert_eq!( + actual_sorted, expected_sorted, + "contract_users mismatch for contract {}", + contract_id + ); + } + assert!( + contract_users.is_empty(), + "Some contract users are missing from the collision manager: {:?}", + contract_users + ); } } @@ -174,6 +206,17 @@ where .collect() } + fn get_contract_users(&self, contract_id: &ContractId) -> Vec { + self.contract_users + .get(contract_id) + .cloned() + .unwrap_or_default() + } + + fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool { + self.contracts_creators.contains_key(contract_id) + } + fn find_collisions( &self, transaction: &PoolTransaction, @@ -248,6 +291,7 @@ where let blob_id = checked_tx.transaction().blob_id(); self.blobs_users.insert(*blob_id, storage_id); } + let tx_id = store_entry.transaction.id(); for input in store_entry.transaction.inputs() { match input { Input::CoinSigned(CoinSigned { utxo_id, .. }) @@ -262,7 +306,12 @@ where // insert message self.messages_spenders.insert(*nonce, storage_id); } - _ => {} + Input::Contract(ContractInput { contract_id, .. }) => { + self.contract_users + .entry(*contract_id) + .or_default() + .push(tx_id); + } } } for output in store_entry.transaction.outputs().iter() { @@ -284,6 +333,7 @@ where let blob_id = checked_tx.transaction().blob_id(); self.blobs_users.remove(blob_id); } + let tx_id = transaction.id(); for input in transaction.inputs() { match input { Input::CoinSigned(CoinSigned { utxo_id, .. }) @@ -298,7 +348,14 @@ where // remove message self.messages_spenders.remove(nonce); } - _ => {} + Input::Contract(ContractInput { contract_id, .. }) => { + if let Some(users) = self.contract_users.get_mut(contract_id) { + users.retain(|id| id != &tx_id); + if users.is_empty() { + self.contract_users.remove(contract_id); + } + } + } } } for output in transaction.outputs().iter() { diff --git a/crates/services/txpool_v2/src/collision_manager/mod.rs b/crates/services/txpool_v2/src/collision_manager/mod.rs index c6e75a45167..d053b70cad4 100644 --- a/crates/services/txpool_v2/src/collision_manager/mod.rs +++ b/crates/services/txpool_v2/src/collision_manager/mod.rs @@ -8,6 +8,8 @@ use fuel_core_types::{ }; use std::collections::HashMap; +use fuel_core_types::fuel_tx::ContractId; + use crate::storage::StorageData; pub mod basic; @@ -27,6 +29,16 @@ pub trait CollisionManager { /// Get spenders of coins UTXO created by a transaction ID. fn get_coins_spenders(&self, tx_creator_id: &TxId) -> Vec; + /// Get the IDs of pool transactions that have `Input::Contract(contract_id)`. + /// Used during preconfirmation rollback to evict pool txs that were admitted + /// only because a preconfirmed tx temporarily created the contract. + fn get_contract_users(&self, contract_id: &ContractId) -> Vec; + + /// Returns true if a currently in-pool transaction creates `contract_id`. + /// Used during rollback to skip eviction when the contract is still available + /// via a pool tx (independent of the rolled-back preconfirmation). + fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool; + /// Inform the collision manager that a transaction was stored. fn on_stored_transaction( &mut self, diff --git a/crates/services/txpool_v2/src/extracted_outputs.rs b/crates/services/txpool_v2/src/extracted_outputs.rs index ce0c7f3f28b..e1cd66974ec 100644 --- a/crates/services/txpool_v2/src/extracted_outputs.rs +++ b/crates/services/txpool_v2/src/extracted_outputs.rs @@ -49,7 +49,13 @@ impl ExtractedOutputs { for (utxo_id, output) in outputs { match output { Output::ContractCreated { contract_id, .. } => { - self.contract_created.insert(*contract_id, *utxo_id.tx_id()); + let tx_id = *utxo_id.tx_id(); + self.contract_created.insert(*contract_id, tx_id); + // Track the reverse mapping so cleanup via new_executed_transaction works. + self.contract_created_by_tx + .entry(tx_id) + .or_default() + .push(*contract_id); } Output::Coin { to, @@ -131,6 +137,16 @@ impl ExtractedOutputs { self.new_executed_transaction(tx_id); } + /// Returns the contract IDs created by `tx_id`, if any. + /// Call this **before** [`new_skipped_transaction`] / [`new_executed_transaction`] + /// if the caller needs the list for cleanup. + pub fn contracts_created_by(&self, tx_id: &TxId) -> &[ContractId] { + self.contract_created_by_tx + .get(tx_id) + .map(Vec::as_slice) + .unwrap_or(&[]) + } + pub fn new_executed_transaction(&mut self, tx_id: &TxId) { let contract_ids = self.contract_created_by_tx.remove(tx_id); if let Some(contract_ids) = contract_ids { diff --git a/crates/services/txpool_v2/src/lib.rs b/crates/services/txpool_v2/src/lib.rs index c27845af7b7..234dbbdf946 100644 --- a/crates/services/txpool_v2/src/lib.rs +++ b/crates/services/txpool_v2/src/lib.rs @@ -61,6 +61,9 @@ mod spent_inputs; mod tests; #[cfg(test)] fuel_core_trace::enable_tracing!(); +// Needed to activate the `test-helpers` feature flag for integration tests. +#[cfg(test)] +use fuel_core_txpool as _; use fuel_core_types::fuel_asm::Word; pub use pool::TxPoolStats; diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index 60568b3e2ce..66f67b3815d 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -426,6 +426,55 @@ where self.tx_id_to_storage_id.keys() } + /// Process a preconfirmed transaction as committed while recording its spent + /// inputs so they can be rolled back later if the transaction is not + /// included in the canonical block. + pub fn process_preconfirmed_committed_transaction(&mut self, tx_id: TxId) { + // If the tx was already extracted for local block production it is no + // longer in `tx_id_to_storage_id`, so the branch below that calls + // `record_tentative_spend` will be skipped. Preserve the input keys + // now, before `spend_inputs_by_tx_id` drains `spender_of_inputs[T]`. + if !self.tx_id_to_storage_id.contains_key(&tx_id) { + self.spent_inputs.move_spender_to_tentative(tx_id); + } + self.spent_inputs.spend_inputs_by_tx_id(tx_id); + if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { + let dependents: Vec = + self.storage.get_direct_dependents(storage_id).collect(); + let Some(transaction) = self.storage.remove_transaction(storage_id) else { + // Invariant violation. Panic in tests, log in production. + debug_assert!(false, "Storage data not found for the transaction"); + tracing::warn!("Storage data not found for the transaction."); + return; + }; + self.extracted_outputs + .new_extracted_transaction(&transaction.transaction); + // Save the inputs before spending them permanently, so we can roll + // them back if the canonical block omits this transaction. + self.spent_inputs + .record_tentative_spend(tx_id, transaction.transaction.inputs()); + self.spent_inputs + .spend_inputs(tx_id, transaction.transaction.inputs()); + self.update_components_and_caches_on_removal(iter::once(&transaction)); + + let mut new_executable_transaction = false; + for dependent in dependents { + if !self.storage.has_dependencies(&dependent) + && let Some(storage_data) = self.storage.get(&dependent) + { + self.selection_algorithm + .new_executable_transaction(dependent, storage_data); + new_executable_transaction = true; + } + } + if new_executable_transaction { + self.new_executable_txs_notifier.send_replace(()); + } + } + + self.update_stats(); + } + /// Process committed transactions: /// - Remove transaction but keep its dependents and the dependents become executables. /// - Notify about possible new executable transactions. @@ -438,6 +487,7 @@ where self.storage.get_direct_dependents(storage_id).collect(); let Some(transaction) = self.storage.remove_transaction(storage_id) else { + // Invariant violation. Panic in tests, log in production. debug_assert!(false, "Storage data not found for the transaction"); tracing::warn!( "Storage data not found for the transaction during `remove_transaction`." @@ -461,6 +511,7 @@ where let mut new_executable_transaction = false; for promote in transactions_to_promote { let Some(storage_data) = self.storage.get(&promote) else { + // Invariant violation. Panic in tests, log in production. debug_assert!( false, "Dependent storage data not found for the transaction" @@ -568,13 +619,13 @@ where debug_assert!(!self.storage.has_dependencies(storage_id)); let Some(storage_data) = self.storage.get(storage_id) else { + // Invariant violation. Panic in tests, log in production. debug_assert!( false, "Storage data not found for one of the less worth transactions" ); tracing::warn!( - "Storage data not found for one of the less \ - worth transactions during `find_free_space`." + "Storage data not found for one of the less worth transactions" ); continue }; @@ -696,6 +747,89 @@ where self.update_stats(); } + /// Rollback a preconfirmed transaction that was not included in the canonical block. + /// + /// This clears the preconfirmation-derived outputs and removes any pool transactions + /// that depend on those outputs, since those inputs no longer exist on-chain. + pub fn rollback_preconfirmed_transaction(&mut self, tx_id: TxId) { + // Capture contracts created by this preconfirmation BEFORE clearing + // extracted_outputs, so we can evict any pool txs that were admitted + // only because of the now-stale temporary contract existence. + let created_contracts: Vec<_> = + self.extracted_outputs.contracts_created_by(&tx_id).to_vec(); + + // Remove preconfirmed outputs so dependents can't use them. + self.extracted_outputs.new_skipped_transaction(&tx_id); + // Allow the transaction itself to be re-submitted. + self.spent_inputs.unspend_preconfirmed(tx_id); + + let reason = format!( + "Preconfirmed parent transaction {tx_id} was not included in the canonical block" + ); + + // Remove any pool transactions that used the preconfirmed coin outputs as inputs. + let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id); + if !coin_dependents.is_empty() { + for dependent in coin_dependents { + let removed = self + .storage + .remove_transaction_and_dependents_subtree(dependent); + self.update_components_and_caches_on_removal(removed.iter()); + let removed_txs: Vec<_> = removed + .into_iter() + .map(|data| { + let dependent_tx_id = data.transaction.id(); + let tx_status = statuses::SqueezedOut { + reason: reason.clone(), + }; + (dependent_tx_id, tx_status) + }) + .collect(); + if !removed_txs.is_empty() { + self.tx_status_manager.squeezed_out_txs(removed_txs); + } + } + } + + // Remove any pool transactions that used a preconfirmed contract creation + // as an input and were admitted only via `extracted_outputs` (no in-pool + // graph dependency). Skip eviction when the contract is still available + // through an independent in-pool creator. + for contract_id in created_contracts { + if self + .collision_manager + .contract_created_in_pool(&contract_id) + { + continue; + } + let user_tx_ids = self.collision_manager.get_contract_users(&contract_id); + for user_tx_id in user_tx_ids { + let Some(&storage_id) = self.tx_id_to_storage_id.get(&user_tx_id) else { + continue; + }; + let removed = self + .storage + .remove_transaction_and_dependents_subtree(storage_id); + self.update_components_and_caches_on_removal(removed.iter()); + let removed_txs: Vec<_> = removed + .into_iter() + .map(|data| { + let dependent_tx_id = data.transaction.id(); + let tx_status = statuses::SqueezedOut { + reason: reason.clone(), + }; + (dependent_tx_id, tx_status) + }) + .collect(); + if !removed_txs.is_empty() { + self.tx_status_manager.squeezed_out_txs(removed_txs); + } + } + } + + self.update_stats(); + } + fn check_blob_does_not_exist( tx: &PoolTransaction, persistent_storage: &impl TxPoolPersistentStorage, diff --git a/crates/services/txpool_v2/src/pool_worker.rs b/crates/services/txpool_v2/src/pool_worker.rs index ed64194bb9f..a1f6447656d 100644 --- a/crates/services/txpool_v2/src/pool_worker.rs +++ b/crates/services/txpool_v2/src/pool_worker.rs @@ -17,7 +17,10 @@ use fuel_core_types::{ }, }; use std::{ - iter, + collections::{ + BTreeMap, + HashSet, + }, ops::Deref, sync::Arc, time::SystemTime, @@ -94,6 +97,7 @@ impl PoolWorkerInterface { tx_pool: TxPool, view_provider: Arc>, limits: &ServiceChannelLimits, + initial_block_height: BlockHeight, ) -> Self where View: TxPoolPersistentStorage, @@ -137,6 +141,8 @@ impl PoolWorkerInterface { pending_pool: PendingPool::new(tx_pool.config.pending_pool_tx_ttl), pool: tx_pool, view_provider, + tentative_preconfs: BTreeMap::new(), + current_canonical_height: initial_block_height, }; tokio_runtime.block_on(async { @@ -271,6 +277,14 @@ pub(super) struct PoolWorker { pending_pool: PendingPool, view_provider: Arc>, notification_sender: Sender, + /// Tracks preconfirmed transaction IDs by their tentative block height. + /// Used to roll back stale preconfirmations when the canonical block at + /// that height does not include those transactions. + tentative_preconfs: BTreeMap>, + /// The height of the last canonical block imported by this node. + /// Used to discard late preconfirmations whose tentative block height + /// is already at or below the canonical tip. + current_canonical_height: BlockHeight, } impl PoolWorker @@ -488,9 +502,17 @@ where } fn process_block(&mut self, block_result: SharedImportResult) { - self.pool.process_committed_transactions( - block_result.tx_status.iter().map(|tx_status| tx_status.id), - ); + let block_height = *block_result.sealed_block.entity.header().height(); + self.current_canonical_height = self.current_canonical_height.max(block_height); + + let confirmed_tx_ids: HashSet = block_result + .tx_status + .iter() + .map(|tx_status| tx_status.id) + .collect(); + + self.pool + .process_committed_transactions(confirmed_tx_ids.iter().copied()); block_result.tx_status.iter().for_each(|tx_status| { self.pool @@ -498,6 +520,37 @@ where .new_executed_transaction(&tx_status.id); }); + // Reconcile tentative preconfirmations for all heights up to and including + // the imported block height. Any preconfirmed tx that is absent from the + // canonical block must have its state rolled back so those inputs/outputs + // do not remain stale in the mempool. + let stale_heights: Vec = self + .tentative_preconfs + .range(..=block_height) + .map(|(h, _)| *h) + .collect(); + + for height in stale_heights { + if let Some(tentative_txs) = self.tentative_preconfs.remove(&height) { + for tx_id in tentative_txs { + if confirmed_tx_ids.contains(&tx_id) { + // Tx was included in the canonical block — confirm the + // tentative spend record so it won't be rolled back. + self.pool.spent_inputs.confirm_tentative_spend(&tx_id); + } else { + tracing::debug!( + "Rolling back stale preconfirmation for tx {} \ + (tentative block {}, canonical block {})", + tx_id, + height, + block_height, + ); + self.pool.rollback_preconfirmed_transaction(tx_id); + } + } + } + } + let resolved_txs = self.pending_pool.new_known_txs( block_result .sealed_block @@ -529,20 +582,54 @@ where tx_id: TxId, status: PreConfirmationStatus, ) { - let outputs = match &status { + // Reject late preconfirmations whose tentative block height is already + // at or below the node's current canonical tip. Applying them would + // temporarily mark inputs as spent and admit dependents against outputs + // that may not be in any canonical block, creating a stale-acceptance + // window that is only unwound on the next block import. + let preconf_height = match &status { + PreConfirmationStatus::Success(s) => Some(s.tx_pointer.block_height()), + PreConfirmationStatus::Failure(s) => Some(s.tx_pointer.block_height()), + PreConfirmationStatus::SqueezedOut(_) => None, + }; + if let Some(h) = preconf_height + && h <= self.current_canonical_height + { + tracing::debug!( + "Ignoring late preconfirmation for tx {} at height {} \ + (current canonical height {})", + tx_id, + h, + self.current_canonical_height, + ); + return; + } + + let (outputs, block_height) = match &status { PreConfirmationStatus::Success(status) => { - self.pool.process_committed_transactions(iter::once(tx_id)); + self.pool.process_preconfirmed_committed_transaction(tx_id); + let height = status.tx_pointer.block_height(); if let Some(outputs) = &status.resolved_outputs { - outputs + (outputs, Some(height)) } else { + // Still track the height so block import can clean up spent_inputs. + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); return; } } PreConfirmationStatus::Failure(status) => { - self.pool.process_committed_transactions(iter::once(tx_id)); + self.pool.process_preconfirmed_committed_transaction(tx_id); + let height = status.tx_pointer.block_height(); if let Some(outputs) = &status.resolved_outputs { - outputs + (outputs, Some(height)) } else { + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); return; } } @@ -551,6 +638,16 @@ where return; } }; + + // Track the tentative block height so the preconfirmed outputs can be + // rolled back if the canonical block at that height omits this tx. + if let Some(height) = block_height { + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); + } + // All of this can be useful in case that we didn't know about the transaction let resolved = self .pending_pool diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index eb466af937e..8dcc4c84912 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -803,8 +803,12 @@ where let (current_height_writer, current_height_reader) = unsafe { SeqLock::new(current_height) }; - let pool_worker = - PoolWorkerInterface::new(txpool, storage_provider, &service_channel_limits); + let pool_worker = PoolWorkerInterface::new( + txpool, + storage_provider, + &service_channel_limits, + current_height, + ); let shared_state = SharedState { request_read_sender: pool_worker.request_read_sender.clone(), diff --git a/crates/services/txpool_v2/src/spent_inputs.rs b/crates/services/txpool_v2/src/spent_inputs.rs index 4cc0c933c3d..66c556a7e83 100644 --- a/crates/services/txpool_v2/src/spent_inputs.rs +++ b/crates/services/txpool_v2/src/spent_inputs.rs @@ -27,6 +27,9 @@ pub struct SpentInputs { /// transaction spent it. Later, this information can be used to unspent /// or fully spend the input. spender_of_inputs: HashMap>, + /// Inputs permanently spent during preconfirmation processing, saved so + /// they can be rolled back if the preconfirmation turns out to be stale. + tentative_spent: HashMap>, } impl SpentInputs { @@ -34,6 +37,7 @@ impl SpentInputs { Self { spent_inputs: LruCache::new(capacity), spender_of_inputs: HashMap::new(), + tentative_spent: HashMap::new(), } } @@ -90,6 +94,21 @@ impl SpentInputs { } } + /// Transitions the inputs recorded by [`maybe_spend_inputs`] from the + /// `spender_of_inputs` map directly into `tentative_spent`, without + /// touching the live `spent_inputs` LRU cache. + /// + /// Call this **before** [`spend_inputs_by_tx_id`] when a preconfirmation + /// arrives for a tx that was already extracted for local block production + /// (i.e. absent from pool storage). This preserves the input keys for a + /// potential rollback via [`unspend_preconfirmed`] before + /// [`spend_inputs_by_tx_id`] drains `spender_of_inputs`. + pub fn move_spender_to_tentative(&mut self, tx_id: TxId) { + if let Some(keys) = self.spender_of_inputs.get(&tx_id) { + self.tentative_spent.insert(tx_id, keys.clone()); + } + } + /// If transaction is skipped during the block production, this functions /// can be used to unspend inputs, allowing other transactions to spend them. pub fn unspend_inputs(&mut self, tx_id: TxId) { @@ -114,6 +133,43 @@ impl SpentInputs { pub fn is_spent_tx(&self, tx: &TxId) -> bool { self.spent_inputs.contains(&InputKey::Tx(*tx)) } + + /// Record inputs that were permanently spent during preconfirmation processing. + /// The saved keys can later be rolled back via [`unspend_preconfirmed`]. + pub fn record_tentative_spend(&mut self, tx_id: TxId, inputs: &[Input]) { + let keys: Vec = inputs + .iter() + .filter_map(|input| { + if input.is_coin() { + input.utxo_id().cloned().map(InputKey::Utxo) + } else if input.is_message() { + input.nonce().cloned().map(InputKey::Message) + } else { + None + } + }) + .collect(); + self.tentative_spent.insert(tx_id, keys); + } + + /// Remove the tentative-spend record for a confirmed transaction, preventing + /// a spurious rollback. Called when the preconfirmed tx is included in the + /// canonical block. + pub fn confirm_tentative_spend(&mut self, tx_id: &TxId) { + self.tentative_spent.remove(tx_id); + } + + /// Removes the tx entry and any individually-tracked UTXO/message inputs + /// from spent inputs, allowing the same inputs to be re-used. + /// Used when rolling back a stale preconfirmation. + pub fn unspend_preconfirmed(&mut self, tx_id: TxId) { + self.spent_inputs.pop(&InputKey::Tx(tx_id)); + if let Some(saved_keys) = self.tentative_spent.remove(&tx_id) { + for key in saved_keys { + self.spent_inputs.pop(&key); + } + } + } } #[cfg(test)] diff --git a/crates/services/txpool_v2/src/tests/mocks.rs b/crates/services/txpool_v2/src/tests/mocks.rs index 94d1447bce2..ea119abd4d8 100644 --- a/crates/services/txpool_v2/src/tests/mocks.rs +++ b/crates/services/txpool_v2/src/tests/mocks.rs @@ -113,6 +113,11 @@ impl MockTxStatusManager { tx, } } + + /// Send a preconfirmation update to the pool worker. + pub fn send_preconfirmation(&self, tx_id: TxId, status: PreConfirmationStatus) { + let _ = self.tx_preconfirmations_update_sender.send((tx_id, status)); + } } impl ports::TxStatusManager for MockTxStatusManager { diff --git a/crates/services/txpool_v2/src/tests/mod.rs b/crates/services/txpool_v2/src/tests/mod.rs index 4578a2a50bc..cdcb9f78ecd 100644 --- a/crates/services/txpool_v2/src/tests/mod.rs +++ b/crates/services/txpool_v2/src/tests/mod.rs @@ -5,6 +5,7 @@ mod stability_test; mod tests_p2p; mod tests_pending_pool; mod tests_pool; +mod tests_preconf_rollback; mod tests_service; mod tx_status_manager_integration; mod universe; diff --git a/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs new file mode 100644 index 00000000000..9e730ef142b --- /dev/null +++ b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs @@ -0,0 +1,565 @@ +//! When a block producer emits preconfirmations and then crashes or produces a +//! different block at the same height, the sentry/RPC mempool must purge the +//! stale preconfirmation state on the next canonical block import. + +use std::sync::Arc; + +use fuel_core_types::{ + blockchain::{ + block::Block, + consensus::Sealed, + }, + fuel_tx::{ + Contract, + Output, + TxPointer, + UniqueIdentifier, + UtxoId, + }, + fuel_types::{ + BlockHeight, + ContractId, + }, + services::{ + block_importer::ImportResult, + executor::{ + TransactionExecutionResult, + TransactionExecutionStatus, + }, + transaction_status::{ + PreConfirmationStatus, + statuses, + }, + }, +}; + +use fuel_core_services::Service as ServiceTrait; + +use crate::{ + Constraints, + tests::{ + mocks::MockImporter, + universe::{ + TestPoolUniverse, + create_contract_input, + }, + }, +}; + +/// Build a canonical block sealed at `height` that contains `tx_ids`. +fn make_block_import( + height: u32, + tx_ids: &[fuel_core_types::fuel_tx::TxId], +) -> Arc< + dyn std::ops::Deref + + Send + + Sync, +> { + let sealed_block = Sealed { + entity: { + let mut block = Block::default(); + block + .header_mut() + .set_block_height(BlockHeight::new(height)); + block + }, + consensus: Default::default(), + }; + let tx_statuses = tx_ids + .iter() + .map(|id| TransactionExecutionStatus { + id: *id, + result: TransactionExecutionResult::Success { + result: None, + receipts: Arc::new(vec![]), + total_gas: 0, + total_fee: 0, + }, + }) + .collect(); + Arc::new(ImportResult::new_from_local(sealed_block, tx_statuses, vec![]).wrap()) +} + +/// Build a `PreConfirmationStatus::Success` that carries one coin output. +fn make_preconf_success( + tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, + output: Output, +) -> PreConfirmationStatus { + let utxo_id = UtxoId::new(tx_id, 0); + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: Some(vec![(utxo_id, output)]), + } + .into(), + ) +} + +/// Build a `PreConfirmationStatus::Success` whose resolved outputs include a +/// `ContractCreated` entry, so that `extracted_outputs.contract_exists(contract_id)` +/// becomes true after the preconfirmation is processed. +fn make_preconf_with_contract_created( + tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, + contract_id: ContractId, +) -> PreConfirmationStatus { + let utxo_id = UtxoId::new(tx_id, 0); + let output = Output::ContractCreated { + contract_id, + state_root: Contract::default_state_root(), + }; + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: Some(vec![(utxo_id, output)]), + } + .into(), + ) +} + +/// Build a `PreConfirmationStatus::Success` with no resolved outputs. +fn make_preconf_success_no_outputs( + _tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, +) -> PreConfirmationStatus { + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: None, + } + .into(), + ) +} + +/// After a preconfirmation arrives for tx T at height H, and then a canonical +/// block at height H is imported *without* T, the tx should no longer be +/// marked as "spent" i.e. it can be re-inserted into the pool. +#[tokio::test] +async fn preconfirmed_tx_can_be_reinserted_after_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Insert and wait for submitted status. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Simulate the block producer preconfirming tx at block height 1. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + // Give the pool worker time to process the preconfirmation. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // The tx should not be in the pool any more (committed). + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx should have been committed out of pool" + ); + + // When — import an empty block at height 1 (no tx T). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Give the worker time to process the block. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — re-inserting the same tx should now succeed because + // spent_inputs was rolled back. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + service.stop_and_await().await.unwrap(); +} + +/// When a preconfirmed tx's outputs are used by a dependent tx D, and the +/// canonical block does not include the preconfirmed tx, D must be removed +/// from the pool. +#[tokio::test] +async fn dependents_of_preconfirmed_tx_removed_on_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + // tx_parent is the tx that will be preconfirmed but not included. + // It produces output_a (a coin). + let (output_a, unset_input_a) = universe.create_output_and_input(); + let tx_parent = universe.build_script_transaction(None, Some(vec![output_a]), 1); + let tx_parent_id = tx_parent.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Simulate receiving a preconfirmation for tx_parent (which the sentry may + // never have seen). The preconf carries output_a. + universe.send_preconfirmation( + tx_parent_id, + make_preconf_success(tx_parent_id, 1, output_a), + ); + + // Give the worker time to process the preconfirmation and register outputs. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Now insert tx_child that spends output_a (utxo from tx_parent). + let input_a = unset_input_a.into_input(UtxoId::new(tx_parent_id, 0)); + let tx_child = universe.build_script_transaction(Some(vec![input_a]), None, 2); + let tx_child_id = tx_child.id(&Default::default()); + + service.shared.insert(tx_child.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_child_id]) + .await; + + // Sanity: child is in the pool. + let found = service.shared.find(vec![tx_child_id]).await.unwrap(); + assert!(found[0].is_some(), "tx_child should be in pool"); + + // When — import a block at height 1 that does NOT contain tx_parent. + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Then — tx_child depends on a now-stale preconfirmed output; it must be + // squeezed out. + universe + .await_expected_tx_statuses(vec![tx_child_id], |status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + let found = service.shared.find(vec![tx_child_id]).await.unwrap(); + assert!(found[0].is_none(), "tx_child should have been removed"); + + service.stop_and_await().await.unwrap(); +} + +/// When a preconfirmed tx IS included in the canonical block, its state must +/// be committed normally — no spurious rollback. +#[tokio::test] +async fn preconfirmed_tx_committed_normally_when_in_canonical_block() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Preconfirmation at height 1. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // When — import a block at height 1 that CONTAINS the tx. + block_sender + .send(make_block_import(1, &[tx_id])) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — tx must not reappear in the pool; re-inserting it should fail + // because its inputs are now permanently spent (committed in the block). + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx should not be in pool after block commit" + ); + + service.stop_and_await().await.unwrap(); +} + +/// Regression test for the "extracted-first" rollback bug. +/// +/// Sequence: +/// 1. Tx T is inserted and then extracted for local block production. +/// `maybe_spend_inputs` records T's coin inputs in `spender_of_inputs`. +/// 2. A preconfirmation for T arrives. Because T was already removed from +/// `tx_id_to_storage_id`, `process_preconfirmed_committed_transaction` +/// takes the `else` branch and must still save T's inputs into +/// `tentative_spent` so they can be rolled back later. +/// 3. The canonical block omits T → `rollback_preconfirmed_transaction` must +/// clear those coin-input keys from `spent_inputs`. +/// 4. Re-inserting T must succeed (inputs no longer marked spent). +#[tokio::test] +async fn extracted_tx_inputs_freed_after_preconf_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Insert and wait for the tx to be accepted. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Extract the tx (simulating local block production). + // This calls `maybe_spend_inputs` and removes the tx from storage. + let extracted = service + .shared + .extract_transactions_for_block(Constraints { + minimal_gas_price: 0, + max_gas: u64::MAX, + maximum_txs: u16::MAX, + maximum_block_size: u32::MAX, + excluded_contracts: Default::default(), + }) + .unwrap(); + assert_eq!(extracted.len(), 1, "expected exactly one extracted tx"); + + // Preconfirmation arrives for the already-extracted tx. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // When — import an empty canonical block at height 1 (tx is absent). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — coin inputs must have been freed; re-inserting T must succeed. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + service.stop_and_await().await.unwrap(); +} + +/// Stale preconfirmations at an older height are cleaned up when a later +/// block is imported, even if the heights don't match exactly. +#[tokio::test] +async fn stale_preconfs_at_older_height_cleaned_up_by_later_block() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + let (output_a, unset_input_a) = universe.create_output_and_input(); + let tx_parent = universe.build_script_transaction(None, Some(vec![output_a]), 1); + let tx_parent_id = tx_parent.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Preconfirmation at height 1 (but the block producer crashes). + universe.send_preconfirmation( + tx_parent_id, + make_preconf_success(tx_parent_id, 1, output_a), + ); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert a dependent tx while the preconf outputs are "live". + let input_a = unset_input_a.into_input(UtxoId::new(tx_parent_id, 0)); + let tx_child = universe.build_script_transaction(Some(vec![input_a]), None, 2); + let tx_child_id = tx_child.id(&Default::default()); + + service.shared.insert(tx_child).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_child_id]) + .await; + + // When — a block at height 2 arrives (skipping height 1). The preconf for + // height 1 was never resolved, so it must be rolled back. + block_sender.send(make_block_import(2, &[])).await.unwrap(); + + // Then — tx_child (which depended on the stale preconf output) is removed. + universe + .await_expected_tx_statuses(vec![tx_child_id], |status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + service.stop_and_await().await.unwrap(); +} + +/// A late preconfirmation arriving after the referenced canonical block has +/// already been imported must be silently discarded. +/// +/// Sequence: +/// 1. Node imports canonical block at height H (tx T is absent from it). +/// 2. T is inserted into the pool. +/// 3. A delayed preconfirmation for T arrives claiming height H. +/// 4. Because H <= current_canonical_height the preconfirmation is ignored: +/// T must remain in the pool (not committed out) and spent_inputs must +/// not be mutated. +#[tokio::test] +async fn late_preconf_below_canonical_height_is_ignored() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Import a canonical block at height 1 that does NOT include the tx. + // After this the node's canonical height is 1. + block_sender.send(make_block_import(1, &[])).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert T after the block is imported. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // When — a delayed preconfirmation for T arrives at height 1 + // (already at or below the canonical tip). + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — the preconfirmation must be ignored. + // If it were applied, `process_preconfirmed_committed_transaction` would + // remove T from the pool. T must still be present. + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_some(), + "tx should still be in pool — late preconfirmation must have been ignored" + ); + + service.stop_and_await().await.unwrap(); +} + +/// Regression test: a pool tx admitted via a preconfirmed contract creation must +/// be evicted when the canonical block omits the contract-creating preconfirmation. +/// +/// Sequence: +/// 1. Preconfirmation for tx P arrives carrying `Output::ContractCreated { C }`. +/// `extracted_outputs` now reports `contract_exists(C) == true`. +/// 2. Tx D with `Input::Contract(C)` is inserted into the pool. +/// It passes validation because `extracted_outputs.contract_exists(C)`. +/// D has no graph dependency on any in-pool creator of C. +/// 3. An empty canonical block is imported at height 1 (P is absent). +/// Rollback clears C from `extracted_outputs`. +/// 4. D must be squeezed out — it was only valid because of the now-stale +/// preconfirmed contract creation. +#[tokio::test] +async fn contract_dependent_tx_removed_on_preconf_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + // Compute the contract id that P will advertise as created. + // P is never inserted into the pool — it arrives only via preconfirmation. + let contract_code = vec![1u8, 2, 3]; + let contract: fuel_core_types::fuel_tx::Contract = contract_code.into(); + let contract_id = fuel_core_types::fuel_tx::Contract::id( + &Default::default(), + &contract.root(), + &Default::default(), + ); + let p_tx_id = fuel_core_types::fuel_tx::TxId::from([0xABu8; 32]); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Preconfirmation for P arrives (with ContractCreated output). + universe.send_preconfirmation( + p_tx_id, + make_preconf_with_contract_created(p_tx_id, 1, contract_id), + ); + + // Give the worker time to register the preconfirmed contract in extracted_outputs. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert D which uses the preconfirmed contract as an input. + // The mock DB has no record of this contract, so admission relies solely on + // extracted_outputs.contract_exists(contract_id). + // A script tx with Input::Contract at index 0 also needs Output::Contract(0). + let contract_input = create_contract_input(p_tx_id, 0, contract_id); + let contract_output = Output::contract(0, Default::default(), Default::default()); + let tx_d = universe.build_script_transaction( + Some(vec![contract_input]), + Some(vec![contract_output]), + 5, + ); + let tx_d_id = tx_d.id(&Default::default()); + + service.shared.insert(tx_d).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_d_id]) + .await; + + let found = service.shared.find(vec![tx_d_id]).await.unwrap(); + assert!(found[0].is_some(), "tx_d should be in the pool"); + + // When — import an empty canonical block at height 1 (P was never included). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Then — D must be squeezed out because the contract it relied on never landed. + universe + .await_expected_tx_statuses(vec![tx_d_id], |status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + let found = service.shared.find(vec![tx_d_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx_d should have been removed from the pool" + ); + + service.stop_and_await().await.unwrap(); +} diff --git a/crates/services/txpool_v2/src/tests/universe.rs b/crates/services/txpool_v2/src/tests/universe.rs index c1a7603aed2..d3039c26073 100644 --- a/crates/services/txpool_v2/src/tests/universe.rs +++ b/crates/services/txpool_v2/src/tests/universe.rs @@ -6,6 +6,8 @@ use std::{ sync::Arc, }; +use fuel_core_types::services::transaction_status::PreConfirmationStatus; + use crate::{ GasPrice, Service, @@ -453,6 +455,12 @@ impl TestPoolUniverse { self.pool.clone().unwrap() } + /// Send a preconfirmation update directly to the pool worker. + pub fn send_preconfirmation(&self, tx_id: TxId, status: PreConfirmationStatus) { + self.mock_tx_status_manager + .send_preconfirmation(tx_id, status); + } + pub fn setup_coin(&mut self) -> (Coin, Input) { let input = self.random_predicate(AssetId::BASE, TEST_COIN_AMOUNT, None); diff --git a/crates/services/upgradable-executor/src/executor.rs b/crates/services/upgradable-executor/src/executor.rs index 4900386adce..a9340f1be62 100644 --- a/crates/services/upgradable-executor/src/executor.rs +++ b/crates/services/upgradable-executor/src/executor.rs @@ -262,8 +262,8 @@ impl Executor { ("0-45-1", 30), ("0-46-0", 31), // We are skipping 0-47-0 because it was not published. - // 0-47-3 has the same STF version as 0-47-1 - ("0-47-3", LATEST_STATE_TRANSITION_VERSION), + // 0-47-4 has the same STF version as 0-47-1 + ("0-47-4", LATEST_STATE_TRANSITION_VERSION), ]; pub fn new(