diff --git a/crates/cardano/src/ashard/commit.rs b/crates/cardano/src/ashard/commit.rs new file mode 100644 index 000000000..25def6fd8 --- /dev/null +++ b/crates/cardano/src/ashard/commit.rs @@ -0,0 +1,83 @@ +//! Commit logic for `AShardWorkUnit`. +//! +//! Adds the AShard-specific commit method to `BoundaryWork`. Reuses the +//! shared `stream_and_apply_namespace` helper that lives in `ewrap/commit.rs`. + +use dolos_core::{ + ArchiveStore, ArchiveWriter, ChainError, ChainPoint, Domain, LogKey, StateStore, StateWriter, + TemporalKey, +}; +use tracing::{debug, instrument, warn}; + +use crate::{ + ewrap::BoundaryWork, rupd::credential_to_key, AccountState, EpochState, FixedNamespace, + PendingRewardState, +}; + +impl BoundaryWork { + /// Commit a single account shard (ashard): apply per-account deltas + /// (rewards + drops) and the `EpochEndAccumulate` delta against + /// `EpochState`, flush archive logs (`{Leader,Member}RewardLog`), delete + /// applied pending rewards. + #[instrument(skip(self, state, archive))] + pub fn commit_ashard( + &mut self, + state: &D::State, + archive: &D::Archive, + range: std::ops::Range, + ) -> Result<(), ChainError> { + debug!("committing ashard changes"); + + let writer = state.start_writer()?; + let archive_writer = archive.start_writer()?; + + // Stream accounts in this shard's range only. + self.stream_and_apply_namespace::(state, &writer, Some(range))?; + + // EpochState gets the EpochEndAccumulate delta (single entity). + self.stream_and_apply_namespace::(state, &writer, None)?; + + // Delete applied pending rewards. + debug!( + count = self.applied_reward_credentials.len(), + "deleting applied pending rewards" + ); + for credential in self.applied_reward_credentials.drain(..) { + let key = credential_to_key(&credential); + writer.delete_entity(PendingRewardState::NS, &key)?; + } + + // Any unspendable rewards left in the map after flush (i.e. those not + // in drain_unspendable — shouldn't happen today but kept for safety). 
+ if !self.rewards.is_empty() { + warn!( + remaining = self.rewards.len(), + "draining remaining pending rewards (shard)" + ); + for (credential, _) in self.rewards.iter_pending() { + let key = credential_to_key(credential); + writer.delete_entity(PendingRewardState::NS, &key)?; + } + } + + // Archive logs — share the epoch-start temporal key across shards. + let start_of_epoch = self.chain_summary.epoch_start(self.ending_state().number); + let temporal_key = TemporalKey::from(&ChainPoint::Slot(start_of_epoch)); + + debug!(log_count = self.logs.len(), "writing shard archive logs"); + for (entity_key, log) in self.logs.drain(..) { + let log_key = LogKey::from((temporal_key.clone(), entity_key)); + archive_writer.write_log_typed(&log_key, &log)?; + } + + if !self.deltas.entities.is_empty() { + warn!(quantity = %self.deltas.entities.len(), "uncommitted shard deltas"); + } + + writer.commit()?; + archive_writer.commit()?; + + debug!("ashard commit complete"); + Ok(()) + } +} diff --git a/crates/cardano/src/ashard/loading.rs b/crates/cardano/src/ashard/loading.rs new file mode 100644 index 000000000..c13f2f789 --- /dev/null +++ b/crates/cardano/src/ashard/loading.rs @@ -0,0 +1,137 @@ +//! Load + compute helpers for `AShardWorkUnit`. +//! +//! Adds AShard-specific methods to `BoundaryWork` (defined in `ewrap`). +//! The shared boundary helpers (`new_empty`, `load_pool_data`, +//! `load_drep_data`) live in `ewrap/loading.rs`; this file builds on top. + +use std::{collections::HashMap, ops::Range, sync::Arc}; + +use dolos_core::{ChainError, Domain, EntityKey, Genesis, StateStore}; +use pallas::ledger::primitives::StakeCredential; + +use crate::{ + ewrap::{BoundaryVisitor as _, BoundaryWork}, + rewards::{Reward, RewardMap}, + AccountState, FixedNamespace as _, PendingRewardState, +}; + +impl BoundaryWork { + /// Range-load pending rewards from state store (persisted by RUPD) into + /// `self.rewards`. 
`range = Some(r)` restricts iteration to a shard's + /// key range; `None` loads everything (kept for completeness — currently + /// unused since the only caller is `load_ashard`, which always passes a + /// shard range). + fn load_pending_rewards_range( + &mut self, + state: &D::State, + range: Option>, + ) -> Result<(), ChainError> { + let pending_iter = state + .iter_entities_typed::(PendingRewardState::NS, range)?; + + let mut pending: HashMap = HashMap::new(); + + for record in pending_iter { + let (_, pending_state) = record?; + let credential = pending_state.credential.clone(); + let reward = Reward::from_pending_state(&pending_state); + pending.insert(credential, reward); + } + + let pending_total: u64 = pending.values().map(|r| r.total_value()).sum(); + let spendable_count = pending.values().filter(|r| r.is_spendable()).count(); + let unspendable_count = pending.len() - spendable_count; + + tracing::debug!( + pending_count = pending.len(), + %pending_total, + %spendable_count, + %unspendable_count, + "loaded pending rewards from state" + ); + + let incentives = self.rewards.incentives().clone(); + self.rewards = RewardMap::from_pending(pending, incentives); + + Ok(()) + } + + /// Load + compute for an `AShard` phase: + /// * reload the small classifications that drops.visit_account needs + /// (retiring_pools, retiring_dreps, reregistrating_dreps), + /// * range-load pending rewards for this shard's key range, + /// * iterate accounts in range, applying rewards+drops visitors, and + /// * emit an `EpochEndAccumulate` delta carrying the shard's reward + /// contribution. + pub fn load_ashard( + state: &D::State, + genesis: Arc, + shard_index: u32, + total_shards: u32, + range: Range, + ) -> Result { + let mut boundary = Self::new_empty::(state, genesis)?; + + // drops.visit_account needs retiring_pools + retiring_dreps + + // reregistrating_dreps. These sets are small (handful per epoch) so + // re-classifying them per shard is cheap. 
+ boundary.load_pool_data::(state)?; + boundary.load_drep_data::(state)?; + + boundary.load_pending_rewards_range::(state, Some(range.clone()))?; + + boundary.compute_shard_deltas::(state, range, shard_index, total_shards)?; + + Ok(boundary) + } + + fn compute_shard_deltas( + &mut self, + state: &D::State, + range: Range, + shard_index: u32, + total_shards: u32, + ) -> Result<(), ChainError> { + let mut visitor_rewards = super::rewards::BoundaryVisitor::default(); + let mut visitor_drops = crate::ewrap::drops::BoundaryVisitor::default(); + + let accounts = + state.iter_entities_typed::(AccountState::NS, Some(range))?; + + for record in accounts { + let (account_id, account) = record?; + // HACK: rewards must apply before drops. Rewards update the live + // value before the snapshot; drops schedule refunds for after the + // snapshot. If reordered, the rewards would be overwritten by the + // refund schedule. With this order, the refund clones the live + // values with rewards already applied. + // TODO: move retires to ESTART (after the snapshot has been taken) + // and drop this ordering hack. + visitor_rewards.visit_account(self, &account_id, &account)?; + visitor_drops.visit_account(self, &account_id, &account)?; + } + + visitor_rewards.flush(self)?; + visitor_drops.flush(self)?; + + // Snapshot the reward-map counters for this shard and emit the + // accumulator delta. The RewardMap's applied_* counters reflect only + // this shard's contribution (the map was created fresh for this shard + // with just this shard's pending rewards). 
+ self.shard_applied_effective = self.rewards.applied_effective(); + self.shard_applied_unspendable_to_treasury = + self.rewards.applied_unspendable_to_treasury(); + self.shard_applied_unspendable_to_reserves = + self.rewards.applied_unspendable_to_reserves(); + + self.add_delta(crate::EpochEndAccumulate::new( + self.shard_applied_effective, + self.shard_applied_unspendable_to_treasury, + self.shard_applied_unspendable_to_reserves, + shard_index, + total_shards, + )); + + Ok(()) + } +} diff --git a/crates/cardano/src/ashard/mod.rs b/crates/cardano/src/ashard/mod.rs new file mode 100644 index 000000000..dd265c580 --- /dev/null +++ b/crates/cardano/src/ashard/mod.rs @@ -0,0 +1,14 @@ +//! AShard work unit — per-account leg of the epoch-boundary pipeline. +//! +//! Builds on the shared `BoundaryWork` / `BoundaryVisitor` infrastructure +//! defined in `crate::ewrap`. The drops visitor (used by both phases) also +//! lives in `ewrap`; this module owns only the AShard-specific work +//! unit, the rewards visitor, and the key-range partitioning helpers. 
+ +pub mod commit; +pub mod loading; +pub mod rewards; +pub mod shard; +pub mod work_unit; + +pub use work_unit::AShardWorkUnit; diff --git a/crates/cardano/src/ewrap/rewards.rs b/crates/cardano/src/ashard/rewards.rs similarity index 90% rename from crates/cardano/src/ewrap/rewards.rs rename to crates/cardano/src/ashard/rewards.rs index 1f2061847..01ec2fc52 100644 --- a/crates/cardano/src/ewrap/rewards.rs +++ b/crates/cardano/src/ashard/rewards.rs @@ -3,8 +3,8 @@ use dolos_core::{ChainError, EntityKey}; use tracing::debug; use crate::{ - ewrap::AppliedReward, AccountState, AssignRewards, CardanoDelta, CardanoEntity, - LeaderRewardLog, MemberRewardLog, + ewrap::{AccountId, AppliedReward, BoundaryWork}, + AccountState, AssignRewards, CardanoDelta, CardanoEntity, LeaderRewardLog, MemberRewardLog, }; #[derive(Default)] @@ -23,11 +23,11 @@ impl BoundaryVisitor { } } -impl super::BoundaryVisitor for BoundaryVisitor { +impl crate::ewrap::BoundaryVisitor for BoundaryVisitor { fn visit_account( &mut self, - ctx: &mut super::BoundaryWork, - id: &super::AccountId, + ctx: &mut BoundaryWork, + id: &AccountId, account: &AccountState, ) -> Result<(), ChainError> { let Some(reward) = ctx.rewards.take_for_apply(&account.credential) else { @@ -79,6 +79,9 @@ impl super::BoundaryVisitor for BoundaryVisitor { self.change(AssignRewards::new(id.clone(), reward.total_value())); for (pool, value, as_leader) in reward.into_vec() { + // Per-shard bookkeeping: each shard's applied_rewards is bounded + // by the shard's account count, so holding it for the duration of + // the shard is fine (the memory bound is per-shard, not epoch). 
ctx.applied_rewards.push(AppliedReward { credential: account.credential.clone(), pool, @@ -108,7 +111,7 @@ impl super::BoundaryVisitor for BoundaryVisitor { Ok(()) } - fn flush(&mut self, ctx: &mut super::BoundaryWork) -> Result<(), ChainError> { + fn flush(&mut self, ctx: &mut BoundaryWork) -> Result<(), ChainError> { let mark_protocol = ctx .ending_state() .pparams diff --git a/crates/cardano/src/ashard/shard.rs b/crates/cardano/src/ashard/shard.rs new file mode 100644 index 000000000..9eb175953 --- /dev/null +++ b/crates/cardano/src/ashard/shard.rs @@ -0,0 +1,105 @@ +//! Shard range helper for account-shard key-range partitioning. +//! +//! Shards partition per-account boundary work by first-byte prefix of the +//! `EntityKey`. `total_shards` must divide 256 so that each shard covers an +//! equal whole number of prefix buckets (e.g. at `total_shards = 16` each +//! shard owns 16 consecutive prefix values). +//! +//! The state-store iterator takes a half-open `Range`, so shard +//! `i` gets: +//! +//! - `start = [i*step, 0, 0, ..., 0]` +//! - `end = [(i+1)*step, 0, 0, ..., 0]` for `i < total - 1` +//! - `end = [0xFF; KEY_SIZE]` for the final shard, matching +//! `EntityKey::full_range`'s convention. + +use std::ops::Range; + +use dolos_core::{EntityKey, KEY_SIZE}; + +pub const PREFIX_SPACE: u32 = 256; + +/// Return `Ok(())` if `total_shards` is a valid sharding factor (>= 1 and +/// divides 256). +pub fn validate_total_shards(total_shards: u32) -> Result<(), String> { + if total_shards == 0 { + return Err("account_shards must be >= 1".into()); + } + if !PREFIX_SPACE.is_multiple_of(total_shards) { + return Err(format!( + "account_shards ({total_shards}) must divide {PREFIX_SPACE}" + )); + } + Ok(()) +} + +/// Compute the key range for `shard_index` out of `total_shards`. 
+pub fn shard_key_range(shard_index: u32, total_shards: u32) -> Range { + debug_assert!(validate_total_shards(total_shards).is_ok()); + debug_assert!(shard_index < total_shards); + + let step = PREFIX_SPACE / total_shards; + let first_byte = (shard_index * step) as u8; + + let mut start = [0u8; KEY_SIZE]; + start[0] = first_byte; + + let end = if shard_index + 1 == total_shards { + [0xFFu8; KEY_SIZE] + } else { + let next_first_byte = ((shard_index + 1) * step) as u8; + let mut end = [0u8; KEY_SIZE]; + end[0] = next_first_byte; + end + }; + + Range { + start: EntityKey::from(&start), + end: EntityKey::from(&end), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn shards_cover_full_range_at_16() { + let ranges: Vec<_> = (0..16).map(|i| shard_key_range(i, 16)).collect(); + + assert_eq!(ranges[0].start.as_ref()[0], 0x00); + assert_eq!(ranges[0].end.as_ref()[0], 0x10); + assert_eq!(ranges[1].start.as_ref()[0], 0x10); + assert_eq!(ranges[15].start.as_ref()[0], 0xF0); + assert_eq!(ranges[15].end.as_ref(), &[0xFFu8; KEY_SIZE]); + + // Adjacent ranges meet: end_i == start_{i+1} for i < 15 + for i in 0..15 { + assert_eq!( + ranges[i].end.as_ref()[0], + ranges[i + 1].start.as_ref()[0], + "shards {i} and {} must meet", + i + 1 + ); + } + } + + #[test] + fn shards_cover_full_range_at_1() { + let range = shard_key_range(0, 1); + assert_eq!(range.start.as_ref()[0], 0x00); + assert_eq!(range.end.as_ref(), &[0xFFu8; KEY_SIZE]); + } + + #[test] + fn validates_total_shards() { + assert!(validate_total_shards(1).is_ok()); + assert!(validate_total_shards(2).is_ok()); + assert!(validate_total_shards(16).is_ok()); + assert!(validate_total_shards(256).is_ok()); + + assert!(validate_total_shards(0).is_err()); + assert!(validate_total_shards(3).is_err()); + assert!(validate_total_shards(100).is_err()); + } +} diff --git a/crates/cardano/src/ashard/work_unit.rs b/crates/cardano/src/ashard/work_unit.rs new file mode 100644 index 000000000..112d45896 --- /dev/null +++ 
b/crates/cardano/src/ashard/work_unit.rs @@ -0,0 +1,134 @@ +//! AShard work unit — per-account leg of the epoch-boundary pipeline. +//! +//! Emitted `total_shards` times in sequence. Each shard covers a first-byte +//! prefix range of the account key space, range-loads pending rewards, +//! iterates accounts in range, applies rewards + drops visitors, and emits +//! `EpochEndAccumulate` to populate the reward accumulators on +//! `EpochState.end` (the slot is opened by `EpochTransition` during ESTART). +//! +//! Followed by the `EwrapWorkUnit` global phase which closes the boundary. + +use std::sync::Arc; + +use dolos_core::{config::CardanoConfig, BlockSlot, Domain, DomainError, Genesis, WorkUnit}; +use tracing::{debug, info}; + +use crate::{ewrap::BoundaryWork, load_epoch, CardanoLogic}; + +use super::shard::shard_key_range; + +pub struct AShardWorkUnit { + slot: BlockSlot, + config: CardanoConfig, + genesis: Arc, + shard_index: u32, + + boundary: Option, +} + +impl AShardWorkUnit { + pub fn new( + slot: BlockSlot, + config: CardanoConfig, + genesis: Arc, + shard_index: u32, + ) -> Self { + Self { + slot, + config, + genesis, + shard_index, + boundary: None, + } + } + + pub fn shard_index(&self) -> u32 { + self.shard_index + } + + pub fn boundary(&self) -> Option<&BoundaryWork> { + self.boundary.as_ref() + } +} + +impl WorkUnit for AShardWorkUnit +where + D: Domain, +{ + fn name(&self) -> &'static str { + "ashard" + } + + fn load(&mut self, domain: &D) -> Result<(), DomainError> { + // If a boundary is in flight, the persisted `ashard_progress.total` + // is authoritative — guards against a config change between shards + // (e.g. across a crash and restart) breaking the in-flight pipeline. + // Falls back to current config for a fresh boundary. 
+ let total_shards = match load_epoch::(domain.state()) { + Ok(epoch) => epoch + .ashard_progress + .as_ref() + .map(|p| p.total) + .unwrap_or_else(|| self.config.account_shards()), + Err(_) => self.config.account_shards(), + }; + let range = shard_key_range(self.shard_index, total_shards); + + debug!( + slot = self.slot, + shard = self.shard_index, + total = total_shards, + "loading ashard context" + ); + + let boundary = BoundaryWork::load_ashard::( + domain.state(), + self.genesis.clone(), + self.shard_index, + total_shards, + range, + )?; + + info!( + epoch = boundary.ending_state().number, + shard = self.shard_index, + "ashard" + ); + + self.boundary = Some(boundary); + + debug!("ashard context loaded"); + Ok(()) + } + + fn compute(&mut self) -> Result<(), DomainError> { + debug!("ashard compute (deltas computed during load)"); + Ok(()) + } + + fn commit_state(&mut self, domain: &D) -> Result<(), DomainError> { + // Mirror the same effective-total logic as `load` so the commit's + // key range matches what `load` used. + let total_shards = match load_epoch::(domain.state()) { + Ok(epoch) => epoch + .ashard_progress + .as_ref() + .map(|p| p.total) + .unwrap_or_else(|| self.config.account_shards()), + Err(_) => self.config.account_shards(), + }; + let range = shard_key_range(self.shard_index, total_shards); + + let boundary = self + .boundary + .as_mut() + .ok_or_else(|| DomainError::Internal("ashard boundary not loaded".into()))?; + + boundary.commit_ashard::(domain.state(), domain.archive(), range)?; + Ok(()) + } + + fn commit_archive(&mut self, _domain: &D) -> Result<(), DomainError> { + Ok(()) + } +} diff --git a/crates/cardano/src/ewrap/commit.rs b/crates/cardano/src/ewrap/commit.rs index ba96ba156..f6ce0fa39 100644 --- a/crates/cardano/src/ewrap/commit.rs +++ b/crates/cardano/src/ewrap/commit.rs @@ -1,8 +1,14 @@ -//! Commit logic for epoch wrap (ewrap) work unit. +//! Commit logic for the two-phase boundary pipeline (AShard + Ewrap). //! -//! 
This module uses a streaming pattern that processes entities one-by-one, -//! applying deltas and writing immediately without accumulating all entities -//! in memory. +//! Each phase commits its own deltas and archive logs atomically. The handoff +//! between phases happens via `EpochState.end` + `EpochState.ashard_progress` +//! (see `EpochEndAccumulate` / `EpochWrapUp` deltas). +//! +//! Both phases share the same streaming helper: each entity namespace is read +//! one record at a time, deltas for that record are applied, and the result +//! is written immediately. Peak residency is bounded by whatever the current +//! phase keeps in `BoundaryWork` — shards keep a single key-range slice, +//! Ewrap keeps the small globals. use dolos_core::{ ArchiveStore, ArchiveWriter, ChainError, ChainPoint, Domain, Entity, EntityDelta as _, LogKey, @@ -12,29 +18,29 @@ use tracing::{debug, instrument, trace, warn}; use crate::{ ewrap::BoundaryWork, rupd::credential_to_key, AccountState, CardanoEntity, DRepState, - EpochState, FixedNamespace, PendingMirState, PendingRewardState, PoolState, ProposalState, + EpochState, FixedNamespace, PendingMirState, PoolState, ProposalState, }; impl BoundaryWork { /// Stream entities from a namespace, apply deltas, and write immediately. /// - /// Processes entities one at a time without accumulating them in memory, - /// reducing peak memory usage during epoch boundary commits. - fn stream_and_apply_namespace( + /// `range` optionally narrows iteration — `AShard` passes the + /// shard's key range so only accounts in that slice are streamed. 
+ pub(crate) fn stream_and_apply_namespace( &mut self, state: &D::State, writer: &::Writer, + range: Option>, ) -> Result<(), ChainError> where D: Domain, E: Entity + FixedNamespace + Into, { - let records = state.iter_entities_typed::(E::NS, None)?; + let records = state.iter_entities_typed::(E::NS, range)?; for record in records { let (entity_id, entity) = record?; - // Check if this entity has deltas to apply let to_apply = self .deltas .entities @@ -47,7 +53,6 @@ impl BoundaryWork { delta.apply(&mut entity); } - // Write immediately - don't collect! writer.save_entity_typed(E::NS, &entity_id, entity.as_ref())?; } else { trace!(ns = E::NS, key = %entity_id, "no deltas for entity"); @@ -57,44 +62,44 @@ impl BoundaryWork { Ok(()) } + /// Commit the Ewrap phase: enactment/MIR/refund/wrapup-global deltas + /// for pools, dreps, proposals, plus the `EpochWrapUp` delta on + /// `EpochState` that closes the boundary (overwrites `entity.end` with + /// the final stats, rotates rolling/pparams snapshots, clears + /// `ashard_progress`). Also writes archive logs produced by the global + /// visitors (e.g. `PoolDepositRefundLog`) and the completed `EpochState` + /// snapshot under the epoch-start temporal key. 
#[instrument(skip_all)] - pub fn commit( + pub fn commit_ewrap( &mut self, state: &D::State, archive: &D::Archive, ) -> Result<(), ChainError> { - debug!("committing ewrap changes (streaming mode)"); + debug!("committing ewrap changes"); let writer = state.start_writer()?; let archive_writer = archive.start_writer()?; - // Stream each namespace - entities are read, processed, and written one at a time - debug!("streaming account entities"); - self.stream_and_apply_namespace::(state, &writer)?; - - debug!("streaming pool entities"); - self.stream_and_apply_namespace::(state, &writer)?; - - debug!("streaming drep entities"); - self.stream_and_apply_namespace::(state, &writer)?; - - debug!("streaming proposal entities"); - self.stream_and_apply_namespace::(state, &writer)?; - - debug!("streaming epoch entities"); - self.stream_and_apply_namespace::(state, &writer)?; - - // Delete applied pending rewards - debug!( - count = self.applied_reward_credentials.len(), - "deleting applied pending rewards" - ); - for credential in self.applied_reward_credentials.drain(..) { - let key = credential_to_key(&credential); - writer.delete_entity(PendingRewardState::NS, &key)?; - } - - // Delete processed pending MIRs + // Apply deltas to pools / dreps / proposals. The only `AssignRewards` + // deltas Ewrap queues against accounts come from MIR processing + // (per-account stake rewards are owned by the preceding AShard + // phase); they're applied in the account namespace below. + self.stream_and_apply_namespace::(state, &writer, None)?; + self.stream_and_apply_namespace::(state, &writer, None)?; + self.stream_and_apply_namespace::(state, &writer, None)?; + + // MIR AssignRewards land on accounts; stream the account namespace so + // MIR recipients get their rewards applied here (only recipients have + // queued deltas, so this is effectively a targeted write via the + // streaming path). 
+ self.stream_and_apply_namespace::(state, &writer, None)?; + + // EpochState receives the boundary-closing deltas (PParamsUpdate, + // TreasuryWithdrawal from enactment; EpochWrapUp from the wrapup + // visitor that finalises `entity.end` and rotates snapshots). + self.stream_and_apply_namespace::(state, &writer, None)?; + + // Delete processed pending MIRs. debug!( count = self.applied_mir_credentials.len(), "deleting processed pending MIRs" @@ -104,42 +109,31 @@ impl BoundaryWork { writer.delete_entity(PendingMirState::NS, &key)?; } - // Drain remaining unspendable rewards - if !self.rewards.is_empty() { - warn!( - remaining = self.rewards.len(), - "draining remaining unspendable rewards" - ); - for (credential, _) in self.rewards.iter_pending() { - let key = credential_to_key(credential); - writer.delete_entity(PendingRewardState::NS, &key)?; - } - } - - // Write archive logs (still accumulated during compute_deltas, but much smaller than entities) - let start_of_epoch = self.chain_summary.epoch_start(self.ending_state.number); + // Write archive logs under the epoch-start temporal key. + let start_of_epoch = self.chain_summary.epoch_start(self.ending_state().number); let temporal_key = TemporalKey::from(&ChainPoint::Slot(start_of_epoch)); - debug!(log_count = self.logs.len(), "writing archive logs"); + debug!(log_count = self.logs.len(), "writing ewrap archive logs"); for (entity_key, log) in self.logs.drain(..) { let log_key = LogKey::from((temporal_key.clone(), entity_key)); archive_writer.write_log_typed(&log_key, &log)?; } - // Write epoch state to archive - archive_writer.write_log_typed(&temporal_key.clone().into(), &self.ending_state)?; + // Write the completed `EpochState` to archive under the epoch-start + // temporal key (preserves the pre-snapshot-rotation state for + // historical queries). `ending_state.end` was assembled with the + // final stats by `wrapup.flush` before this commit ran. 
+ archive_writer.write_log_typed(&temporal_key.clone().into(), self.ending_state())?; - // Verify all deltas were processed if !self.deltas.entities.is_empty() { - warn!(quantity = %self.deltas.entities.len(), "uncommitted deltas"); + warn!(quantity = %self.deltas.entities.len(), "uncommitted ewrap deltas"); } - // Commit both writers atomically writer.commit()?; archive_writer.commit()?; debug!("ewrap commit complete"); - Ok(()) } + } diff --git a/crates/cardano/src/ewrap/loading.rs b/crates/cardano/src/ewrap/loading.rs index 176597e6b..e80269217 100644 --- a/crates/cardano/src/ewrap/loading.rs +++ b/crates/cardano/src/ewrap/loading.rs @@ -1,17 +1,16 @@ use std::{collections::HashMap, sync::Arc}; use dolos_core::{BlockSlot, ChainError, Domain, Genesis, StateStore, TxOrder}; -use pallas::{codec::minicbor, ledger::primitives::StakeCredential}; +use pallas::codec::minicbor; use crate::{ ewrap::{BoundaryVisitor as _, BoundaryWork}, load_era_summary, pallas_extras, - pots::EpochIncentives, - rewards::{Reward, RewardMap}, + rewards::RewardMap, roll::WorkDeltas, - rupd::{credential_to_key, RupdWork}, - AccountState, DRepState, EraProtocol, FixedNamespace as _, PendingMirState, PendingRewardState, - PoolState, ProposalState, + rupd::credential_to_key, + AccountState, DRepState, EraProtocol, FixedNamespace as _, PendingMirState, PoolState, + ProposalState, }; impl BoundaryWork { @@ -50,7 +49,7 @@ impl BoundaryWork { Ok(account) } - fn load_pool_data(&mut self, state: &D::State) -> Result<(), ChainError> { + pub(crate) fn load_pool_data(&mut self, state: &D::State) -> Result<(), ChainError> { let pools = state.iter_entities_typed::(PoolState::NS, None)?; for record in pools { @@ -111,7 +110,7 @@ impl BoundaryWork { None } - fn load_drep_data(&mut self, state: &D::State) -> Result<(), ChainError> { + pub(crate) fn load_drep_data(&mut self, state: &D::State) -> Result<(), ChainError> { let dreps = state.iter_entities_typed::(DRepState::NS, None)?; for record in dreps { @@ 
-246,74 +245,124 @@ impl BoundaryWork { Ok(()) } - pub fn compute_deltas(&mut self, state: &D::State) -> Result<(), ChainError> { - // Process pending MIRs first (before regular rewards) + /// Construct an empty `BoundaryWork` with the small globals every phase needs + /// (ending_state, chain summary, active protocol, genesis, incentives). + pub(crate) fn new_empty( + state: &D::State, + genesis: Arc, + ) -> Result { + let ending_state = crate::load_epoch::(state)?; + let chain_summary = load_era_summary::(state)?; + let active_protocol = EraProtocol::from(chain_summary.edge().protocol); + let incentives = ending_state.incentives.clone().unwrap_or_default(); + + Ok(BoundaryWork { + ending_state, + chain_summary, + active_protocol, + genesis, + rewards: RewardMap::from_pending(HashMap::new(), incentives), + new_pools: Default::default(), + retiring_pools: Default::default(), + expiring_dreps: Default::default(), + retiring_dreps: Default::default(), + reregistrating_dreps: Default::default(), + enacting_proposals: Default::default(), + dropping_proposals: Default::default(), + deltas: WorkDeltas::default(), + logs: Default::default(), + applied_reward_credentials: Default::default(), + applied_rewards: Default::default(), + effective_treasury_mirs: 0, + effective_reserve_mirs: 0, + invalid_treasury_mirs: 0, + invalid_reserve_mirs: 0, + applied_mir_credentials: Default::default(), + shard_applied_effective: 0, + shard_applied_unspendable_to_treasury: 0, + shard_applied_unspendable_to_reserves: 0, + }) + } + + // --------------------------------------------------------------------- + // Ewrap-phase orchestration. AShard's load/commit live in + // `crate::ashard` (separate module); the shared helpers above are reused. 
+ // --------------------------------------------------------------------- + + /// Load + compute for the `Ewrap` phase: + /// * classify pools/dreps/proposals (retiring/enacting/dropping), + /// * process pending MIRs, + /// * run the enactment / refunds / wrapup visitors (global only — + /// account-level work happened in the preceding AShards), and + /// * emit a single `EpochWrapUp` delta carrying the final `EndStats` + /// (prepare-time fields + shard-populated reward accumulators). Apply + /// overwrites `entity.end`, rotates rolling/pparams snapshots, and + /// clears `ashard_progress`. The boundary close is now part of this + /// load; there is no separate finalize phase. + pub fn load_ewrap( + state: &D::State, + genesis: Arc, + ) -> Result { + let mut boundary = Self::new_empty::(state, genesis)?; + + boundary.load_pool_data::(state)?; + boundary.load_drep_data::(state)?; + boundary.load_proposal_data::(state)?; + + boundary.compute_ewrap_deltas::(state)?; + + Ok(boundary) + } + + // --------------------------------------------------------------------- + // Ewrap-phase compute helper. Drives the global visitors (enactment / + // refunds / drops / wrapup) over pools, dreps, and proposals, then the + // wrapup visitor's flush emits `EpochWrapUp` carrying the final + // `EndStats`. + // --------------------------------------------------------------------- + + fn compute_ewrap_deltas(&mut self, state: &D::State) -> Result<(), ChainError> { self.process_pending_mirs::(state)?; let mut visitor_enactment = crate::ewrap::enactment::BoundaryVisitor::default(); - let mut visitor_rewards = crate::ewrap::rewards::BoundaryVisitor::default(); let mut visitor_drops = crate::ewrap::drops::BoundaryVisitor::default(); let mut visitor_refunds = crate::ewrap::refunds::BoundaryVisitor::default(); let mut visitor_wrapup = crate::ewrap::wrapup::BoundaryVisitor::default(); + // Pools — all pools, then retiring pools via their stored clones. 
let pools = state.iter_entities_typed::(PoolState::NS, None)?; - - for pool in pools { - let (pool_id, pool) = pool?; - + for record in pools { + let (pool_id, pool) = record?; visitor_enactment.visit_pool(self, &pool_id, &pool)?; - visitor_rewards.visit_pool(self, &pool_id, &pool)?; visitor_drops.visit_pool(self, &pool_id, &pool)?; visitor_refunds.visit_pool(self, &pool_id, &pool)?; visitor_wrapup.visit_pool(self, &pool_id, &pool)?; } let retiring_pools = self.retiring_pools.clone(); - - for (pool_id, (pool, account)) in retiring_pools { - visitor_enactment.visit_retiring_pool(self, pool_id, &pool, account.as_ref())?; - visitor_rewards.visit_retiring_pool(self, pool_id, &pool, account.as_ref())?; - visitor_drops.visit_retiring_pool(self, pool_id, &pool, account.as_ref())?; - visitor_refunds.visit_retiring_pool(self, pool_id, &pool, account.as_ref())?; - visitor_wrapup.visit_retiring_pool(self, pool_id, &pool, account.as_ref())?; + for (pool_hash, (pool, account)) in retiring_pools { + visitor_enactment.visit_retiring_pool(self, pool_hash, &pool, account.as_ref())?; + visitor_drops.visit_retiring_pool(self, pool_hash, &pool, account.as_ref())?; + visitor_refunds.visit_retiring_pool(self, pool_hash, &pool, account.as_ref())?; + visitor_wrapup.visit_retiring_pool(self, pool_hash, &pool, account.as_ref())?; } + // DReps — drops.visit_drep emits DRepExpiration for expiring dreps. 
let dreps = state.iter_entities_typed::(DRepState::NS, None)?; - - for drep in dreps { - let (drep_id, drep) = drep?; - + for record in dreps { + let (drep_id, drep) = record?; visitor_enactment.visit_drep(self, &drep_id, &drep)?; - visitor_rewards.visit_drep(self, &drep_id, &drep)?; visitor_drops.visit_drep(self, &drep_id, &drep)?; visitor_refunds.visit_drep(self, &drep_id, &drep)?; visitor_wrapup.visit_drep(self, &drep_id, &drep)?; } - let accounts = state.iter_entities_typed::(AccountState::NS, None)?; - - for account in accounts { - let (account_id, account) = account?; - - visitor_enactment.visit_account(self, &account_id, &account)?; - - // HACK: we need the rewards to apply before the retires. This is because the rewards will update the live value before the snapshot but the retires will schedule refunds for after the snapshot. If we switch the sequence, the rewards will be overriden by the refund schedule. If we keep this order, the refund will clone the existing live values with the rewards already applied. - // TODO: we should probably move the retires to ESTART after the snapshot has been taken. - visitor_rewards.visit_account(self, &account_id, &account)?; - visitor_drops.visit_account(self, &account_id, &account)?; - visitor_refunds.visit_account(self, &account_id, &account)?; - - visitor_wrapup.visit_account(self, &account_id, &account)?; - } - + // Active proposals + enacting + dropping. 
let proposals = state.iter_entities_typed::(ProposalState::NS, None)?; - - for proposal in proposals { - let (proposal_id, proposal) = proposal?; - + for record in proposals { + let (proposal_id, proposal) = record?; if proposal.is_active(self.ending_state.number) { visitor_enactment.visit_active_proposal(self, &proposal_id, &proposal)?; - visitor_rewards.visit_active_proposal(self, &proposal_id, &proposal)?; visitor_drops.visit_active_proposal(self, &proposal_id, &proposal)?; visitor_refunds.visit_active_proposal(self, &proposal_id, &proposal)?; visitor_wrapup.visit_active_proposal(self, &proposal_id, &proposal)?; @@ -321,128 +370,30 @@ impl BoundaryWork { } let enacting_proposals = self.enacting_proposals.clone(); - for (id, (proposal, account)) in enacting_proposals.iter() { visitor_enactment.visit_enacting_proposal(self, id, proposal, account.as_ref())?; - visitor_rewards.visit_enacting_proposal(self, id, proposal, account.as_ref())?; visitor_drops.visit_enacting_proposal(self, id, proposal, account.as_ref())?; visitor_refunds.visit_enacting_proposal(self, id, proposal, account.as_ref())?; visitor_wrapup.visit_enacting_proposal(self, id, proposal, account.as_ref())?; } let dropping_proposals = self.dropping_proposals.clone(); - for (id, (proposal, account)) in dropping_proposals.iter() { visitor_enactment.visit_dropping_proposal(self, id, proposal, account.as_ref())?; - visitor_rewards.visit_dropping_proposal(self, id, proposal, account.as_ref())?; visitor_drops.visit_dropping_proposal(self, id, proposal, account.as_ref())?; visitor_refunds.visit_dropping_proposal(self, id, proposal, account.as_ref())?; visitor_wrapup.visit_dropping_proposal(self, id, proposal, account.as_ref())?; } visitor_enactment.flush(self)?; - visitor_rewards.flush(self)?; visitor_drops.flush(self)?; visitor_refunds.flush(self)?; + + // wrapup.flush emits the final `EpochWrapUp` delta carrying the + // assembled `EndStats` (prepare-time fields + shard accumulators). 
visitor_wrapup.flush(self)?; Ok(()) } - /// Load pending rewards from state store (persisted by RUPD). - fn load_pending_rewards( - state: &D::State, - incentives: EpochIncentives, - ) -> Result, ChainError> { - let pending_iter = - state.iter_entities_typed::(PendingRewardState::NS, None)?; - - let mut pending: HashMap = HashMap::new(); - - for record in pending_iter { - let (_, pending_state) = record?; - let credential = pending_state.credential.clone(); - let reward = Reward::from_pending_state(&pending_state); - pending.insert(credential, reward); - } - - let pending_total: u64 = pending.values().map(|r| r.total_value()).sum(); - let spendable_total: u64 = pending - .values() - .filter(|r| r.is_spendable()) - .map(|r| r.total_value()) - .sum(); - let unspendable_total: u64 = pending - .values() - .filter(|r| !r.is_spendable()) - .map(|r| r.total_value()) - .sum(); - let spendable_count = pending.values().filter(|r| r.is_spendable()).count(); - let unspendable_count = pending.len() - spendable_count; - - tracing::debug!( - pending_count = pending.len(), - %pending_total, - %spendable_count, - %spendable_total, - %unspendable_count, - %unspendable_total, - "loaded pending rewards from state" - ); - - Ok(RewardMap::from_pending(pending, incentives)) - } - - pub fn load( - state: &D::State, - genesis: Arc, - ) -> Result { - let ending_state = crate::load_epoch::(state)?; - let chain_summary = load_era_summary::(state)?; - let active_protocol = EraProtocol::from(chain_summary.edge().protocol); - - // Load incentives from epoch state (set by RUPD) - let incentives = ending_state.incentives.clone().unwrap_or_default(); - - // Load pending rewards from state store - let rewards = Self::load_pending_rewards::(state, incentives)?; - - let mut boundary = BoundaryWork { - ending_state, - chain_summary, - active_protocol, - genesis, - rewards, - - // to be loaded right after - new_pools: Default::default(), - retiring_pools: Default::default(), - expiring_dreps: 
Default::default(), - retiring_dreps: Default::default(), - reregistrating_dreps: Default::default(), - enacting_proposals: Default::default(), - dropping_proposals: Default::default(), - - // empty until computed - deltas: WorkDeltas::default(), - logs: Default::default(), - applied_reward_credentials: Default::default(), - applied_rewards: Default::default(), - effective_treasury_mirs: 0, - effective_reserve_mirs: 0, - invalid_treasury_mirs: 0, - invalid_reserve_mirs: 0, - applied_mir_credentials: Default::default(), - }; - - boundary.load_pool_data::(state)?; - - boundary.load_drep_data::(state)?; - - boundary.load_proposal_data::(state)?; - - boundary.compute_deltas::(state)?; - - Ok(boundary) - } } diff --git a/crates/cardano/src/ewrap/mod.rs b/crates/cardano/src/ewrap/mod.rs index bb850fe48..b308601f6 100644 --- a/crates/cardano/src/ewrap/mod.rs +++ b/crates/cardano/src/ewrap/mod.rs @@ -12,8 +12,9 @@ use crate::{ ProposalState, }; -/// A reward that was applied at EWRAP time. -/// This represents a spendable reward that was successfully credited to an account. +/// A reward that was applied during the per-account boundary phase +/// (`AShard`). Represents a spendable reward that was successfully +/// credited to an account. #[derive(Debug, Clone)] pub struct AppliedReward { pub credential: StakeCredential, @@ -30,7 +31,6 @@ pub mod work_unit; pub mod drops; pub mod enactment; pub mod refunds; -pub mod rewards; pub mod wrapup; pub use work_unit::EwrapWorkUnit; @@ -122,7 +122,7 @@ pub type ProposalId = EntityKey; pub struct BoundaryWork { // loaded - ending_state: EpochState, + pub(crate) ending_state: EpochState, pub active_protocol: EraProtocol, pub chain_summary: ChainSummary, pub genesis: Arc, @@ -147,8 +147,9 @@ pub struct BoundaryWork { /// Credentials whose rewards were applied (need to be dequeued from state). pub applied_reward_credentials: Vec, - /// Rewards that were actually applied (spendable) at EWRAP time. 
- /// This is populated during the reward visitor phase and survives commit. + /// Rewards that were actually applied (spendable) during the + /// `AShard` reward visitor. Populated per-shard and survives the + /// commit so callers can observe what was credited. pub applied_rewards: Vec, /// Effective MIRs from treasury (applied to registered accounts). @@ -165,6 +166,23 @@ pub struct BoundaryWork { /// Credentials whose pending MIRs were processed (need to be dequeued from state). pub applied_mir_credentials: Vec, + + /// Shard-local reward accumulator — total effective rewards applied by + /// the current `AShard` run. Snapshot into `EpochEndAccumulate` + /// before the shard commits. Populated only by AShard work units; + /// zero in `BoundaryWork`s loaded for the Ewrap phase. + pub shard_applied_effective: u64, + + /// Shard-local unspendable reward routed to treasury (accounts that + /// deregistered between RUPD and EWRAP). Snapshot into + /// `EpochEndAccumulate` before the shard commits. Zero outside + /// AShard. + pub shard_applied_unspendable_to_treasury: u64, + + /// Shard-local unspendable reward that returns to reserves (pre-Babbage + /// filtered entries). Snapshot into `EpochEndAccumulate` before the + /// shard commits. Zero outside AShard. + pub shard_applied_unspendable_to_reserves: u64, } impl BoundaryWork { diff --git a/crates/cardano/src/ewrap/work_unit.rs b/crates/cardano/src/ewrap/work_unit.rs index 710753037..5b14391d6 100644 --- a/crates/cardano/src/ewrap/work_unit.rs +++ b/crates/cardano/src/ewrap/work_unit.rs @@ -1,10 +1,13 @@ -//! Ewrap (Epoch Wrap) work unit implementation. +//! Ewrap work unit — global (non-account) work and the boundary close. //! -//! The ewrap work unit handles epoch boundary processing including: -//! - Applying rewards to accounts (loaded from pending_rewards state) -//! - Processing pool retirements -//! - Handling governance proposal enactment -//! - DRep expiration +//! 
Runs after the per-account `AShardWorkUnit` series. Performs +//! pool/drep/proposal classification, enactment, MIR processing, deposit +//! refunds, then assembles the final `EndStats` (combining prepare-time +//! fields with the shard-populated accumulators) and emits a single +//! `EpochWrapUp` delta. `EpochWrapUp::apply` overwrites `entity.end` with +//! the final stats, rotates rolling/pparams snapshots forward, and clears +//! `ashard_progress`. The completed `EpochState` is also written to archive +//! at commit time. use std::sync::Arc; @@ -15,20 +18,16 @@ use crate::CardanoLogic; use super::BoundaryWork; -/// Work unit for epoch boundary wrap-up processing. pub struct EwrapWorkUnit { slot: BlockSlot, #[allow(dead_code)] config: CardanoConfig, genesis: Arc, - // Loaded boundary: Option, } impl EwrapWorkUnit { - /// Create a new ewrap work unit. - /// Rewards are loaded from state store during load phase. pub fn new(slot: BlockSlot, config: CardanoConfig, genesis: Arc) -> Self { Self { slot, @@ -38,7 +37,6 @@ impl EwrapWorkUnit { } } - /// Access the loaded boundary work context. 
pub fn boundary(&self) -> Option<&BoundaryWork> { self.boundary.as_ref() } @@ -53,43 +51,35 @@ where } fn load(&mut self, domain: &D) -> Result<(), DomainError> { - debug!(slot = self.slot, "loading ewrap boundary context"); + debug!(slot = self.slot, "loading ewrap context"); - // Load rewards from state store (persisted by RUPD) - let boundary = BoundaryWork::load::(domain.state(), self.genesis.clone())?; + let boundary = BoundaryWork::load_ewrap::(domain.state(), self.genesis.clone())?; - info!(epoch = boundary.ending_state().number, "ending epoch"); + info!(epoch = boundary.ending_state().number, "ewrap"); self.boundary = Some(boundary); - debug!("ewrap boundary context loaded"); + debug!("ewrap context loaded"); Ok(()) } fn compute(&mut self) -> Result<(), DomainError> { - // Computation is done during load via compute_deltas - // This is because the visitor pattern needs access to state - debug!("ewrap compute phase (deltas already computed during load)"); + debug!("ewrap compute (deltas computed during load)"); Ok(()) } fn commit_state(&mut self, domain: &D) -> Result<(), DomainError> { - debug!(slot = self.slot, "committing ewrap state changes"); - let boundary = self .boundary .as_mut() .ok_or_else(|| DomainError::Internal("ewrap boundary not loaded".into()))?; - boundary.commit::(domain.state(), domain.archive())?; - - debug!("ewrap state committed"); + boundary.commit_ewrap::(domain.state(), domain.archive())?; Ok(()) } fn commit_archive(&mut self, _domain: &D) -> Result<(), DomainError> { - // Archive writes are done in commit_state via boundary.commit() - // because they're interleaved with state commits Ok(()) } } + diff --git a/crates/cardano/src/ewrap/wrapup.rs b/crates/cardano/src/ewrap/wrapup.rs index 33fa31d33..8e88e2118 100644 --- a/crates/cardano/src/ewrap/wrapup.rs +++ b/crates/cardano/src/ewrap/wrapup.rs @@ -80,34 +80,21 @@ fn define_end_stats(ctx: &super::BoundaryWork) -> EndStats { ); } - let effective = ctx.rewards.applied_effective(); - 
let to_treasury = ctx.rewards.applied_unspendable_to_treasury(); - let to_reserves = ctx.rewards.applied_unspendable_to_reserves(); - - // Sum of applied_rewards should match effective - let applied_rewards_sum: u64 = ctx.applied_rewards.iter().map(|r| r.amount).sum(); - - #[cfg(feature = "strict")] - assert!( - effective == applied_rewards_sum, - "EWRAP epoch {}: effective_rewards ({}) != applied_rewards_sum ({}), diff = {}", - ctx.ending_state().number, - effective, - applied_rewards_sum, - effective as i64 - applied_rewards_sum as i64 - ); + // Reward accumulators are populated by AShards (which ran before + // this Ewrap phase). Read them back from `EpochState.end` and combine + // with the prepare-time fields to produce the final stats carried by + // `EpochWrapUp`. + let acc = ctx + .ending_state() + .end + .as_ref() + .expect("ESTART seeded EpochState.end before AShards"); tracing::debug!( epoch = ctx.ending_state().number, available_rewards = %incentives.available_rewards, - %effective, - %to_treasury, - %to_reserves, - consumed = %(effective + to_treasury), - returned = %(incentives.available_rewards.saturating_sub(effective + to_treasury)), - %applied_rewards_sum, - applied_rewards_count = ctx.applied_rewards.len(), - "EWRAP reward classification" + effective_rewards = acc.effective_rewards, + "EWRAP: assembling final EndStats (prepare-time + accumulators)" ); EndStats { @@ -115,9 +102,9 @@ fn define_end_stats(ctx: &super::BoundaryWork) -> EndStats { pool_refund_count: pool_refund_count as u64, pool_invalid_refund_count: pool_invalid_refund_count as u64, epoch_incentives: incentives.clone(), - effective_rewards: effective, - unspendable_to_treasury: to_treasury, - unspendable_to_reserves: to_reserves, + effective_rewards: acc.effective_rewards, + unspendable_to_treasury: acc.unspendable_to_treasury, + unspendable_to_reserves: acc.unspendable_to_reserves, treasury_mirs, reserve_mirs, invalid_treasury_mirs, @@ -148,9 +135,18 @@ impl super::BoundaryVisitor 
for BoundaryVisitor { ctx.add_delta(delta); } - let stats = define_end_stats(ctx); + // Assemble the final `EndStats` from the prepare-time fields plus the + // shard-populated accumulators (already in `ending_state.end`), and + // emit `EpochWrapUp` to close the epoch boundary. Apply will + // overwrite `entity.end` with these stats, rotate the rolling/pparams + // snapshots forward, and clear `ashard_progress`. + let final_stats = define_end_stats(ctx); + + // Stash the final stats on `ending_state.end` so the post-commit + // archive write in `commit_ewrap` reflects the finalised state. + ctx.ending_state.end = Some(final_stats.clone()); - ctx.deltas.add_for_entity(EpochWrapUp::new(stats)); + ctx.deltas.add_for_entity(EpochWrapUp::new(final_stats)); Ok(()) } diff --git a/crates/cardano/src/genesis/mod.rs b/crates/cardano/src/genesis/mod.rs index c2220e18f..63ab3bd7d 100644 --- a/crates/cardano/src/genesis/mod.rs +++ b/crates/cardano/src/genesis/mod.rs @@ -4,8 +4,8 @@ use dolos_core::{ }; use crate::{ - indexes::index_delta_from_utxo_delta, pots::Pots, utils::nonce_stability_window, EpochState, - EpochValue, EraBoundary, EraSummary, Lovelace, Nonces, PParamsSet, RollingStats, + indexes::index_delta_from_utxo_delta, pots::Pots, utils::nonce_stability_window, EndStats, + EpochState, EpochValue, EraBoundary, EraSummary, Lovelace, Nonces, PParamsSet, RollingStats, CURRENT_EPOCH_KEY, }; @@ -77,8 +77,12 @@ pub fn bootstrap_epoch( previous_nonce_tail: None, number: 0, rolling: EpochValue::with_live(0, RollingStats::default()), - end: None, + // Seed `end` with defaults so the first boundary's AShard + // (which now runs before Ewrap) finds a populated slot. ESTART's + // `EpochTransition` re-seeds it on every subsequent epoch. 
+ end: Some(EndStats::default()), incentives: None, + ashard_progress: None, }; let writer = state.start_writer()?; diff --git a/crates/cardano/src/lib.rs b/crates/cardano/src/lib.rs index 5dae7803c..9a6668085 100644 --- a/crates/cardano/src/lib.rs +++ b/crates/cardano/src/lib.rs @@ -37,6 +37,7 @@ pub mod utils; pub mod utxoset; // work units +pub mod ashard; pub mod estart; pub mod ewrap; pub mod genesis; @@ -69,8 +70,18 @@ pub enum CardanoWorkUnit { Roll(Box), /// Compute rewards at stability window boundary. Rupd(Box), - /// Handle epoch boundary wrap-up processing. + /// Handle the global portion of the epoch boundary (pool/drep/proposal + /// classification, MIRs, enactment, deposit refunds) and close the + /// boundary by emitting `EpochWrapUp` with the assembled final + /// `EndStats` (prepare-time fields + accumulator fields populated by + /// the preceding `AShard` runs). The `EpochState.end` slot itself + /// is opened by ESTART's `EpochTransition` at the start of each epoch. Ewrap(Box), + /// Handle one shard of per-account reward application. Emitted + /// `config.account_shards` times in sequence. Each shard covers a + /// first-byte prefix range of the account key space and accumulates its + /// contribution into `EpochState.end` via `EpochEndAccumulate`. + AShard(Box), /// Handle epoch start processing. Estart(Box), /// Signal forced stop at configured epoch. 
@@ -87,6 +98,7 @@ where Self::Roll(w) => >::name(w), Self::Rupd(w) => >::name(w), Self::Ewrap(w) => >::name(w), + Self::AShard(w) => >::name(w), Self::Estart(w) => >::name(w), Self::ForcedStop => "forced_stop", } @@ -98,6 +110,7 @@ where Self::Roll(w) => >::load(w, domain), Self::Rupd(w) => >::load(w, domain), Self::Ewrap(w) => >::load(w, domain), + Self::AShard(w) => >::load(w, domain), Self::Estart(w) => >::load(w, domain), Self::ForcedStop => Ok(()), } @@ -109,6 +122,7 @@ where Self::Roll(w) => >::compute(w), Self::Rupd(w) => >::compute(w), Self::Ewrap(w) => >::compute(w), + Self::AShard(w) => >::compute(w), Self::Estart(w) => >::compute(w), Self::ForcedStop => Ok(()), } @@ -119,7 +133,12 @@ where Self::Genesis(w) => >::commit_wal(w, domain), Self::Roll(w) => >::commit_wal(w, domain), Self::Rupd(w) => >::commit_wal(w, domain), - Self::Ewrap(w) => >::commit_wal(w, domain), + Self::Ewrap(w) => { + >::commit_wal(w, domain) + } + Self::AShard(w) => { + >::commit_wal(w, domain) + } Self::Estart(w) => >::commit_wal(w, domain), Self::ForcedStop => Ok(()), } @@ -130,7 +149,12 @@ where Self::Genesis(w) => >::commit_state(w, domain), Self::Roll(w) => >::commit_state(w, domain), Self::Rupd(w) => >::commit_state(w, domain), - Self::Ewrap(w) => >::commit_state(w, domain), + Self::Ewrap(w) => { + >::commit_state(w, domain) + } + Self::AShard(w) => { + >::commit_state(w, domain) + } Self::Estart(w) => >::commit_state(w, domain), Self::ForcedStop => Err(DomainError::StopEpochReached), } @@ -143,7 +167,12 @@ where } Self::Roll(w) => >::commit_archive(w, domain), Self::Rupd(w) => >::commit_archive(w, domain), - Self::Ewrap(w) => >::commit_archive(w, domain), + Self::Ewrap(w) => { + >::commit_archive(w, domain) + } + Self::AShard(w) => { + >::commit_archive(w, domain) + } Self::Estart(w) => >::commit_archive(w, domain), Self::ForcedStop => Ok(()), } @@ -156,7 +185,12 @@ where } Self::Roll(w) => >::commit_indexes(w, domain), Self::Rupd(w) => >::commit_indexes(w, domain), - 
Self::Ewrap(w) => >::commit_indexes(w, domain), + Self::Ewrap(w) => { + >::commit_indexes(w, domain) + } + Self::AShard(w) => { + >::commit_indexes(w, domain) + } Self::Estart(w) => >::commit_indexes(w, domain), Self::ForcedStop => Ok(()), } @@ -168,6 +202,7 @@ where Self::Roll(w) => >::tip_events(w), Self::Rupd(w) => >::tip_events(w), Self::Ewrap(w) => >::tip_events(w), + Self::AShard(w) => >::tip_events(w), Self::Estart(w) => >::tip_events(w), Self::ForcedStop => Vec::new(), } @@ -194,6 +229,13 @@ pub struct CardanoLogic { /// Flag indicating the cache needs refresh after a work unit that modifies eras. /// Set after Genesis or EStart work units are popped, cleared at next pop_work call. needs_cache_refresh: bool, + /// Cached effective `account_shards` value: equal to + /// `EpochState.ashard_progress.total` when a boundary is in flight, or + /// `config.account_shards()` otherwise. Refreshed at every `pop_work` + /// call (which has state access) so `receive_block` (which does not) + /// can use the up-to-date value when constructing + /// `WorkBuffer::AShardingBoundary`. + effective_account_shards: u32, } impl CardanoLogic { @@ -204,6 +246,16 @@ impl CardanoLogic { Ok(()) } + + /// Compute the effective `account_shards` value: stored + /// `ashard_progress.total` if a boundary is in flight, otherwise the + /// configured value. + fn read_effective_account_shards(&self, state: &D::State) -> u32 { + load_epoch::(state) + .ok() + .and_then(|e| e.ashard_progress.as_ref().map(|p| p.total)) + .unwrap_or_else(|| self.config.account_shards()) + } } impl dolos_core::ChainLogic for CardanoLogic { @@ -222,6 +274,13 @@ impl dolos_core::ChainLogic for CardanoLogic { ) -> Result { info!("initializing"); + // Reject misconfigured `account_shards` early. The shard-key-range + // helper only `debug_assert!`s the divides-256 invariant, so an + // invalid release-build config (0, 3, 7, 100, ...) would silently + // corrupt key-range coverage. 
+ crate::ashard::shard::validate_total_shards(config.account_shards()) + .map_err(ChainError::InvalidConfig)?; + let cursor = state.read_cursor()?; let work = match cursor { @@ -229,6 +288,65 @@ impl dolos_core::ChainLogic for CardanoLogic { None => WorkBuffer::Empty, }; + // Crash-recovery check: if the previous process crashed mid-boundary, + // `EpochState.ashard_progress` will be `Some(p)` with `p.committed` + // equal to the next AShard that should have run, and `p.total` the + // boundary's shard count captured at the first commit. Detect and + // warn — full resume requires re-fetching the boundary block from + // upstream, which is tracked separately. The persisted `total` is + // used for the in-flight boundary even if `config.account_shards()` + // changed, to avoid breaking the in-progress pipeline. + if let Ok(epoch) = load_epoch::(state) { + if let Some(progress) = epoch.ashard_progress.as_ref() { + let configured = config.account_shards(); + if progress.total != configured { + tracing::warn!( + epoch = epoch.number, + stored_total = progress.total, + configured_total = configured, + "in-flight boundary uses {} shards but config.account_shards = {}; \ + the in-flight boundary will continue with {} (the persisted total) \ + and the new config takes effect on the next boundary", + progress.total, + configured, + progress.total, + ); + } + if progress.committed < progress.total { + tracing::warn!( + epoch = epoch.number, + next_shard = progress.committed, + total_shards = progress.total, + "crash detected mid-boundary: ashard_progress is set. \ + On the next block that triggers the boundary, dolos will \ + resume the AShard pipeline; correctness depends on shard \ + idempotency (state deletes are no-ops if already applied; \ + EpochEndAccumulate guards on shard_index). Operators should \ + monitor the subsequent boundary for inconsistency. \ + TODO: implement true shard resume." 
+ ); + } else { + tracing::warn!( + epoch = epoch.number, + committed = progress.committed, + total_shards = progress.total, + "found EpochState.ashard_progress.committed == total at \ + startup — Ewrap (closing phase) was not committed \ + before crash. The next boundary attempt will re-run \ + AShards and Ewrap; idempotency should keep the \ + result correct." + ); + } + } + } + + // Capture the effective account_shards value: stored total if a + // boundary is in flight, otherwise the configured value. + let effective_account_shards = load_epoch::(state) + .ok() + .and_then(|e| e.ashard_progress.as_ref().map(|p| p.total)) + .unwrap_or_else(|| config.account_shards()); + let eras = eras::load_era_summary::(state)?; // Use randomness_stability_window (4k/f) for the RUPD trigger boundary. @@ -246,6 +364,7 @@ impl dolos_core::ChainLogic for CardanoLogic { }, work: Some(work), needs_cache_refresh: false, + effective_account_shards, }) } @@ -263,7 +382,12 @@ impl dolos_core::ChainLogic for CardanoLogic { let work = self.work.take().expect("work buffer is initialized"); - let new_work = work.receive_block(block, &self.cache.eras, self.cache.stability_window); + let new_work = work.receive_block( + block, + &self.cache.eras, + self.cache.stability_window, + self.effective_account_shards, + ); let last = new_work.last_point_seen().slot(); @@ -284,9 +408,16 @@ impl dolos_core::ChainLogic for CardanoLogic { self.needs_cache_refresh = false; } + // Refresh effective `account_shards` from state. While a boundary is + // in flight, `EpochState.ashard_progress.total` overrides config so + // the in-progress pipeline isn't disrupted by a config change. 
+ self.effective_account_shards = + self.read_effective_account_shards::(domain.state()); + let work = self.work.take().expect("work buffer is initialized"); - let (work_unit, new_buffer) = work.pop_work(self.config.stop_epoch); + let (work_unit, new_buffer) = + work.pop_work(self.config.stop_epoch, self.effective_account_shards); self.work = Some(new_buffer); @@ -312,13 +443,18 @@ impl dolos_core::ChainLogic for CardanoLogic { InternalWorkUnit::Rupd(slot) => Some(CardanoWorkUnit::Rupd(Box::new( rupd::RupdWorkUnit::new(slot, domain.genesis()), ))), - InternalWorkUnit::EWrap(slot) => { - // Rewards are loaded from state store during EWRAP load phase - Some(CardanoWorkUnit::Ewrap(Box::new(ewrap::EwrapWorkUnit::new( - slot, - self.config.clone(), - domain.genesis(), - )))) + InternalWorkUnit::Ewrap(slot) => Some(CardanoWorkUnit::Ewrap(Box::new( + ewrap::EwrapWorkUnit::new(slot, self.config.clone(), domain.genesis()), + ))), + InternalWorkUnit::AShard(slot, shard_index) => { + Some(CardanoWorkUnit::AShard(Box::new( + ashard::AShardWorkUnit::new( + slot, + self.config.clone(), + domain.genesis(), + shard_index, + ), + ))) } InternalWorkUnit::EStart(slot) => { // EStart may trigger era transitions, schedule cache refresh diff --git a/crates/cardano/src/model/epochs.rs b/crates/cardano/src/model/epochs.rs index 6b9719a32..e0e6f63ef 100644 --- a/crates/cardano/src/model/epochs.rs +++ b/crates/cardano/src/model/epochs.rs @@ -149,7 +149,7 @@ impl TransitionDefault for RollingStats { } /// Stats that are gathered at the end of the epoch -#[derive(Debug, Encode, Decode, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Encode, Decode, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] pub struct EndStats { #[n(0)] pub pool_deposit_count: u64, @@ -210,6 +210,24 @@ pub struct EndStats { pub __drep_refunds: Lovelace, } +/// Snapshot of the AShard pipeline's progress + total within a single epoch +/// boundary. 
Persisted alongside `EpochState.ashard_progress` so a config +/// change mid-boundary (or across a crash and restart) can't break the +/// in-flight work — the stored `total` is authoritative until the boundary +/// completes. +#[derive(Debug, Clone, Encode, Decode, Default, Serialize, Deserialize, PartialEq, Eq)] +pub struct AShardProgress { + /// Number of shards that have committed; shards `0..committed` are done + /// and `committed` is the next to run. + #[n(0)] + pub committed: u32, + /// Total shard count for this boundary, captured at the first shard's + /// commit (snapshots the value of `CardanoConfig::account_shards()` + /// effective at that moment). + #[n(1)] + pub total: u32, +} + #[derive(Debug, Encode, Decode, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct EpochState { @@ -241,6 +259,19 @@ pub struct EpochState { #[n(14)] #[cbor(default)] pub incentives: Option, + + /// Cursor + total snapshot for the AShard pipeline within this epoch + /// boundary. `None` means no AShard has run yet (the natural starting + /// state, set by `EpochTransition` during ESTART). `Some(p)` means + /// shards `0..p.committed` have committed; `p.total` is the boundary's + /// shard count, captured at the first shard's commit. Each + /// `EpochEndAccumulate` (AShard) advances `committed`; `EpochWrapUp` + /// (Ewrap) clears the field back to `None`. The persisted `total` + /// guards against a config change mid-boundary breaking the in-flight + /// pipeline. 
+ #[n(15)] + #[cbor(default)] + pub ashard_progress: Option, } impl Default for EpochState { @@ -255,6 +286,7 @@ impl Default for EpochState { nonces: None, end: None, incentives: None, + ashard_progress: None, } } } @@ -336,6 +368,9 @@ pub(crate) mod testing { nonces in prop::option::of(any_nonces()), end in prop::option::of(any_end_stats()), incentives in prop::option::of(any_epoch_incentives()), + ashard_progress in prop::option::of( + (0u32..32u32, 1u32..=32u32).prop_map(|(committed, total)| AShardProgress { committed, total }) + ), ) -> EpochState { EpochState { number, @@ -347,6 +382,7 @@ pub(crate) mod testing { nonces, end, incentives, + ashard_progress, } } } @@ -546,6 +582,11 @@ impl dolos_core::EntityDelta for PParamsUpdate { } } +/// Delta emitted once by `Ewrap` to close the epoch boundary. Carries the +/// fully populated `EndStats` (prepare-time fields from the wrap-up visitor + +/// reward accumulators from the preceding `AShard` runs). Apply +/// overwrites `entity.end` with these final stats, rotates the rolling and +/// pparams snapshots forward, and clears `ashard_progress`. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct EpochWrapUp { pub(crate) stats: EndStats, @@ -554,6 +595,7 @@ pub struct EpochWrapUp { pub(crate) prev_rolling: Option>, pub(crate) prev_pparams: Option>, pub(crate) prev_end: Option, + pub(crate) prev_ashard_progress: Option, } impl EpochWrapUp { @@ -563,6 +605,7 @@ impl EpochWrapUp { prev_rolling: None, prev_pparams: None, prev_end: None, + prev_ashard_progress: None, } } } @@ -580,10 +623,12 @@ impl dolos_core::EntityDelta for EpochWrapUp { self.prev_rolling = Some(entity.rolling.clone()); self.prev_pparams = Some(entity.pparams.clone()); self.prev_end = entity.end.clone(); + self.prev_ashard_progress = entity.ashard_progress.clone(); entity.rolling.scheduled_or_default(); entity.pparams.scheduled_or_default(); entity.end = Some(self.stats.clone()); + entity.ashard_progress = None; } fn undo(&self, entity: &mut Option) { @@ -591,6 +636,155 @@ impl dolos_core::EntityDelta for EpochWrapUp { entity.rolling = self.prev_rolling.clone().expect("apply captured rolling"); entity.pparams = self.prev_pparams.clone().expect("apply captured pparams"); entity.end = self.prev_end.clone(); + entity.ashard_progress = self.prev_ashard_progress.clone(); + } +} + +/// Delta emitted once per `AShard` to accumulate the shard's reward- +/// distribution contribution into `EpochState.end` and advance +/// `ashard_progress` to the next shard index. Carries `total_shards` so the +/// boundary's shard count is captured in state at the first commit, which +/// makes the in-flight pipeline robust against a config change between +/// shards (e.g. across a crash and restart). +/// +/// Idempotent on repeat-apply by guarding on the committed shard count — +/// a shard that was already committed will have +/// `ashard_progress.committed > completed_shard_index` and should skip. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EpochEndAccumulate { + pub(crate) effective_delta: u64, + pub(crate) unspendable_to_treasury_delta: u64, + pub(crate) unspendable_to_reserves_delta: u64, + pub(crate) completed_shard_index: u32, + pub(crate) total_shards: u32, + + // undo — captured by `apply` only when state was actually mutated + // (i.e. the idempotency / ordering / consistency guards all passed). + // When `applied = false`, `undo` is a no-op so a rolled-back skip + // can't underflow `end.*` u64s or clobber `ashard_progress`. + pub(crate) applied: bool, + pub(crate) prev_ashard_progress: Option, +} + +impl EpochEndAccumulate { + pub fn new( + effective_delta: u64, + unspendable_to_treasury_delta: u64, + unspendable_to_reserves_delta: u64, + completed_shard_index: u32, + total_shards: u32, + ) -> Self { + Self { + effective_delta, + unspendable_to_treasury_delta, + unspendable_to_reserves_delta, + completed_shard_index, + total_shards, + applied: false, + prev_ashard_progress: None, + } + } +} + +impl dolos_core::EntityDelta for EpochEndAccumulate { + type Entity = EpochState; + + fn key(&self) -> NsKey { + NsKey::from((EpochState::NS, CURRENT_EPOCH_KEY)) + } + + fn apply(&mut self, entity: &mut Option) { + let entity = entity.as_mut().expect("existing epoch"); + + // Idempotency + ordering guard. `ashard_progress` is the authoritative + // cursor for which shards have landed: `None` means no shards have + // run yet (shard 0 is the next expected); `Some(p)` means shards + // `0..p.committed` have committed and `p.committed` is next. AShard + // is the first phase of the epoch boundary, so `None` is the + // natural starting state (set by ESTART's `EpochTransition`). 
+ let stored = entity.ashard_progress.as_ref(); + let expected = stored.map(|p| p.committed).unwrap_or(0); + if expected > self.completed_shard_index { + // Already applied (crash-recovery scenario where the shard's + // state commit landed but the work buffer hadn't advanced past + // it). Skip to preserve idempotency. + tracing::debug!( + completed_shard = self.completed_shard_index, + committed = expected, + "EpochEndAccumulate already applied — skipping (idempotent)" + ); + return; + } + if expected < self.completed_shard_index { + // Out-of-order apply (shard N emitted before shard N-1 ran). + // Treated as a broken invariant because it would leave the + // `ashard_progress` cursor misaligned. + tracing::error!( + completed_shard = self.completed_shard_index, + committed = expected, + "EpochEndAccumulate applied out of order — skipping to avoid corruption" + ); + return; + } + + // Consistency: if a previous shard already wrote `total`, this + // delta's `total_shards` must match. A mismatch means the work unit + // was constructed with a different shard count than the in-flight + // boundary — surfaces as an error so it can't silently corrupt + // state. + if let Some(p) = stored { + if p.total != self.total_shards { + tracing::error!( + completed_shard = self.completed_shard_index, + stored_total = p.total, + delta_total = self.total_shards, + "EpochEndAccumulate total_shards disagrees with in-flight \ + boundary — skipping to avoid corruption (config changed \ + mid-boundary?)" + ); + return; + } + } + + let end = entity + .end + .as_mut() + .expect("ESTART seeded EpochState.end before shards run"); + + // Capture undo state before mutating. 
+ self.prev_ashard_progress = entity.ashard_progress.clone(); + + end.effective_rewards += self.effective_delta; + end.unspendable_to_treasury += self.unspendable_to_treasury_delta; + end.unspendable_to_reserves += self.unspendable_to_reserves_delta; + + entity.ashard_progress = Some(AShardProgress { + committed: self.completed_shard_index + 1, + total: self.total_shards, + }); + + self.applied = true; + } + + fn undo(&self, entity: &mut Option<Self::Entity>) { + // If `apply` hit a skip branch (idempotent / out-of-order / + // total mismatch) it left state untouched, so `undo` must too — + // otherwise we'd underflow `end.*` u64s and clobber the cursor. + if !self.applied { + return; + } + + let entity = entity.as_mut().expect("existing epoch"); + let end = entity + .end + .as_mut() + .expect("end present if accumulate was applied"); + + end.effective_rewards -= self.effective_delta; + end.unspendable_to_treasury -= self.unspendable_to_treasury_delta; + end.unspendable_to_reserves -= self.unspendable_to_reserves_delta; + + entity.ashard_progress = self.prev_ashard_progress.clone(); } } @@ -658,6 +852,8 @@ pub struct EpochTransition { pub(crate) prev_initial_pots: Option, pub(crate) prev_rolling: Option>, pub(crate) prev_pparams: Option>, + pub(crate) prev_end: Option<EndStats>, + pub(crate) prev_ashard_progress: Option<AShardProgress>, } impl EpochTransition { @@ -676,6 +872,8 @@ impl EpochTransition { prev_initial_pots: None, prev_rolling: None, prev_pparams: None, + prev_end: None, + prev_ashard_progress: None, } } } @@ -707,6 +905,8 @@ impl dolos_core::EntityDelta for EpochTransition { self.prev_initial_pots = Some(entity.initial_pots.clone()); self.prev_rolling = Some(entity.rolling.clone()); self.prev_pparams = Some(entity.pparams.clone()); + self.prev_end = entity.end.clone(); + self.prev_ashard_progress = entity.ashard_progress.clone(); entity.number = self.new_epoch; entity.initial_pots = self.new_pots.clone(); @@ -724,6 +924,13 @@
self.genesis.as_ref().expect("genesis not set"), ); } + + // Open the EndStats slot for the new epoch with zeroed defaults. + // Ewrap will overwrite this with the fully-populated EndStats at the + // end of this epoch; until then, downstream readers see a consistent + // empty container instead of the previous epoch's stale data. + entity.end = Some(EndStats::default()); + entity.ashard_progress = None; } fn undo(&self, entity: &mut Option<Self::Entity>) { @@ -736,6 +943,8 @@ impl dolos_core::EntityDelta for EpochTransition { .prev_initial_pots .clone() .expect("apply captured initial_pots"); + entity.end = self.prev_end.clone(); + entity.ashard_progress = self.prev_ashard_progress.clone(); } } @@ -809,6 +1018,7 @@ mod prop_tests { nonces, end, incentives, + ashard_progress: None, } } } @@ -852,6 +1062,45 @@ mod prop_tests { } } + prop_compose! { + /// Generates an `EpochEndAccumulate` with bounded deltas so the + /// `end.*` fields can underflow only via a buggy `undo` (not via + /// generator-level u64 wraparound). + fn any_epoch_end_accumulate()( + effective_delta in 0u64..1_000_000u64, + unspendable_to_treasury_delta in 0u64..1_000_000u64, + unspendable_to_reserves_delta in 0u64..1_000_000u64, + completed_shard_index in 0u32..16u32, + total_shards in 1u32..=16u32, + ) -> EpochEndAccumulate { + EpochEndAccumulate::new( + effective_delta, + unspendable_to_treasury_delta, + unspendable_to_reserves_delta, + completed_shard_index, + total_shards, + ) + } + } + + // Entity generator that always seeds `end = Some(...)` (AShard's + // invariant) and lets `ashard_progress` vary across `None`, + // matching, ahead, behind, and `total` mismatch — so the proptest + // exercises both the apply-mutates and apply-skips branches. + prop_compose!
{ + fn any_epoch_state_for_accumulate()( + mut entity in any_epoch_state(), + progress in prop::option::of((0u32..32u32, 1u32..=32u32).prop_map( + |(committed, total)| AShardProgress { committed, total } + )), + end in any_end_stats(), + ) -> EpochState { + entity.end = Some(end); + entity.ashard_progress = progress; + entity + } + } + prop_compose! { fn any_nonce_transition()( next_nonce in prop::option::of(any_nonces()), @@ -939,5 +1188,20 @@ mod prop_tests { ) { assert_delta_roundtrip(Some(entity), delta); } + + /// Roundtrip across all branches of `EpochEndAccumulate::apply` — + /// includes entity states whose `ashard_progress` is ahead of, + /// behind, equal to, or `None` relative to the delta's + /// `completed_shard_index`, plus `total_shards` mismatches. The + /// idempotent / out-of-order / total-mismatch branches must + /// roundtrip via the no-op `undo` path; the mutating branch must + /// roundtrip via the captured `prev_ashard_progress`. + #[test] + fn epoch_end_accumulate_roundtrip( + entity in any_epoch_state_for_accumulate(), + delta in any_epoch_end_accumulate(), + ) { + assert_delta_roundtrip(Some(entity), delta); + } } } diff --git a/crates/cardano/src/model/mod.rs b/crates/cardano/src/model/mod.rs index 7fe482e2e..eb6be42e5 100644 --- a/crates/cardano/src/model/mod.rs +++ b/crates/cardano/src/model/mod.rs @@ -204,6 +204,7 @@ pub enum CardanoDelta { PoolDepositRefund(Box), EpochTransition(Box), EpochWrapUp(Box), + EpochEndAccumulate(Box), DRepDelegatorDrop(Box), PoolDelegatorRetire(Box), PoolWrapUp(Box), @@ -279,6 +280,7 @@ delta_from!(AccountTransition); delta_from!(PoolDepositRefund); delta_from!(EpochTransition); delta_from!(EpochWrapUp); +delta_from!(EpochEndAccumulate); delta_from!(DRepDelegatorDrop); delta_from!(PoolDelegatorRetire); delta_from!(PoolWrapUp); @@ -324,6 +326,7 @@ impl dolos_core::EntityDelta for CardanoDelta { Self::PoolDepositRefund(x) => x.key(), Self::EpochTransition(x) => x.key(), Self::EpochWrapUp(x) => x.key(), + 
Self::EpochEndAccumulate(x) => x.key(), Self::PoolDelegatorRetire(x) => x.key(), Self::DRepDelegatorDrop(x) => x.key(), Self::PoolWrapUp(x) => x.key(), @@ -368,6 +371,7 @@ impl dolos_core::EntityDelta for CardanoDelta { Self::PoolDepositRefund(x) => Self::downcast_apply(x.as_mut(), entity), Self::EpochTransition(x) => Self::downcast_apply(x.as_mut(), entity), Self::EpochWrapUp(x) => Self::downcast_apply(x.as_mut(), entity), + Self::EpochEndAccumulate(x) => Self::downcast_apply(x.as_mut(), entity), Self::DRepDelegatorDrop(x) => Self::downcast_apply(x.as_mut(), entity), Self::PoolDelegatorRetire(x) => Self::downcast_apply(x.as_mut(), entity), Self::PoolWrapUp(x) => Self::downcast_apply(x.as_mut(), entity), @@ -412,6 +416,7 @@ impl dolos_core::EntityDelta for CardanoDelta { Self::PoolDepositRefund(x) => Self::downcast_undo(x.as_ref(), entity), Self::EpochTransition(x) => Self::downcast_undo(x.as_ref(), entity), Self::EpochWrapUp(x) => Self::downcast_undo(x.as_ref(), entity), + Self::EpochEndAccumulate(x) => Self::downcast_undo(x.as_ref(), entity), Self::DRepDelegatorDrop(x) => Self::downcast_undo(x.as_ref(), entity), Self::PoolDelegatorRetire(x) => Self::downcast_undo(x.as_ref(), entity), Self::PoolWrapUp(x) => Self::downcast_undo(x.as_ref(), entity), diff --git a/crates/cardano/src/model/pending.rs b/crates/cardano/src/model/pending.rs index 944df92b6..1c172cc3d 100644 --- a/crates/cardano/src/model/pending.rs +++ b/crates/cardano/src/model/pending.rs @@ -13,8 +13,9 @@ pub fn credential_to_key(cred: &StakeCredential) -> EntityKey { EntityKey::from(enc) } -/// Pending reward for a single account, waiting to be applied at epoch boundary. -/// Created by RUPD, consumed by EWRAP. +/// Pending reward for a single account, waiting to be applied at the epoch +/// boundary. Created by RUPD, consumed by `AShard` (the per-account +/// leg of the boundary pipeline). 
#[derive(Debug, Clone, Encode, Decode, Serialize, Deserialize, PartialEq, Eq)] pub struct PendingRewardState { #[n(0)] @@ -48,13 +49,14 @@ impl PendingRewardState { entity_boilerplate!(PendingRewardState, "pending_rewards"); -/// Pending MIR (Move Instantaneous Reward) for a single account, waiting to be -/// applied at epoch boundary. Created during block roll when MIR certificates -/// are processed, consumed by EWRAP. +/// Pending MIR (Move Instantaneous Reward) for a single account, waiting to +/// be applied at the epoch boundary. Created during block roll when MIR +/// certificates are processed, consumed by `Ewrap` (MIR processing is part +/// of the global Ewrap phase, not the per-account `AShard` phase). /// /// Unlike regular rewards, MIRs come from either reserves or treasury. -/// At EWRAP, MIRs are only applied to registered accounts - MIRs to unregistered -/// accounts stay in their source pot (no transfer occurs). +/// During Ewrap, MIRs are only applied to registered accounts — MIRs to +/// unregistered accounts stay in their source pot (no transfer occurs). #[derive(Debug, Clone, Encode, Decode, Serialize, Deserialize, PartialEq, Eq)] pub struct PendingMirState { #[n(0)] @@ -159,7 +161,7 @@ impl dolos_core::EntityDelta for EnqueueReward { } /// Delta to dequeue (consume) a pending reward after applying it. -/// Applied by EWRAP after rewards are assigned to accounts. +/// Applied by `AShard` after rewards are assigned to accounts. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DequeueReward { pub credential: StakeCredential, @@ -275,7 +277,7 @@ impl dolos_core::EntityDelta for EnqueueMir { } /// Delta to dequeue (consume) a pending MIR after applying it. -/// Applied by EWRAP after MIRs are assigned to accounts. +/// Applied by `Ewrap` after MIRs are assigned to accounts. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DequeueMir { pub credential: StakeCredential, diff --git a/crates/cardano/src/rupd/work_unit.rs b/crates/cardano/src/rupd/work_unit.rs index 5aad0167e..53f8fff34 100644 --- a/crates/cardano/src/rupd/work_unit.rs +++ b/crates/cardano/src/rupd/work_unit.rs @@ -2,7 +2,8 @@ //! //! The rupd work unit computes rewards at the stability window boundary //! (4k slots before epoch end). Computed rewards are persisted to state store -//! as PendingRewardState entities, to be consumed by EWRAP. +//! as `PendingRewardState` entities, to be consumed by `AShard` at the +//! epoch boundary. use std::sync::Arc; diff --git a/crates/cardano/src/work.rs b/crates/cardano/src/work.rs index 924ca6ecb..650bae931 100644 --- a/crates/cardano/src/work.rs +++ b/crates/cardano/src/work.rs @@ -13,7 +13,8 @@ use crate::roll::{WorkBatch, WorkBlock}; pub(crate) enum InternalWorkUnit { Genesis, Blocks(WorkBatch), - EWrap(BlockSlot), + Ewrap(BlockSlot), + AShard(BlockSlot, u32), EStart(BlockSlot), Rupd(BlockSlot), ForcedStop, @@ -26,7 +27,25 @@ pub(crate) enum WorkBuffer { OpenBatch(WorkBatch), PreRupdBoundary(WorkBatch, OwnedMultiEraBlock), RupdBoundary(OwnedMultiEraBlock), + /// Pre-flushed state when crossing the epoch boundary with a buffered + /// `WorkBatch`. `pop_work` yields the batch as `InternalWorkUnit::Blocks` + /// and advances to `AShardingBoundary { shard_index: 0, total_shards }` + /// (or `EwrapBoundary` if `total_shards == 0`). PreEwrapBoundary(WorkBatch, OwnedMultiEraBlock, Epoch), + /// Entry state that emits `AShard(shard_index)` until + /// `shard_index == total_shards`, then advances to `EwrapBoundary`. + /// This is the first phase of the epoch-boundary pipeline. + AShardingBoundary { + block: OwnedMultiEraBlock, + epoch: Epoch, + shard_index: u32, + total_shards: u32, + }, + /// Closing phase of the epoch-boundary pipeline. 
`pop_work` yields + /// `InternalWorkUnit::Ewrap` (which runs the global visitors AND emits + /// `EpochWrapUp` to close the boundary) and advances to + /// `EstartBoundary(block, epoch + 1)`. Reached after all shards have + /// committed (or directly from `PreEwrapBoundary` when `total_shards == 0`). EwrapBoundary(OwnedMultiEraBlock, Epoch), EstartBoundary(OwnedMultiEraBlock, Epoch), PreForcedStop(OwnedMultiEraBlock), @@ -48,6 +67,7 @@ impl WorkBuffer { WorkBuffer::RupdBoundary(block) => block.point(), WorkBuffer::PreEwrapBoundary(_, block, _) => block.point(), WorkBuffer::EwrapBoundary(block, _) => block.point(), + WorkBuffer::AShardingBoundary { block, .. } => block.point(), WorkBuffer::EstartBoundary(block, _) => block.point(), WorkBuffer::PreForcedStop(block) => block.point(), WorkBuffer::ForcedStop => unreachable!(), @@ -97,9 +117,26 @@ impl WorkBuffer { } } - fn on_ewrap_boundary(self, next_block: OwnedMultiEraBlock, epoch: Epoch) -> Self { + fn on_ewrap_boundary( + self, + next_block: OwnedMultiEraBlock, + epoch: Epoch, + account_shards: u32, + ) -> Self { match self { - WorkBuffer::Restart(..) => WorkBuffer::EwrapBoundary(next_block, epoch), + WorkBuffer::Restart(..) => { + if account_shards == 0 { + // No shards — entry goes straight to the global Ewrap phase. 
+ WorkBuffer::EwrapBoundary(next_block, epoch) + } else { + WorkBuffer::AShardingBoundary { + block: next_block, + epoch, + shard_index: 0, + total_shards: account_shards, + } + } + } WorkBuffer::OpenBatch(batch) => WorkBuffer::PreEwrapBoundary(batch, next_block, epoch), _ => unreachable!(), } } @@ -110,6 +147,7 @@ block: OwnedMultiEraBlock, eras: &ChainSummary, stability_window: u64, + account_shards: u32, ) -> Self { assert!( self.can_receive_block(), @@ -127,7 +165,7 @@ let boundary = pallas_extras::epoch_boundary(eras, prev_slot, next_slot); if let Some((epoch, _, _)) = boundary { - return self.on_ewrap_boundary(block, epoch); + return self.on_ewrap_boundary(block, epoch, account_shards); } let rupd_boundary = @@ -140,7 +178,11 @@ self.extend_batch(block) } - pub fn pop_work(self, stop_epoch: Option<Epoch>) -> (Option<InternalWorkUnit>, Self) { + pub fn pop_work( + self, + stop_epoch: Option<Epoch>, + account_shards: u32, + ) -> (Option<InternalWorkUnit>, Self) { if matches!(self, WorkBuffer::Restart(..)) || matches!(self, WorkBuffer::Empty) { return (None, self); } @@ -165,12 +207,44 @@ Some(InternalWorkUnit::Rupd(block.slot())), Self::OpenBatch(WorkBatch::for_single_block(WorkBlock::new(block))), ), - WorkBuffer::PreEwrapBoundary(batch, block, epoch) => ( - Some(InternalWorkUnit::Blocks(batch)), - Self::EwrapBoundary(block, epoch), - ), + WorkBuffer::PreEwrapBoundary(batch, block, epoch) => { + let next = if account_shards == 0 { + // No shards requested — skip straight to the global Ewrap + // phase. Shouldn't happen with a valid config but keeps + // the state machine safe.
+ Self::EwrapBoundary(block, epoch) + } else { + Self::AShardingBoundary { + block, + epoch, + shard_index: 0, + total_shards: account_shards, + } + }; + (Some(InternalWorkUnit::Blocks(batch)), next) + } + WorkBuffer::AShardingBoundary { + block, + epoch, + shard_index, + total_shards, + } => { + let slot = block.slot(); + let next_index = shard_index + 1; + let next = if next_index >= total_shards { + Self::EwrapBoundary(block, epoch) + } else { + Self::AShardingBoundary { + block, + epoch, + shard_index: next_index, + total_shards, + } + }; + (Some(InternalWorkUnit::AShard(slot, shard_index)), next) + } WorkBuffer::EwrapBoundary(block, epoch) => ( - Some(InternalWorkUnit::EWrap(block.slot())), + Some(InternalWorkUnit::Ewrap(block.slot())), Self::EstartBoundary(block, epoch + 1), ), WorkBuffer::EstartBoundary(block, epoch) => ( @@ -199,6 +273,11 @@ mod tests { use crate::model::{EraBoundary, EraSummary}; use dolos_testing::blocks::make_conway_block; + /// Tests that exercise the WorkBuffer state machine use a small shard + /// count to keep per-epoch boundary sequences manageable + /// (`AShard ×N` + `Ewrap`). + const TEST_TOTAL_SHARDS: u32 = 1; + /// Single Conway era, epoch_length=100, slot_length=1. /// Epoch boundaries at slots 0, 100, 200, ... fn test_chain_summary() -> ChainSummary { @@ -243,7 +322,12 @@ mod tests { last: batch.last_slot(), }, InternalWorkUnit::Rupd(s) => WorkTag::Rupd(*s), - InternalWorkUnit::EWrap(s) => WorkTag::EWrap(*s), + // For test purposes, collapse the epoch-boundary phases + // (AShard ×N + Ewrap) into a single `EWrap` tag keyed by + // the boundary slot. Tests care that boundary work was produced + // at all, not about phase count. 
+ InternalWorkUnit::Ewrap(s) => WorkTag::EWrap(*s), + InternalWorkUnit::AShard(s, _) => WorkTag::EWrap(*s), InternalWorkUnit::EStart(s) => WorkTag::EStart(*s), InternalWorkUnit::ForcedStop => WorkTag::ForcedStop, } @@ -265,7 +349,7 @@ mod tests { if buf.can_receive_block() { break; } - let (wu, next) = buf.pop_work(stop_epoch); + let (wu, next) = buf.pop_work(stop_epoch, TEST_TOTAL_SHARDS); buf = next; if let Some(wu) = wu { tags.push(tag_from_internal(&wu)); @@ -274,12 +358,12 @@ mod tests { } } let block = make_block(slot); - buf = buf.receive_block(block, eras, stability_window); + buf = buf.receive_block(block, eras, stability_window, TEST_TOTAL_SHARDS); } // drain remaining loop { - let (wu, next) = buf.pop_work(stop_epoch); + let (wu, next) = buf.pop_work(stop_epoch, TEST_TOTAL_SHARDS); buf = next; match wu { Some(ref wu) if matches!(wu, InternalWorkUnit::ForcedStop) => { @@ -314,7 +398,7 @@ mod tests { if buf.can_receive_block() { break; } - let (wu, next) = buf.pop_work(stop_epoch); + let (wu, next) = buf.pop_work(stop_epoch, TEST_TOTAL_SHARDS); buf = next; if let Some(wu) = wu { update_cursor(&wu, &mut cursor); @@ -324,12 +408,12 @@ mod tests { } } let block = make_block(slot); - buf = buf.receive_block(block, eras, stability_window); + buf = buf.receive_block(block, eras, stability_window, TEST_TOTAL_SHARDS); } // drain remaining loop { - let (wu, next) = buf.pop_work(stop_epoch); + let (wu, next) = buf.pop_work(stop_epoch, TEST_TOTAL_SHARDS); buf = next; match wu { Some(ref wu) if matches!(wu, InternalWorkUnit::ForcedStop) => { diff --git a/crates/cardano/work_units.md b/crates/cardano/work_units.md new file mode 100644 index 000000000..24c47c03d --- /dev/null +++ b/crates/cardano/work_units.md @@ -0,0 +1,141 @@ +# Cardano Work Units and Their Deltas + +## Natural sequence within an epoch + +``` +Estart → Roll … → Rupd → Roll … → AShard ×N → Ewrap +(open) (blocks) (RUPD) (blocks) (per-account) (global + close) + │ + ▼ + next epoch's Estart +``` + 
+`Genesis` runs once at chain bootstrap, before the first `Estart`. `ForcedStop` is a sentinel that ends the loop at a configured epoch. + +The sections below walk the cycle starting at `Estart` (the opener of every epoch). + +--- + +## 1. `EstartWorkUnit` — opens the epoch + +- Variants: `CardanoWorkUnit::Estart` / `InternalWorkUnit::EStart(BlockSlot)`. +- Struct: `estart::EstartWorkUnit` (`crates/cardano/src/estart/work_unit.rs`). +- Purpose: roll the ledger into the new epoch — nonce transition, snapshot shifts, pot recalc, era transitions, and opening the new `EpochState.end` slot. +- Deltas emitted (4): + - `NonceTransition` — `estart/nonces.rs:40`. Targets `epochs`. + - `AccountTransition` — `estart/reset.rs:127`. One per account. Targets `accounts`. + - `PoolTransition` — `estart/reset.rs:138`. One per pool. Targets `pools`. + - `EpochTransition` — `estart/reset.rs:148`. Single. Targets `epochs`. In addition to rotating `number`/`initial_pots`/`rolling`/`pparams`, also seeds `EpochState.end = Some(EndStats::default())` and resets `ashard_progress = None` for the new epoch — at the next boundary, AShards populate the reward accumulator fields directly via `EpochEndAccumulate`, then Ewrap reads them back, assembles the final `EndStats` (combining accumulators with the prepare-time fields), and emits `EpochWrapUp` to close. +- Direct writes: `EraSummary` writes during era transitions (Shelley→Allegra etc.). +- Namespaces touched: `accounts`, `pools`, `epochs`, `eras`. + +## 2. `RollWorkUnit` — applies a batch of blocks + +- Variants: `CardanoWorkUnit::Roll` / `InternalWorkUnit::Blocks(WorkBatch)`. +- Struct: `roll::RollWorkUnit` (`crates/cardano/src/roll/work_unit.rs`). +- Purpose: process per-transaction effects for a batch of blocks. Fires repeatedly across the epoch — once before RUPD, once after, with potentially many sub-batches each. 
+- Deltas emitted (22 distinct variants, 41 sites): + - **Accounts** (`roll/accounts.rs`): + - `ControlledAmountInc:102` + - `ControlledAmountDec:78` + - `StakeRegistration:125` + - `StakeDelegation:129` + - `StakeDeregistration:133` + - `VoteDelegation:137` + - `WithdrawalInc:191` + - `EnqueueMir:163,167` + - **Pools** (`roll/pools.rs`): + - `MintedBlocksInc:35` + - `PoolRegistration:46` + - `PoolDeRegistration:68,77` + - **DReps** (`roll/dreps.rs`): + - `DRepRegistration:74` + - `DRepUnRegistration:83` + - `DRepActivity:53,93` + - **Proposals** (`roll/proposals.rs`): + - `NewProposal:286,329` + - **Assets** (`roll/assets.rs`): + - `MintStatsUpdate:35` + - `MetadataTxUpdate:46,79` + - **Datums** (`roll/datums.rs`): + - `DatumRefIncrement:70` + - `DatumRefDecrement:87` + - **Epoch rolling stats** (`roll/epochs.rs`): + - `EpochStatsUpdate:99` + - `NoncesUpdate:115` +- Namespaces touched: `accounts`, `pools`, `dreps`, `proposals`, `assets`, `datums`, `epochs`, `pending_mirs`. + +## 3. `RupdWorkUnit` — computes rewards at the stability window + +- Variants: `CardanoWorkUnit::Rupd` / `InternalWorkUnit::Rupd(BlockSlot)`. +- Struct: `rupd::RupdWorkUnit` (`crates/cardano/src/rupd/work_unit.rs`). +- Purpose: at `randomness_stability_window` into the epoch, compute the reward distribution and persist pending rewards. +- Deltas: **none**. Writes `PendingRewardState` entities directly and updates `EpochState.incentives`. +- Namespaces touched: `pending_rewards`, `epochs`. + +## 4. `AShardWorkUnit` — per-account reward application (×N shards) + +- Variants: `CardanoWorkUnit::AShard` / `InternalWorkUnit::AShard(BlockSlot, u32)`. +- Struct: `ashard::AShardWorkUnit` (`crates/cardano/src/ashard/work_unit.rs`). Runs `total_shards` times in sequence (default 16), each scoped to a first-byte prefix bucket of the account key space. +- Purpose: apply rewards + drops for the accounts in this shard's range; accumulate the shard's reward contribution into `EpochState.end`. 
This is the first phase of the epoch-boundary pipeline — `Ewrap` (globals + close) follows. +- Deltas emitted (4 variants per shard run): + - **Rewards** (`ashard/rewards.rs`): + - `AssignRewards:79` (one per rewarded account in range) + - **Drops** (account-level, `ewrap/drops.rs`, used by both Ewrap and AShard): + - `PoolDelegatorRetire:32` *(also fires in Ewrap for non-account targets)* + - `DRepDelegatorDrop:53` *(also fires in Ewrap for non-account targets)* + - **Accumulator** (`ashard/loading.rs`): + - `EpochEndAccumulate` (rolls up the shard's `effective` / `unspendable_to_treasury` / `unspendable_to_reserves` totals into `EpochState.end`, and writes `ashard_progress = Some(AShardProgress { committed: shard_index + 1, total })`. The persisted `total` snapshots the boundary's shard count at the first commit so a config change between shards — e.g. across a crash and restart — can't break the in-flight pipeline) +- Direct deletes: `PendingRewardState` entries for credentials whose rewards landed. +- Namespaces touched: `accounts` (shard range), `epochs`. Deletes from `pending_rewards` (shard range). + +## 5. `EwrapWorkUnit` — global epoch-boundary work + close + +- Variants: `CardanoWorkUnit::Ewrap` / `InternalWorkUnit::Ewrap(BlockSlot)`. +- Struct: `ewrap::EwrapWorkUnit` (`crates/cardano/src/ewrap/work_unit.rs`). +- Purpose: classify retiring pools / expiring dreps / enacting+dropping proposals, process pending MIRs, run enactment + refund + wrapup-global visitors, then close the boundary by emitting `EpochWrapUp` with the assembled final `EndStats` (prepare-time fields combined with the accumulator fields populated by the preceding AShards). Also writes the completed `EpochState` to archive. 
+- Deltas emitted (10 distinct variants, 11 sites): + - **Enactment** (`ewrap/enactment.rs`): + - `PParamsUpdate:36,39` + - `TreasuryWithdrawal:43` + - **Refunds** (`ewrap/refunds.rs`): + - `PoolDepositRefund:88` + - `ProposalDepositRefund:39,62` + - **Drops** (drep-level, `ewrap/drops.rs`): + - `DRepExpiration:67` + - `DRepDelegatorDrop:53` *(also fires in AShard for accounts)* + - `PoolDelegatorRetire:32` *(also fires in AShard for accounts)* + - **Wrap-up globals** (`ewrap/wrapup.rs`): + - `PoolWrapUp:119` + - `EpochWrapUp` (carries the final assembled `EndStats`; apply overwrites `entity.end`, rotates `rolling`/`pparams` snapshots forward, clears `ashard_progress`) + - **MIR processing** (`ewrap/loading.rs`): + - `AssignRewards:207` (one per registered MIR recipient) +- Direct deletes: `PendingMirState` entries for processed MIRs. +- Direct writes: writes the completed `EpochState` to the archive under the epoch-start temporal key. +- Namespaces touched: `pools`, `dreps`, `proposals`, `accounts` (MIR recipients), `epochs`. Deletes from `pending_mirs`. + +## 6. `GenesisWorkUnit` — one-time chain bootstrap + +- Variants: `CardanoWorkUnit::Genesis` / `InternalWorkUnit::Genesis`. +- Struct: `genesis::GenesisWorkUnit` (`crates/cardano/src/genesis/mod.rs`). +- Purpose: bootstrap state from genesis configs (Byron/Shelley/Alonzo/Conway). +- Deltas: **none**. Writes `EpochState` and `EraSummary` entities directly via `bootstrap_epoch` / `bootstrap_eras`. +- Namespaces touched: `epochs`, `eras`. + +## 7. `ForcedStop` — sentinel + +- Variant: `CardanoWorkUnit::ForcedStop` / `InternalWorkUnit::ForcedStop`. +- No struct, no load/compute/commit. Causes the sync loop to stop at the configured `stop_epoch`. 
+ +--- + +## Defined deltas with no emission site + +The following variants exist on `CardanoDelta` (`crates/cardano/src/model/mod.rs`) but no work unit emits them today: + +- `DequeueMir` +- `EnqueueReward` +- `DequeueReward` +- `SetEpochIncentives` + +Pending entities (`PendingMirState`, `PendingRewardState`) are removed via direct `delete_entity` calls instead. TODO: either wire these deltas in or remove the variants. diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index c31c836c7..96a2f2f61 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -984,6 +984,21 @@ pub struct CardanoConfig { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub custom_utxos: Vec, + + /// Number of shards used to partition the per-account leg of the + /// epoch-boundary pipeline (see `AShardWorkUnit`). Must divide 256 + /// (so shards are whole first-byte prefix buckets) and be >= 1. When + /// `None`, defaults to `CardanoConfig::DEFAULT_ACCOUNT_SHARDS`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub account_shards: Option<u32>, +} + +impl CardanoConfig { + pub const DEFAULT_ACCOUNT_SHARDS: u32 = 16; + + pub fn account_shards(&self) -> u32 { + self.account_shards.unwrap_or(Self::DEFAULT_ACCOUNT_SHARDS) + } } #[derive(Serialize, Deserialize, Clone)] diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index dab4b9941..803f66f0d 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -423,6 +423,9 @@ pub enum ChainError { #[error("genesis field missing: {0}")] GenesisFieldMissing(String), + #[error("invalid chain config: {0}")] + InvalidConfig(String), + #[error("protocol params not found: {0}")] PParamsNotFound(String), diff --git a/skills/debug-epoch-mismatch/SKILL.md b/skills/debug-epoch-mismatch/SKILL.md index e054417a1..12074a86b 100644 --- a/skills/debug-epoch-mismatch/SKILL.md +++ b/skills/debug-epoch-mismatch/SKILL.md @@ -55,15 +55,16 @@ The test compares 6 datasets.
The failure pattern reveals the root cause: | Pattern | Likely Cause | Work Unit | |---------|-------------|-----------| -| Only `epochs` differs (treasury/reserves off) | Pot calculation -- unspendable routing, incentive formula, or delta field | EWRAP/ESTART | -| Equal and opposite delta between rewards and treasury | Unspendable reward routing issue | EWRAP | -| Rewards match but pots don't | Pot aggregation or unspendable handling | EWRAP/ESTART | +| Only `epochs` differs (treasury/reserves off) | Pot calculation -- unspendable routing, incentive formula, or delta field | ASHARD/EWRAP/ESTART | +| Equal and opposite delta between rewards and treasury | Unspendable reward routing issue | ASHARD | +| Rewards match but pots don't | Pot aggregation or unspendable handling | ASHARD/EWRAP/ESTART | | `delegation` + `stake` + `rewards` + `epochs` all differ | Single stake discrepancy cascading | ROLL/ESTART | | Thousands of small reward diffs (±1 lovelace) | One pool's total stake is wrong, rounding cascade | ROLL/ESTART | -| `rewards` has extra/missing rows | RUPD pre-filtering or EWRAP registration check | RUPD/EWRAP | +| `rewards` has extra/missing rows | RUPD pre-filtering or EWRAP-time registration check | RUPD/ASHARD | | `delegation` off by exactly 500,000,000 | Pool deposit refund timing | ROLL (POOLREAP) | | `delegation` off by exactly 2,000,000 | Key deposit timing | ROLL | | Only subset of pools/accounts affected | Registration/retirement window or pool param update | ROLL | +| MIR / treasury-reserves transfer mismatch | MIR application or routing in boundary close | EWRAP | ## Step 4: Identify the Root Account/Pool @@ -121,12 +122,15 @@ ORDER BY b.slot_no; ### Dolos Work Units → Haskell Concepts +The boundary pipeline runs `ASHARD ×N → EWRAP` at the end of each epoch (then `ESTART` opens the next one). `ASHARD` and `EWRAP` are split: per-account effects settle first across N shards, then a single `EWRAP` does the global / boundary-close work. 
+ | Dolos | Haskell | What it does | |-------|---------|-------------| | `ROLL` | Block processing | Applies transactions, certificates, updates pool/account state | | `RUPD` | Reward calculation at stability window | Computes pending rewards using mark snapshot | -| `EWRAP` | `applyRUpd` + reward filtering | Applies rewards, filters unspendable, updates pots | -| `ESTART` | NEWEPOCH transition | Rotates snapshots, computes initial pots, creates new epoch | +| `ASHARD` | `applyRUpd` (per-account) + reward filtering | Per-account: applies pending rewards to registered accounts, filters/routes unspendable rewards (treasury vs reserves), accumulates per-shard contributions into `EpochState.end` | +| `EWRAP` | `applyMIR` + boundary close | Global / once-per-boundary: applies MIRs, processes pool/proposal refunds, finalizes `EndStats`, rotates `pparams` / `rolling` snapshots forward | +| `ESTART` | NEWEPOCH transition | Rotates remaining snapshots, computes initial pots, creates new epoch | ### Key Source Files @@ -135,7 +139,10 @@ ORDER BY b.slot_no; | Pots & incentives | `crates/cardano/src/pots.rs` | | Model types (EpochValue, etc.) 
| `crates/cardano/src/model.rs` | | ESTART / epoch transition | `crates/cardano/src/estart/reset.rs` | -| EWRAP / reward application | `crates/cardano/src/ewrap/rewards.rs` | +| ASHARD / reward application & unspendable routing | `crates/cardano/src/ashard/rewards.rs` | +| ASHARD / shard partitioning & wiring | `crates/cardano/src/ashard/{loading,commit,shard}.rs` | +| EWRAP / MIR + refunds + wrap-up | `crates/cardano/src/ewrap/{enactment,refunds,wrapup}.rs` | +| EWRAP / boundary-close commit | `crates/cardano/src/ewrap/commit.rs` | | RUPD / reward calculation | `crates/cardano/src/rupd/loading.rs` | | ROLL / certificate processing | `crates/cardano/src/roll/accounts.rs` | | ROLL / batch delta application | `crates/cardano/src/roll/batch.rs` | @@ -163,9 +170,12 @@ Use targeted, temporary logs that point to a single hypothesis: |------------|-------------------| | Missing/wrong block attribution | Pool header data in ROLL visitor | | Wrong pool block count | Pool block counts in RUPD loading | -| Wrong reward amounts | Reward map entries before EWRAP flush | +| Wrong reward amounts | Reward map entries in ASHARD rewards visitor (`ashard/rewards.rs`) before commit | +| Wrong unspendable routing (treasury vs reserves) | ASHARD rewards visitor + `EpochEndAccumulate` deltas | +| Wrong MIR amount or routing | EWRAP enactment / wrap-up (`ewrap/enactment.rs`, `ewrap/wrapup.rs`) | +| Wrong pool / proposal refund | EWRAP refunds visitor (`ewrap/refunds.rs`) | | Wrong pot delta | `apply_delta()` inputs in ESTART | -| Registration boundary issue | Account registration checks at RUPD/EWRAP boundary | +| Registration boundary issue | Account registration checks at RUPD entry and ASHARD reward-application time | Guidelines: - Use `eprintln!` for focused logs diff --git a/tests/epoch_pots/main.rs b/tests/epoch_pots/main.rs index 36c8a3815..bbe7297ce 100644 --- a/tests/epoch_pots/main.rs +++ b/tests/epoch_pots/main.rs @@ -201,8 +201,9 @@ fn dump_stake_csv( Ok(()) } -/// Dump 
rewards that were actually applied (spendable) at EWRAP time. -/// This filters out rewards for accounts that deregistered between RUPD and EWRAP. +/// Dump rewards that were actually applied (spendable) during the +/// `AShard` phase. Filters out rewards for accounts that deregistered +/// between RUPD and the boundary. fn dump_applied_rewards_csv( applied_rewards: &[AppliedReward], network: Network, @@ -460,6 +461,12 @@ fn run_epoch_pots_test( // When estart fires for epoch N+1, ended_state() holds the completed epoch N. let mut captured_epoch: Option = None; + // Accumulate applied rewards across all AShard work units for the + // subject boundary. Each shard holds only its own slice in memory, but + // the test harness needs the full set at the end for the CSV dump. + let mut accumulated_applied: Vec<AppliedReward> = Vec::new(); + let mut accumulated_ending_epoch: Option<u64> = None; + harness .run(100, |_domain, work| { match work { @@ -490,16 +497,27 @@ } } } - CardanoWorkUnit::Ewrap(ewrap) => { - // Dump rewards from EWRAP (only actually applied/spendable rewards) - if let Some(boundary) = ewrap.boundary() { - // performance_epoch = ending_epoch - 1 - // For epoch 214 ending, rewards are for performance_epoch 213 - let ending_epoch = boundary.ending_state().number; + CardanoWorkUnit::AShard(shard) => { + if shard.shard_index() == 0 { + // First shard of this boundary — reset accumulator + // and capture the ending epoch. + accumulated_applied.clear(); + if let Some(boundary) = shard.boundary() { + accumulated_ending_epoch = + Some(boundary.ending_state().number); + } + } + if let Some(boundary) = shard.boundary() { + accumulated_applied.extend(boundary.applied_rewards.iter().cloned()); + } + } + CardanoWorkUnit::Ewrap(_) => { + // Boundary close — dump accumulated rewards. 
+ if let Some(ending_epoch) = accumulated_ending_epoch.take() { if ending_epoch >= 1 { let performance_epoch = ending_epoch - 1; if let Err(e) = dump_applied_rewards_csv( - &boundary.applied_rewards, + &accumulated_applied, cardano_network, &dumps_dir, performance_epoch, @@ -507,6 +525,7 @@ eprintln!("failed to dump rewards csv: {e}"); } } + accumulated_applied.clear(); } } _ => {} diff --git a/tests/memory.rs b/tests/memory.rs index ba216b5d8..303a79c8a 100644 --- a/tests/memory.rs +++ b/tests/memory.rs @@ -86,3 +86,118 @@ fn test_redb3_lazy_iter() { assert_lazy_iter(&store); } + +// --------------------------------------------------------------------------- +// Per-shard range iteration. +// +// AShard work units use key-range iteration to bound per-shard memory. +// This test verifies the property end-to-end: given a store with N entities +// distributed across the full first-byte prefix space, iterating a single +// first-byte prefix range must allocate O(1) on the iterator side. If it +// regresses (e.g. a backend materialising the whole range), AShards +// would stop being memory-bounded. +// --------------------------------------------------------------------------- + +const SHARD_ENTITY_COUNT: u64 = 50_000; +const SHARD_KEY_PREFIX_RANGE: std::ops::Range<u8> = 0x10..0x20; // one 16-bucket shard + +fn seed_account_namespace<S: StateStore>(store: &S) { + let value = vec![0xABu8; ENTITY_SIZE]; + + let mut written = 0u64; + while written < SHARD_ENTITY_COUNT { + let batch_end = std::cmp::min(written + BATCH_SIZE, SHARD_ENTITY_COUNT); + let writer = store.start_writer().expect("start_writer failed"); + for i in written..batch_end { + // Spread keys across the full first-byte space so a shard-range + // iteration only hits the intended slice. 
+ let mut key_bytes = [0u8; 32]; + key_bytes[0] = (i % 256) as u8; + key_bytes[1..9].copy_from_slice(&i.to_be_bytes()); + let key = EntityKey::from(&key_bytes); + writer + .write_entity(NS, &key, &value) + .expect("write_entity failed"); + } + writer.commit().expect("commit failed"); + written = batch_end; + } +} + +fn assert_shard_range_iter<S: StateStore>(store: &S) { + seed_account_namespace(store); + + // Build a half-open Range spanning one first-byte prefix + // bucket — this is the same shape AShardWorkUnit uses. + let mut start_bytes = [0u8; 32]; + start_bytes[0] = SHARD_KEY_PREFIX_RANGE.start; + let mut end_bytes = [0u8; 32]; + end_bytes[0] = SHARD_KEY_PREFIX_RANGE.end; + let range = std::ops::Range { + start: EntityKey::from(&start_bytes), + end: EntityKey::from(&end_bytes), + }; + + let reg = Region::new(GLOBAL); + + let iter = store + .iter_entities(NS, range) + .expect("iter_entities with range failed"); + + let stats = reg.change(); + let heap_delta = stats.bytes_allocated; + + let threshold = 10 * 1024 * 1024; // 10 MB + assert!( + heap_delta < threshold, + "shard-range iter_entities should allocate O(1) memory. \ Allocated {} bytes but threshold is {} bytes.", + heap_delta, + threshold, + ); + + // The actual iterator consumption is bounded by the shard size. + // With 50,000 evenly-distributed keys over 256 prefixes, each bucket + // should hold ~195 entries; one 16-prefix shard should hold ~3,120. + // We just assert it's non-empty and much smaller than the full store. 
+ let count = iter.count(); + assert!( + count > 0, + "shard range should contain some entities (got 0)" + ); + assert!( + (count as u64) < SHARD_ENTITY_COUNT / 4, + "shard range should be a strict subset (got {} of {})", + count, + SHARD_ENTITY_COUNT, + ); +} + +#[test] +fn test_fjall_shard_range_iter() { + let tmpdir = tempfile::tempdir().expect("failed to create tempdir"); + let config = FjallStateConfig { + path: None, + cache: Some(64), + max_history: None, + max_journal_size: None, + flush_on_commit: Some(false), + l0_threshold: None, + worker_threads: Some(1), + memtable_size_mb: None, + }; + let store = + dolos_fjall::StateStore::open(tmpdir.path(), &config).expect("failed to open fjall store"); + + assert_shard_range_iter(&store); +} + +#[test] +fn test_redb3_shard_range_iter() { + let mut schema = StateSchema::default(); + schema.insert(NS, NamespaceType::KeyValue); + let store = + dolos_redb3::state::StateStore::in_memory(schema).expect("failed to create redb3 store"); + + assert_shard_range_iter(&store); +}