diff --git a/crates/fuel-core/redis_leader_lease_adapter_scripts/check_lease_owner.lua b/crates/fuel-core/redis_leader_lease_adapter_scripts/check_lease_owner.lua
index 2097b17db52..f99ece5c259 100644
--- a/crates/fuel-core/redis_leader_lease_adapter_scripts/check_lease_owner.lua
+++ b/crates/fuel-core/redis_leader_lease_adapter_scripts/check_lease_owner.lua
@@ -1,6 +1,6 @@
 -- Check whether this node still sees us as lease owner.
 -- KEYS[1]: lease key (e.g., poa:leader:lock)
--- ARGV[1]: lease owner token (UUID)
+-- ARGV[1]: lease owner id (UUID)
 if redis.call("GET", KEYS[1]) == ARGV[1] then
     return 1
 else
diff --git a/crates/fuel-core/redis_leader_lease_adapter_scripts/promote_leader.lua b/crates/fuel-core/redis_leader_lease_adapter_scripts/promote_leader.lua
index 0fceb073fae..76ba0834295 100644
--- a/crates/fuel-core/redis_leader_lease_adapter_scripts/promote_leader.lua
+++ b/crates/fuel-core/redis_leader_lease_adapter_scripts/promote_leader.lua
@@ -1,7 +1,7 @@
 -- Atomically promote follower to leader and return a new fencing token.
 -- KEYS[1]: leader lock key (e.g., poa:leader:lock)
 -- KEYS[2]: epoch token key (e.g., poa:leader:lock:epoch:token)
--- ARGV[1]: lease owner token (UUID)
+-- ARGV[1]: lease owner id (UUID)
 -- ARGV[2]: lease TTL in milliseconds
 --
 -- Returns: new epoch token on success, error if lock is held.
diff --git a/crates/fuel-core/redis_leader_lease_adapter_scripts/release_lock.lua b/crates/fuel-core/redis_leader_lease_adapter_scripts/release_lock.lua
index 741b28fd802..2f96ff718f3 100644
--- a/crates/fuel-core/redis_leader_lease_adapter_scripts/release_lock.lua
+++ b/crates/fuel-core/redis_leader_lease_adapter_scripts/release_lock.lua
@@ -1,6 +1,6 @@
 -- Release lease if we are still the owner (graceful shutdown / stepdown).
 -- KEYS[1]: lease key
--- ARGV[1]: lease owner token (UUID)
+-- ARGV[1]: lease owner id (UUID)
 if redis.call("GET", KEYS[1]) == ARGV[1] then
     return redis.call("DEL", KEYS[1])
 else
diff --git a/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua b/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua
index fe70fc8f6c3..eec075a78ee 100644
--- a/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua
+++ b/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua
@@ -3,7 +3,7 @@
 -- KEYS[2]: epoch token key (e.g., poa:leader:lock:epoch:token)
 -- KEYS[3]: leader lock key (e.g., poa:leader:lock)
 -- ARGV[1]: my_epoch (max token observed during promotion quorum)
--- ARGV[2]: lease owner token (UUID)
+-- ARGV[2]: lease owner id (UUID)
 -- ARGV[3]: block_height
 -- ARGV[4]: block_data
 -- ARGV[5]: lease_ttl_ms
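The four scripts above share one calling convention: KEYS[...] name the lease, epoch, and stream keys, and the caller's lease owner id rides in ARGV, so every mutation stays conditional on still owning the lease. The sketch below shows how a caller might drive release_lock.lua with the `redis` crate, mirroring the adapter code later in this diff; the function name, the connection type, and the i64 reply type are illustrative assumptions rather than part of this change.

async fn release_if_owner(
    connection: &mut redis::aio::MultiplexedConnection,
    release_lock_script: &str, // contents of release_lock.lua
    lease_key: &str,
    lease_owner_id: &str,
) -> redis::RedisResult<i64> {
    redis::Script::new(release_lock_script)
        .key(lease_key)           // KEYS[1]: lease key
        .arg(lease_owner_id)      // ARGV[1]: lease owner id (UUID)
        .invoke_async(connection) // script reply (DEL count when we still own the lease)
        .await
}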
diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs
index 71438b52558..bedc05f6095 100644
--- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs
+++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs
@@ -118,23 +118,49 @@ impl Clone for RedisNode {
     }
 }
 
+/// Uses a set of Redis servers to take ownership of fencing tokens for each epoch and publish
+/// block data.
+///
+/// In steady state, a single block producer should be able to hold the lock indefinitely. If that
+/// producer is shut down, it will release the lease and allow another producer to take over.
+///
+/// The code, however, is designed to be fault tolerant. If a producer fails to release the lock,
+/// a TTL on the lock allows another producer to take over.
+///
+/// If different producers claim locks on different Redis servers, nodes can still deterministically
+/// identify the true block producer of any given block. This is done by iterating over the epochs
+/// and finding the highest epoch with a quorum of blocks produced.
 pub struct RedisLeaderLeaseAdapter {
+    /// Redis nodes that participate in leader election and block reconciliation
     redis_nodes: Vec<RedisNode>,
+    /// Minimum number of Redis nodes that must agree for ownership or writes to be accepted
     quorum: usize,
-    quorum_disruption_budget: u32,
+    /// Redis key that stores the current lease owner id with a TTL
     lease_key: String,
+    /// Redis key that stores the monotonically increasing fencing token
     epoch_key: String,
+    /// Redis stream key that stores published blocks for reconciliation
     block_stream_key: String,
-    lease_owner_token: String,
-    drop_release_guard: std::sync::Arc<()>,
+    /// Unique id for this adapter instance, written into the lease key when it acquires leadership
+    lease_owner_id: String,
+    /// Locally cached fencing token currently used for block writes.
    current_epoch_token: std::sync::Arc<std::sync::Mutex<Option<u64>>>,
+    /// Lease TTL applied to the Redis lease key and renewed on successful writes.
     lease_ttl_millis: u64,
+    /// Safety margin subtracted from the nominal TTL when evaluating remaining lease validity.
     lease_drift_millis: u64,
+    /// Timeout used for individual Redis node operations.
     node_timeout: Duration,
+    /// Approximate upper bound for the Redis block stream and per-round reconciliation reads.
+    stream_max_len: u32,
+    /// Base delay before retrying lease acquisition after a failed attempt.
     retry_delay_millis: u64,
+    /// Random jitter added to the retry delay to reduce election collisions.
     max_retry_delay_offset_millis: u64,
+    /// Maximum number of lease acquisition attempts per election round.
     max_attempts: usize,
-    stream_max_len: u32,
+    /// Ensures only the last live clone attempts lease release during drop.
+    drop_release_guard: std::sync::Arc<()>,
 }
 
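To make the doc comment's "highest epoch with a quorum" rule concrete, here is a minimal standalone sketch of the selection step: candidate blocks are keyed by (epoch, block id), the newest epoch wins, and it is accepted only when enough nodes returned that exact block. The function name, the simplified `[u8; 32]` block id, and the free-function shape are assumptions for illustration; the adapter's own version appears later in this diff as `select_height_winner` plus a quorum check.

use std::collections::HashMap;

/// Pick the block candidate for one height: newest epoch first, then require quorum backing.
fn pick_winner(
    votes: &HashMap<(u64, [u8; 32]), usize>, // (epoch, block id) -> number of nodes reporting it
    quorum: usize,
) -> Option<(u64, [u8; 32])> {
    votes
        .iter()
        .max_by_key(|((epoch, _), _)| *epoch) // prefer the newest fencing epoch
        .filter(|(_, count)| **count >= quorum) // accept it only with quorum backing
        .map(|(candidate, _)| *candidate)
}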
 impl Clone for RedisLeaderLeaseAdapter {
@@ -142,20 +168,19 @@
         Self {
             redis_nodes: self.redis_nodes.clone(),
             quorum: self.quorum,
-            quorum_disruption_budget: self.quorum_disruption_budget,
             lease_key: self.lease_key.clone(),
             epoch_key: self.epoch_key.clone(),
             block_stream_key: self.block_stream_key.clone(),
-            lease_owner_token: self.lease_owner_token.clone(),
-            drop_release_guard: self.drop_release_guard.clone(),
+            lease_owner_id: self.lease_owner_id.clone(),
             current_epoch_token: self.current_epoch_token.clone(),
             lease_ttl_millis: self.lease_ttl_millis,
             lease_drift_millis: self.lease_drift_millis,
             node_timeout: self.node_timeout,
+            stream_max_len: self.stream_max_len,
             retry_delay_millis: self.retry_delay_millis,
             max_retry_delay_offset_millis: self.max_retry_delay_offset_millis,
             max_attempts: self.max_attempts,
-            stream_max_len: self.stream_max_len,
+            drop_release_guard: self.drop_release_guard.clone(),
         }
     }
 }
@@ -169,6 +194,9 @@ pub enum ReconciliationAdapter {
     Noop(NoopReconciliationAdapter),
 }
 
+type BlocksByEpoch = HashMap<u64, SealedBlock>;
+type BlocksByHeight = HashMap<u32, BlocksByEpoch>;
+
 impl RedisLeaderLeaseAdapter {
     fn calculate_quorum(redis_nodes_len: usize, quorum_disruption_budget: u32) -> usize {
         let majority = redis_nodes_len
@@ -185,6 +213,7 @@
     pub fn new(
         redis_urls: Vec<String>,
         lease_key: String,
+        quorum_disruption_budget: u32,
         lease_ttl: Duration,
         node_timeout: Duration,
         retry_delay: Duration,
@@ -206,14 +235,13 @@
                 "At least one redis url is required for leader lock"
             ));
         }
-        let quorum_disruption_budget = 0u32;
         let quorum =
             Self::calculate_quorum(redis_nodes.len(), quorum_disruption_budget);
         let lease_ttl_millis = u64::try_from(lease_ttl.as_millis())?;
         let retry_delay_millis = u64::try_from(retry_delay.as_millis())?;
         let max_retry_delay_offset_millis = u64::try_from(max_retry_delay_offset.as_millis())?;
         let max_attempts = usize::try_from(max_attempts)?.max(1);
-        let lease_owner_token = uuid::Uuid::new_v4().to_string();
+        let lease_owner_id = uuid::Uuid::new_v4().to_string();
         let epoch_key = format!("{lease_key}:epoch:token");
         let block_stream_key = format!("{lease_key}:block:stream");
         let lease_drift_millis = lease_ttl_millis
@@ -223,33 +251,22 @@
         Ok(Self {
             redis_nodes,
             quorum,
-            quorum_disruption_budget,
             lease_key,
             epoch_key,
             block_stream_key,
-            lease_owner_token,
-            drop_release_guard: std::sync::Arc::new(()),
+            lease_owner_id,
             current_epoch_token: std::sync::Arc::new(std::sync::Mutex::new(None)),
             lease_ttl_millis,
             lease_drift_millis,
             node_timeout,
+            stream_max_len,
             retry_delay_millis,
             max_retry_delay_offset_millis,
             max_attempts,
-            stream_max_len,
+            drop_release_guard: std::sync::Arc::new(()),
         })
     }
 
-    pub fn with_quorum_disruption_budget(
-        mut self,
-        quorum_disruption_budget: u32,
-    ) -> Self {
-        self.quorum_disruption_budget = quorum_disruption_budget;
-        self.quorum =
-            Self::calculate_quorum(self.redis_nodes.len(), quorum_disruption_budget);
-        self
-    }
-
     async fn multiplexed_connection(
         &self,
         redis_node: &RedisNode,
@@ -289,7 +306,7 @@
             self.node_timeout,
             redis::Script::new(CHECK_LEASE_OWNER_SCRIPT)
                 .key(&self.lease_key)
-                .arg(&self.lease_owner_token)
+                .arg(&self.lease_owner_id)
                 .invoke_async::(&mut connection),
         )
         .await;
@@ -319,7 +336,7 @@
             redis::Script::new(PROMOTE_LEADER_SCRIPT)
                 .key(&self.lease_key)
                 .key(&self.epoch_key)
-                .arg(&self.lease_owner_token)
+                .arg(&self.lease_owner_id)
                 .arg(self.lease_ttl_millis)
                 .invoke_async::(&mut connection),
         )
         .await;
@@ -349,7 +366,7 @@
             self.node_timeout,
             redis::Script::new(RELEASE_LOCK_SCRIPT)
                 .key(&self.lease_key)
-                .arg(&self.lease_owner_token)
+                .arg(&self.lease_owner_id)
                 .invoke_async::(&mut connection),
         )
         .await;
@@ -517,7 +534,7 @@
     async fn release_lease_on_client(
         redis_client: redis::Client,
         lease_key: String,
-        lease_owner_token: String,
+        lease_owner_id: String,
         node_timeout: Duration,
     ) {
         let connection = timeout(
@@ -534,7 +551,7 @@
             node_timeout,
             redis::Script::new(RELEASE_LOCK_SCRIPT)
                 .key(lease_key)
-                .arg(lease_owner_token)
+                .arg(lease_owner_id)
                 .invoke_async::(&mut connection),
         )
         .await;
@@ -543,7 +560,7 @@
     async fn release_lease_on_clients(
         redis_clients: Vec<redis::Client>,
         lease_key: String,
-        lease_owner_token: String,
+        lease_owner_id: String,
         node_timeout: Duration,
     ) {
         let _ =
@@ -551,7 +568,7 @@
                 Self::release_lease_on_client(
                     redis_client,
                     lease_key.clone(),
-                    lease_owner_token.clone(),
+                    lease_owner_id.clone(),
                     node_timeout,
                 )
             }))
@@ -561,7 +578,7 @@
     fn release_lease_on_clients_sync(
         redis_clients: Vec<redis::Client>,
         lease_key: String,
-        lease_owner_token: String,
+        lease_owner_id: String,
     ) {
         redis_clients.into_iter().for_each(|redis_client| {
             let Ok(mut connection) = redis_client.get_connection() else {
@@ -569,7 +586,7 @@
             };
             let _ = redis::Script::new(RELEASE_LOCK_SCRIPT)
                 .key(&lease_key)
-                .arg(&lease_owner_token)
+                .arg(&lease_owner_id)
                 .invoke::(&mut connection);
         });
     }
@@ -712,17 +729,20 @@
         if !self.should_reconcile_from_stream(next_height).await? {
             return Ok(Vec::new());
         }
-        let mut reconciled = Vec::new();
-        let max_reconcile_blocks_per_round =
-            usize::try_from(self.stream_max_len).unwrap_or(usize::MAX);
-        let next_height_u32 = u32::from(next_height);
+        let blocks_by_node = self.read_backlog(next_height).await?;
+        self.update_backlog_metrics(&blocks_by_node, next_height);
+        self.reconcile_contiguous_heights(blocks_by_node, next_height)
+    }
+
+    async fn read_backlog(
+        &self,
+        next_height: BlockHeight,
+    ) -> anyhow::Result<Vec<BlocksByHeight>> {
+        let max_blocks = usize::try_from(self.stream_max_len).unwrap_or(usize::MAX);
+        let next_height = u32::from(next_height);
         let read_results =
             futures::future::join_all(self.redis_nodes.iter().map(|redis_node| {
-                self.read_stream_entries_on_node(
-                    redis_node,
-                    next_height_u32,
-                    max_reconcile_blocks_per_round,
-                )
+                self.read_stream_entries_on_node(redis_node, next_height, max_blocks)
             }))
             .await;
@@ -747,138 +767,62 @@
             ));
         }
 
-        let blocks_by_node = successful_reads
+        Ok(successful_reads
             .into_iter()
-            .map(|entries| {
-                entries.into_iter().fold(
-                    HashMap::<u32, HashMap<u64, SealedBlock>>::new(),
-                    |mut blocks_by_height, (height, epoch, block)| {
-                        blocks_by_height
-                            .entry(height)
-                            .or_default()
-                            .insert(epoch, block);
-                        blocks_by_height
-                    },
-                )
-            })
-            .collect::<Vec<_>>();
+            .map(Self::group_entries_by_height)
+            .collect())
+    }
+
+    fn group_entries_by_height(entries: Vec<(u32, u64, SealedBlock)>) -> BlocksByHeight {
+        entries.into_iter().fold(
+            HashMap::new(),
+            |mut blocks_by_height, (height, epoch, block)| {
+                blocks_by_height
+                    .entry(height)
+                    .or_default()
+                    .insert(epoch, block);
+                blocks_by_height
+            },
+        )
+    }
 
-        // Compute stream trim headroom: min stream height - local committed height
+    fn update_backlog_metrics(
+        &self,
+        blocks_by_node: &[BlocksByHeight],
+        next_height: BlockHeight,
+    ) {
         let min_stream_height = blocks_by_node
             .iter()
             .flat_map(|blocks_by_height| blocks_by_height.keys().copied())
             .min();
-        if let Some(min_h) = min_stream_height {
+        if let Some(min_height) = min_stream_height {
             let local_committed = i64::from(u32::from(next_height).saturating_sub(1));
-            let headroom = i64::from(min_h).saturating_sub(local_committed);
+            let headroom = i64::from(min_height).saturating_sub(local_committed);
             poa_metrics().stream_trim_headroom.set(headroom);
         }
+    }
+    fn reconcile_contiguous_heights(
+        &self,
+        blocks_by_node: Vec<BlocksByHeight>,
+        next_height: BlockHeight,
+    ) -> anyhow::Result<Vec<SealedBlock>> {
+        let mut reconciled = Vec::new();
         let mut current_height = u32::from(next_height);
+        let max_blocks = usize::try_from(self.stream_max_len).unwrap_or(usize::MAX);
 
-        for _ in 0..max_reconcile_blocks_per_round {
-            let nodes_with_height = blocks_by_node
-                .iter()
-                .filter(|blocks_by_height| blocks_by_height.contains_key(&current_height))
-                .count();
-
-            tracing::debug!(
-                "unreconciled_blocks: height={current_height} nodes_with_height={nodes_with_height}/{}",
-                blocks_by_node.len()
-            );
-
-            if nodes_with_height == 0 {
-                if reconciled.is_empty() {
+        for _ in 0..max_blocks {
+            match self.reconcile_height(&blocks_by_node, current_height) {
+                Ok(Some(block)) => reconciled.push(block),
+                Ok(None) if reconciled.is_empty() => {
                     return Err(anyhow!(
                         "Backlog unresolved at height {current_height}: \
                          stream indicates backlog but no entries found at next height"
                     ));
                 }
-                break;
-            }
-
-            let votes = blocks_by_node
-                .iter()
-                .filter_map(|blocks_by_height| blocks_by_height.get(&current_height))
-                .flat_map(|blocks_by_epoch| blocks_by_epoch.iter())
-                .fold(
-                    HashMap::<(u64, BlockId), (usize, SealedBlock)>::new(),
-                    |mut votes, (epoch, block)| {
-                        let vote_key = (*epoch, block.entity.id());
-                        match votes.get_mut(&vote_key) {
-                            Some((count, _)) => {
-                                *count = count.saturating_add(1);
-                            }
-                            None => {
-                                votes.insert(vote_key, (1, block.clone()));
-                            }
-                        }
-                        votes
-                    },
-                );
-
-            let winner = votes
-                .into_iter()
-                .max_by_key(|((epoch, _), _)| *epoch)
-                .map(|(_, (count, block))| (count, block));
-
-            if let Some((count, block)) = winner {
-                if self.quorum_reached(count) {
-                    // Block already has quorum — reconcile it directly
-                    reconciled.push(block);
-                } else {
-                    // Sub-quorum block: repropose to all nodes to reach quorum.
-                    // This repairs orphaned partial writes from failed leaders.
-                    // HEIGHT_EXISTS on nodes that already have the block returns
-                    // Ok(false), and nodes missing it accept the write.
-                    tracing::info!(
-                        "Repairing sub-quorum block at height {current_height} \
-                         (found on {count}/{} nodes)",
-                        blocks_by_node.len()
-                    );
-                    match self.repair_sub_quorum_block(&block, count) {
-                        Ok(true) => {
-                            tracing::info!(
-                                "Repair succeeded — block at height {current_height} \
-                                 now has quorum"
-                            );
-                            reconciled.push(block);
-                        }
-                        Ok(false) => {
-                            tracing::warn!(
-                                "Repair failed to reach quorum at height \
-                                 {current_height} — will retry next round"
-                            );
-                            if reconciled.is_empty() {
-                                return Err(anyhow!(
-                                    "Backlog unresolved at height {current_height}: \
-                                     repair failed to reach quorum"
-                                ));
-                            }
-                            break;
-                        }
-                        Err(e) => {
-                            tracing::warn!(
-                                "Repair error at height {current_height}: {e}"
-                            );
-                            if reconciled.is_empty() {
-                                return Err(anyhow!(
-                                    "Backlog unresolved at height {current_height}: \
-                                     repair error: {e}"
-                                ));
-                            }
-                            break;
-                        }
-                    }
-                }
-            } else {
-                if reconciled.is_empty() {
-                    return Err(anyhow!(
-                        "Backlog unresolved at height {current_height}: \
-                         no winning block candidate"
-                    ));
-                }
-                break;
+                Ok(None) => break,
+                Err(err) if reconciled.is_empty() => return Err(err),
+                Err(_) => break,
             }
 
             let Some(next) = current_height.checked_add(1) else {
@@ -890,6 +834,70 @@
         Ok(reconciled)
     }
 
+    fn reconcile_height(
+        &self,
+        blocks_by_node: &[BlocksByHeight],
+        current_height: u32,
+    ) -> anyhow::Result<Option<SealedBlock>> {
+        let nodes_with_height = blocks_by_node
+            .iter()
+            .filter(|blocks_by_height| blocks_by_height.contains_key(&current_height))
+            .count();
+
+        tracing::debug!(
+            "unreconciled_blocks: height={current_height} nodes_with_height={nodes_with_height}/{}",
+            blocks_by_node.len()
+        );
+
+        if nodes_with_height == 0 {
+            return Ok(None);
+        }
+
+        let Some((count, block)) =
+            self.select_height_winner(blocks_by_node, current_height)
+        else {
+            return Err(anyhow!(
+                "Backlog unresolved at height {current_height}: \
+                 no winning block candidate"
+            ));
+        };
+
+        if self.quorum_reached(count) {
+            return Ok(Some(block));
+        }
+
+        self.repair_sub_quorum_block(current_height, block, count, blocks_by_node.len())
+    }
+
+    fn select_height_winner(
+        &self,
+        blocks_by_node: &[BlocksByHeight],
+        current_height: u32,
+    ) -> Option<(usize, SealedBlock)> {
+        blocks_by_node
+            .iter()
+            .filter_map(|blocks_by_height| blocks_by_height.get(&current_height))
+            .flat_map(|blocks_by_epoch| blocks_by_epoch.iter())
+            .fold(
+                HashMap::<(u64, BlockId), (usize, SealedBlock)>::new(),
+                |mut votes, (epoch, block)| {
+                    let vote_key = (*epoch, block.entity.id());
+                    match votes.get_mut(&vote_key) {
+                        Some((count, _)) => {
+                            *count = count.saturating_add(1);
+                        }
+                        None => {
+                            votes.insert(vote_key, (1, block.clone()));
+                        }
+                    }
+                    votes
+                },
+            )
+            .into_iter()
+            .max_by_key(|((epoch, _), _)| *epoch)
+            .map(|(_, (count, block))| (count, block))
+    }
+
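The removed comments above describe the repair rule for sub-quorum blocks: nodes that already hold this exact block, plus nodes that newly accept the reproposed write, count toward quorum, while a HeightExists reply (some other block already at that height) does not. A minimal sketch of that accounting follows; the `WriteResult` enum and free-function shape are assumptions for illustration, not the adapter's actual types.

enum WriteResult {
    Written,      // node accepted the reproposed block
    HeightExists, // node already has some block at this height (possibly a competing one)
    Failed,       // node unreachable or write rejected
}

fn repair_reaches_quorum(
    pre_existing_count: usize,
    write_results: &[WriteResult],
    quorum: usize,
) -> bool {
    // Only freshly written nodes are added on top of the nodes that already held the block.
    let newly_written = write_results
        .iter()
        .filter(|result| matches!(result, WriteResult::Written))
        .count();
    pre_existing_count.saturating_add(newly_written) >= quorum
}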
     async fn can_produce_block(&self) -> anyhow::Result {
         tracing::debug!("Checking Redis leader lock");
         if self.has_lease_owner_quorum().await? {
@@ -947,7 +955,7 @@
                     .key(&self.epoch_key)
                     .key(&self.lease_key)
                     .arg(epoch)
-                    .arg(&self.lease_owner_token)
+                    .arg(&self.lease_owner_id)
                     .arg(block_height)
                     .arg(block_data)
                     .arg(self.lease_ttl_millis)
@@ -998,9 +1006,15 @@
     /// - The total (pre_existing + newly written) must reach quorum
     fn repair_sub_quorum_block(
         &self,
-        block: &SealedBlock,
+        current_height: u32,
+        block: SealedBlock,
         pre_existing_count: usize,
-    ) -> anyhow::Result<bool> {
+        node_count: usize,
+    ) -> anyhow::Result<Option<SealedBlock>> {
+        tracing::info!(
+            "Repairing sub-quorum block at height {current_height} \
+             (found on {pre_existing_count}/{node_count} nodes)"
+        );
         let epoch = match *self
             .current_epoch_token
             .lock()
@@ -1013,7 +1027,7 @@
                 ));
             }
         };
-        let block_data = postcard::to_allocvec(block)?;
+        let block_data = postcard::to_allocvec(&block)?;
         // Start from the pre-existing count (nodes already confirmed to
         // have this specific block during reconciliation). Only count
         // newly Written nodes — HeightExists means the node has *some*
@@ -1021,7 +1035,7 @@
         // a competing leader's partial write.
         let mut total_with_block = pre_existing_count;
         for redis_node in &self.redis_nodes {
-            match self.publish_block_on_node(redis_node, epoch, block, &block_data) {
+            match self.publish_block_on_node(redis_node, epoch, &block, &block_data) {
                 Ok(WriteBlockResult::Written) => {
                     total_with_block = total_with_block.saturating_add(1);
                 }
@@ -1044,10 +1058,22 @@
         let reached_quorum = self.quorum_reached(total_with_block);
         if reached_quorum {
             poa_metrics().repair_success_total.inc();
+            tracing::info!(
+                "Repair succeeded — block at height {current_height} \
+                 now has quorum"
+            );
+            Ok(Some(block))
         } else {
             poa_metrics().repair_failure_total.inc();
+            tracing::warn!(
+                "Repair failed to reach quorum at height \
+                 {current_height} — will retry next round"
+            );
+            Err(anyhow!(
+                "Backlog unresolved at height {current_height}: \
+                 repair failed to reach quorum"
+            ))
         }
-        Ok(reached_quorum)
     }
 }
@@ -1166,7 +1192,7 @@
                 Self::release_lease_on_clients(
                     redis_clients,
                     self.lease_key.clone(),
-                    self.lease_owner_token.clone(),
+                    self.lease_owner_id.clone(),
                     self.node_timeout,
                 ),
             );
@@ -1181,7 +1207,7 @@
             Self::release_lease_on_clients_sync(
                 redis_clients,
                 self.lease_key.clone(),
-                self.lease_owner_token.clone(),
+                self.lease_owner_id.clone(),
             );
         }
     }
@@ -1407,6 +1433,7 @@
         let adapter = RedisLeaderLeaseAdapter::new(
             vec![redis.redis_url()],
             lease_key,
+            0,
             Duration::from_secs(2),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -1461,6 +1488,7 @@
         let adapter = RedisLeaderLeaseAdapter::new(
             vec![redis.redis_url()],
             lease_key,
+            0,
             Duration::from_secs(2),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -1703,6 +1731,7 @@
                 redis_c.redis_url(),
             ],
             lease_key,
+            0,
             Duration::from_secs(2),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -1825,7 +1854,7 @@
             "Adapter should acquire lease"
         );
         let adapter_clone = adapter.clone();
-        let owner_token = adapter.lease_owner_token.clone();
+        let owner_id = adapter.lease_owner_id.clone();
 
         // when
         drop(adapter_clone);
@@ -1836,7 +1865,7 @@
             .iter()
             .filter(|redis_url| {
                 read_lease_owner(redis_url, &lease_key).as_deref()
-                    == Some(owner_token.as_str())
+                    == Some(owner_id.as_str())
             })
             .count();
         assert!(
@@ -1860,6 +1889,7 @@
         let adapter = RedisLeaderLeaseAdapter::new(
             redis_urls.clone(),
             lease_key.clone(),
+            0,
             Duration::from_millis(500),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -1878,7 +1908,7 @@
             .iter()
             .filter(|redis_url| {
                 read_lease_owner(redis_url, &lease_key).as_deref()
-                    == Some(adapter.lease_owner_token.as_str())
+                    == Some(adapter.lease_owner_id.as_str())
             })
             .count();
 
@@ -1908,6 +1938,7 @@
         let first_adapter = RedisLeaderLeaseAdapter::new(
             redis_urls.clone(),
             lease_key.clone(),
+            0,
             Duration::from_millis(300),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -1919,6 +1950,7 @@
         let second_adapter = RedisLeaderLeaseAdapter::new(
             redis_urls.clone(),
             lease_key.clone(),
+            0,
             Duration::from_millis(300),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -1943,7 +1975,7 @@
             .iter()
             .filter(|redis_url| {
                 read_lease_owner(redis_url, &lease_key).as_deref()
-                    == Some(second_adapter.lease_owner_token.as_str())
+                    == Some(second_adapter.lease_owner_id.as_str())
             })
             .count();
 
@@ -2078,6 +2110,7 @@
         let adapter = RedisLeaderLeaseAdapter::new(
             redis_urls.clone(),
             lease_key.clone(),
+            0,
             Duration::from_millis(700),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -2103,7 +2136,7 @@
             .iter()
             .filter(|redis_url| {
                 read_lease_owner(redis_url, &lease_key).as_deref()
-                    == Some(adapter.lease_owner_token.as_str())
+                    == Some(adapter.lease_owner_id.as_str())
             })
             .count();
 
@@ -2181,6 +2214,7 @@
         let adapter = RedisLeaderLeaseAdapter::new(
             vec![redis.redis_url()],
             lease_key,
+            0,
             Duration::from_secs(2),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -2221,6 +2255,7 @@
         let adapter = RedisLeaderLeaseAdapter::new(
             vec![redis.redis_url()],
             lease_key,
+            0,
             Duration::from_secs(2),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -2282,6 +2317,7 @@
         let adapter = RedisLeaderLeaseAdapter::new(
             vec![redis.redis_url()],
             lease_key,
+            0,
             Duration::from_secs(2),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -2549,6 +2585,7 @@
         RedisLeaderLeaseAdapter::new(
             redis_urls,
             lease_key,
+            0,
             Duration::from_secs(2),
             Duration::from_millis(100),
             Duration::from_millis(50),
@@ -2831,7 +2868,7 @@
         let mut conn = client.get_connection().expect("conn");
         let _: () = redis::cmd("SET")
             .arg(&lease_key)
-            .arg(&candidate_b.lease_owner_token)
+            .arg(&candidate_b.lease_owner_id)
             .arg("PX")
             .arg(5000u64)
             .query(&mut conn)
@@ -2850,11 +2887,11 @@
         // Verify A owns nodes A,B but NOT node C
         let owns_a = read_lease_owner(&redis_a.redis_url(), &lease_key)
-            == Some(adapter_a.lease_owner_token.clone());
+            == Some(adapter_a.lease_owner_id.clone());
         let owns_b = read_lease_owner(&redis_b.redis_url(), &lease_key)
-            == Some(adapter_a.lease_owner_token.clone());
+            == Some(adapter_a.lease_owner_id.clone());
         let owns_c = read_lease_owner(&redis_c.redis_url(), &lease_key)
-            == Some(adapter_a.lease_owner_token.clone());
+            == Some(adapter_a.lease_owner_id.clone());
         assert!(owns_a && owns_b, "A should own nodes A and B");
         assert!(!owns_c, "A should NOT own node C (held by B)");
 
@@ -2874,7 +2911,7 @@
         // then — A should now own node C too
         let owns_c_after = read_lease_owner(&redis_c.redis_url(), &lease_key)
-            == Some(adapter_a.lease_owner_token.clone());
+            == Some(adapter_a.lease_owner_id.clone());
         assert!(owns_c_after, "Lock expansion should have acquired node C");
 
         // Verify writes now go to all 3 nodes
@@ -2919,7 +2956,7 @@
         // Simulate B's promote_leader.lua on node C: SET NX + INCR
         let _: () = redis::cmd("SET")
             .arg(&lease_key)
-            .arg(&candidate_b.lease_owner_token)
+            .arg(&candidate_b.lease_owner_id)
             .arg("PX")
             .arg(5000u64)
             .query(&mut conn)
diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs
index 3880eb0f441..a22fceea5fb 100644
--- a/crates/fuel-core/src/service/sub_services.rs
+++ b/crates/fuel-core/src/service/sub_services.rs
@@ -239,6 +239,7 @@ pub fn init_sub_services(
                 RedisLeaderLeaseAdapter::new(
                     leader_lock.redis_urls.clone(),
                     leader_lock.lease_key.clone(),
+                    leader_lock.quorum_disruption_budget,
                     leader_lock.lease_ttl,
                     leader_lock.node_timeout,
                     leader_lock.retry_delay,
@@ -246,10 +247,6 @@ pub fn init_sub_services(
                     leader_lock.max_attempts,
                     leader_lock.stream_max_len,
                 )
-                .map(|adapter| {
-                    adapter
-                        .with_quorum_disruption_budget(leader_lock.quorum_disruption_budget)
-                })
             })
             .transpose()?;
diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs
index 210408c1b12..edee7504fb3 100644
--- a/crates/services/consensus_module/poa/src/service.rs
+++ b/crates/services/consensus_module/poa/src/service.rs
@@ -492,12 +492,21 @@ where
         let (last_height, last_timestamp, last_block_created) =
             Self::extract_block_info(self.clock.now(), block_header);
         if last_height > self.last_height {
-            self.last_height = last_height;
-            self.last_timestamp = last_timestamp;
-            self.last_block_created = last_block_created;
+            self.record_block_progress(last_height, last_timestamp, last_block_created);
         }
     }
 
+    fn record_block_progress(
+        &mut self,
+        height: BlockHeight,
+        timestamp: Tai64,
+        created_at: Instant,
+    ) {
+        self.last_height = height;
+        self.last_timestamp = timestamp;
+        self.last_block_created = created_at;
+    }
+
     async fn ensure_synced(
         &mut self,
         watcher: &mut StateWatcher,
@@ -612,6 +621,7 @@ where
         {
             LeaderState::ReconciledFollower => {
                 sleep_until(deadline).await;
+                self.last_block_created = deadline;
                 Ok(TaskNextAction::Continue)
             }
             LeaderState::ReconciledLeader => {
@@ -630,8 +640,11 @@ where
             match self.block_importer.execute_and_commit(block).await {
                 Ok(()) => {
-                    self.last_height = block_height;
-                    self.last_timestamp = block_time;
+                    self.record_block_progress(
+                        block_height,
+                        block_time,
+                        deadline,
+                    );
                 }
                 Err(err) => {
                     // Re-sync from the DB and skip — the block