Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions crates/cardano/src/ashard/commit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Commit logic for `AShardWorkUnit`.
//!
//! Adds the AShard-specific commit method to `BoundaryWork`. Reuses the
//! shared `stream_and_apply_namespace` helper that lives in `ewrap/commit.rs`.

use dolos_core::{
ArchiveStore, ArchiveWriter, ChainError, ChainPoint, Domain, LogKey, StateStore, StateWriter,
TemporalKey,
};
use tracing::{debug, instrument, warn};

use crate::{
ewrap::BoundaryWork, rupd::credential_to_key, AccountState, EpochState, FixedNamespace,
PendingRewardState,
};

impl BoundaryWork {
    /// Commit a single account shard (ashard): apply per-account deltas
    /// (rewards + drops) and the `EpochEndAccumulate` delta against
    /// `EpochState`, flush archive logs (`{Leader,Member}RewardLog`), delete
    /// applied pending rewards.
    ///
    /// State and archive writes go through two independent writers that are
    /// only committed at the very end, state first, then archive.
    ///
    /// # Errors
    ///
    /// Returns `ChainError` if any store read/write or commit fails; on an
    /// early error neither writer has been committed.
    #[instrument(skip(self, state, archive))]
    pub fn commit_ashard<D: Domain>(
        &mut self,
        state: &D::State,
        archive: &D::Archive,
        range: std::ops::Range<dolos_core::EntityKey>,
    ) -> Result<(), ChainError> {
        debug!("committing ashard changes");

        let writer = state.start_writer()?;
        let archive_writer = archive.start_writer()?;

        // Stream accounts in this shard's range only.
        self.stream_and_apply_namespace::<D, AccountState>(state, &writer, Some(range))?;

        // EpochState gets the EpochEndAccumulate delta (single entity, so no
        // range restriction is needed).
        self.stream_and_apply_namespace::<D, EpochState>(state, &writer, None)?;

        // Delete the pending rewards that were applied while computing this
        // shard; `drain(..)` empties the list as we go.
        debug!(
            count = self.applied_reward_credentials.len(),
            "deleting applied pending rewards"
        );
        for credential in self.applied_reward_credentials.drain(..) {
            let key = credential_to_key(&credential);
            writer.delete_entity(PendingRewardState::NS, &key)?;
        }

        // Any unspendable rewards left in the map after flush (i.e. those not
        // in drain_unspendable — shouldn't happen today but kept for safety).
        if !self.rewards.is_empty() {
            warn!(
                remaining = self.rewards.len(),
                "draining remaining pending rewards (shard)"
            );
            for (credential, _) in self.rewards.iter_pending() {
                let key = credential_to_key(credential);
                writer.delete_entity(PendingRewardState::NS, &key)?;
            }
        }

        // Archive logs — every shard of the same boundary derives the same
        // epoch-start temporal key, so all shard logs land under one point.
        let start_of_epoch = self.chain_summary.epoch_start(self.ending_state().number);
        let temporal_key = TemporalKey::from(&ChainPoint::Slot(start_of_epoch));

        debug!(log_count = self.logs.len(), "writing shard archive logs");
        for (entity_key, log) in self.logs.drain(..) {
            let log_key = LogKey::from((temporal_key.clone(), entity_key));
            archive_writer.write_log_typed(&log_key, &log)?;
        }

        // Deltas should all have been consumed by the namespace streaming
        // above; anything left behind suggests a bug upstream, so log it.
        if !self.deltas.entities.is_empty() {
            warn!(quantity = %self.deltas.entities.len(), "uncommitted shard deltas");
        }

        writer.commit()?;
        archive_writer.commit()?;

        debug!("ashard commit complete");
        Ok(())
    }
}
137 changes: 137 additions & 0 deletions crates/cardano/src/ashard/loading.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//! Load + compute helpers for `AShardWorkUnit`.
//!
//! Adds AShard-specific methods to `BoundaryWork` (defined in `ewrap`).
//! The shared boundary helpers (`new_empty`, `load_pool_data`,
//! `load_drep_data`) live in `ewrap/loading.rs`; this file builds on top.

use std::{collections::HashMap, ops::Range, sync::Arc};

use dolos_core::{ChainError, Domain, EntityKey, Genesis, StateStore};
use pallas::ledger::primitives::StakeCredential;

use crate::{
ewrap::{BoundaryVisitor as _, BoundaryWork},
rewards::{Reward, RewardMap},
AccountState, FixedNamespace as _, PendingRewardState,
};

impl BoundaryWork {
    /// Load the pending rewards persisted by RUPD into `self.rewards`,
    /// optionally restricted to a shard's key range.
    ///
    /// `range = Some(r)` limits iteration to `r`; `None` loads the whole
    /// namespace (kept for completeness — currently unused, since the only
    /// caller is `load_ashard`, which always passes a shard range).
    fn load_pending_rewards_range<D: Domain>(
        &mut self,
        state: &D::State,
        range: Option<Range<EntityKey>>,
    ) -> Result<(), ChainError> {
        let records =
            state.iter_entities_typed::<PendingRewardState>(PendingRewardState::NS, range)?;

        // Materialize the persisted records into credential -> reward form.
        let mut loaded: HashMap<StakeCredential, Reward> = HashMap::new();
        for entry in records {
            let (_, persisted) = entry?;
            loaded.insert(
                persisted.credential.clone(),
                Reward::from_pending_state(&persisted),
            );
        }

        // Summary figures are for tracing only; the log field names are kept
        // stable on purpose so downstream log consumers see no change.
        let total: u64 = loaded.values().map(|r| r.total_value()).sum();
        let spendable = loaded.values().filter(|r| r.is_spendable()).count();
        let unspendable = loaded.len() - spendable;

        tracing::debug!(
            pending_count = loaded.len(),
            pending_total = %total,
            spendable_count = %spendable,
            unspendable_count = %unspendable,
            "loaded pending rewards from state"
        );

        // Rebuild the reward map around the freshly loaded pending set,
        // carrying over the incentives already configured on the old map.
        let incentives = self.rewards.incentives().clone();
        self.rewards = RewardMap::from_pending(loaded, incentives);

        Ok(())
    }

    /// Load + compute for an `AShard` phase:
    /// * reload the small classifications that drops.visit_account needs
    ///   (retiring_pools, retiring_dreps, reregistrating_dreps),
    /// * range-load pending rewards for this shard's key range,
    /// * iterate accounts in range, applying rewards+drops visitors, and
    /// * emit an `EpochEndAccumulate` delta carrying the shard's reward
    ///   contribution.
    pub fn load_ashard<D: Domain>(
        state: &D::State,
        genesis: Arc<Genesis>,
        shard_index: u32,
        total_shards: u32,
        range: Range<EntityKey>,
    ) -> Result<BoundaryWork, ChainError> {
        let mut work = Self::new_empty::<D>(state, genesis)?;

        // The drops visitor needs the retiring/re-registering classifications
        // for pools and dreps. These sets are small (a handful per epoch), so
        // re-classifying them per shard is cheap.
        work.load_pool_data::<D>(state)?;
        work.load_drep_data::<D>(state)?;

        work.load_pending_rewards_range::<D>(state, Some(range.clone()))?;
        work.compute_shard_deltas::<D>(state, range, shard_index, total_shards)?;

        Ok(work)
    }

    /// Walk every account in `range`, apply the rewards visitor and then the
    /// drops visitor to each, and emit this shard's `EpochEndAccumulate`
    /// delta from the reward-map counters.
    fn compute_shard_deltas<D: Domain>(
        &mut self,
        state: &D::State,
        range: Range<EntityKey>,
        shard_index: u32,
        total_shards: u32,
    ) -> Result<(), ChainError> {
        let mut rewards_visitor = super::rewards::BoundaryVisitor::default();
        let mut drops_visitor = crate::ewrap::drops::BoundaryVisitor::default();

        let accounts = state.iter_entities_typed::<AccountState>(AccountState::NS, Some(range))?;

        for entry in accounts {
            let (account_id, account) = entry?;
            // HACK: rewards must apply before drops. Rewards update the live
            // value before the snapshot; drops schedule refunds for after the
            // snapshot. If reordered, the rewards would be overwritten by the
            // refund schedule. With this order, the refund clones the live
            // values with rewards already applied.
            // TODO: move retires to ESTART (after the snapshot has been taken)
            // and drop this ordering hack.
            rewards_visitor.visit_account(self, &account_id, &account)?;
            drops_visitor.visit_account(self, &account_id, &account)?;
        }

        rewards_visitor.flush(self)?;
        drops_visitor.flush(self)?;

        // Snapshot the reward-map counters for this shard and emit the
        // accumulator delta. The RewardMap's applied_* counters reflect only
        // this shard's contribution (the map was created fresh for this shard
        // with just this shard's pending rewards).
        self.shard_applied_effective = self.rewards.applied_effective();
        self.shard_applied_unspendable_to_treasury =
            self.rewards.applied_unspendable_to_treasury();
        self.shard_applied_unspendable_to_reserves =
            self.rewards.applied_unspendable_to_reserves();

        self.add_delta(crate::EpochEndAccumulate::new(
            self.shard_applied_effective,
            self.shard_applied_unspendable_to_treasury,
            self.shard_applied_unspendable_to_reserves,
            shard_index,
            total_shards,
        ));

        Ok(())
    }
}
14 changes: 14 additions & 0 deletions crates/cardano/src/ashard/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! AShard work unit — per-account leg of the epoch-boundary pipeline.
//!
//! Builds on the shared `BoundaryWork` / `BoundaryVisitor` infrastructure
//! defined in `crate::ewrap`. The drops visitor (used by both phases) also
//! lives in `ewrap`; this module owns only the AShard-specific work
//! unit, the rewards visitor, and the key-range partitioning helpers.

pub mod commit; // `BoundaryWork::commit_ashard`: flush shard deltas + logs.
pub mod loading; // `BoundaryWork::load_ashard`: load + compute one shard.
pub mod rewards; // AShard-specific rewards visitor.
pub mod shard; // key-range partitioning helpers (`shard_key_range`).
pub mod work_unit; // defines `AShardWorkUnit` (re-exported below).

pub use work_unit::AShardWorkUnit;
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use dolos_core::{ChainError, EntityKey};
use tracing::debug;

use crate::{
ewrap::AppliedReward, AccountState, AssignRewards, CardanoDelta, CardanoEntity,
LeaderRewardLog, MemberRewardLog,
ewrap::{AccountId, AppliedReward, BoundaryWork},
AccountState, AssignRewards, CardanoDelta, CardanoEntity, LeaderRewardLog, MemberRewardLog,
};

#[derive(Default)]
Expand All @@ -23,11 +23,11 @@ impl BoundaryVisitor {
}
}

impl super::BoundaryVisitor for BoundaryVisitor {
impl crate::ewrap::BoundaryVisitor for BoundaryVisitor {
fn visit_account(
&mut self,
ctx: &mut super::BoundaryWork,
id: &super::AccountId,
ctx: &mut BoundaryWork,
id: &AccountId,
account: &AccountState,
) -> Result<(), ChainError> {
let Some(reward) = ctx.rewards.take_for_apply(&account.credential) else {
Expand Down Expand Up @@ -79,6 +79,9 @@ impl super::BoundaryVisitor for BoundaryVisitor {
self.change(AssignRewards::new(id.clone(), reward.total_value()));

for (pool, value, as_leader) in reward.into_vec() {
// Per-shard bookkeeping: each shard's applied_rewards is bounded
// by the shard's account count, so holding it for the duration of
// the shard is fine (the memory bound is per-shard, not epoch).
ctx.applied_rewards.push(AppliedReward {
credential: account.credential.clone(),
pool,
Expand Down Expand Up @@ -108,7 +111,7 @@ impl super::BoundaryVisitor for BoundaryVisitor {
Ok(())
}

fn flush(&mut self, ctx: &mut super::BoundaryWork) -> Result<(), ChainError> {
fn flush(&mut self, ctx: &mut BoundaryWork) -> Result<(), ChainError> {
let mark_protocol = ctx
.ending_state()
.pparams
Expand Down
105 changes: 105 additions & 0 deletions crates/cardano/src/ashard/shard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//! Shard range helper for account-shard key-range partitioning.
//!
//! Shards partition per-account boundary work by first-byte prefix of the
//! `EntityKey`. `total_shards` must divide 256 so that each shard covers an
//! equal whole number of prefix buckets (e.g. at `total_shards = 16` each
//! shard owns 16 consecutive prefix values).
//!
//! The state-store iterator takes a half-open `Range<EntityKey>`, so shard
//! `i` gets:
//!
//! - `start = [i*step, 0, 0, ..., 0]`
//! - `end = [(i+1)*step, 0, 0, ..., 0]` for `i < total - 1`
//! - `end = [0xFF; KEY_SIZE]` for the final shard, matching
//! `EntityKey::full_range`'s convention.

use std::ops::Range;

use dolos_core::{EntityKey, KEY_SIZE};

/// Size of the one-byte key-prefix space that shards partition.
pub const PREFIX_SPACE: u32 = 256;

/// Return `Ok(())` if `total_shards` is a valid sharding factor (>= 1 and
/// divides 256).
pub fn validate_total_shards(total_shards: u32) -> Result<(), String> {
    match total_shards {
        // Zero shards is meaningless — there would be nothing to iterate.
        0 => Err("account_shards must be >= 1".into()),
        // A factor that does not divide the prefix space would leave shards
        // of unequal width.
        n if PREFIX_SPACE % n != 0 => Err(format!(
            "account_shards ({total_shards}) must divide {PREFIX_SPACE}"
        )),
        _ => Ok(()),
    }
}

/// Compute the half-open key range owned by `shard_index` out of
/// `total_shards`.
///
/// Non-final shards end exactly where the next shard begins; the final
/// shard ends at `[0xFF; KEY_SIZE]`, matching `EntityKey::full_range`'s
/// convention.
pub fn shard_key_range(shard_index: u32, total_shards: u32) -> Range<EntityKey> {
    debug_assert!(validate_total_shards(total_shards).is_ok());
    debug_assert!(shard_index < total_shards);

    // Each shard owns `step` consecutive values of the first key byte.
    let step = PREFIX_SPACE / total_shards;
    let is_last_shard = shard_index + 1 == total_shards;

    let mut start = [0u8; KEY_SIZE];
    start[0] = (shard_index * step) as u8;

    let mut end = [0xFFu8; KEY_SIZE];
    if !is_last_shard {
        end = [0u8; KEY_SIZE];
        end[0] = ((shard_index + 1) * step) as u8;
    }

    EntityKey::from(&start)..EntityKey::from(&end)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn shards_cover_full_range_at_16() {
        let ranges: Vec<_> = (0..16u32).map(|i| shard_key_range(i, 16)).collect();

        // Spot-check boundary bytes of the first, second and last shards.
        assert_eq!(ranges[0].start.as_ref()[0], 0x00);
        assert_eq!(ranges[0].end.as_ref()[0], 0x10);
        assert_eq!(ranges[1].start.as_ref()[0], 0x10);
        assert_eq!(ranges[15].start.as_ref()[0], 0xF0);
        assert_eq!(ranges[15].end.as_ref(), &[0xFFu8; KEY_SIZE]);

        // Consecutive shards must tile the prefix space with no gaps:
        // end_i == start_{i+1} for i < 15.
        for i in 0..15 {
            assert_eq!(
                ranges[i].end.as_ref()[0],
                ranges[i + 1].start.as_ref()[0],
                "shards {i} and {} must meet",
                i + 1
            );
        }
    }

    #[test]
    fn shards_cover_full_range_at_1() {
        // A single shard owns the entire key space.
        let whole = shard_key_range(0, 1);
        assert_eq!(whole.start.as_ref()[0], 0x00);
        assert_eq!(whole.end.as_ref(), &[0xFFu8; KEY_SIZE]);
    }

    #[test]
    fn validates_total_shards() {
        for valid in [1u32, 2, 16, 256] {
            assert!(validate_total_shards(valid).is_ok());
        }
        for invalid in [0u32, 3, 100] {
            assert!(validate_total_shards(invalid).is_err());
        }
    }
}
Loading
Loading