Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions crates/cardano/src/estart/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use dolos_core::{
batch::WorkDeltas, config::CardanoConfig, BlockSlot, ChainError, Domain, EntityKey, Genesis,
};
use tracing::{debug, info, instrument};
use tracing::{info, instrument};

use crate::{
AccountState, CardanoDelta, CardanoEntity, CardanoLogic, DRepState, EpochState, EraProtocol,
Expand Down Expand Up @@ -104,13 +104,14 @@ pub fn execute<D: Domain>(
_config: &CardanoConfig,
genesis: Arc<Genesis>,
) -> Result<(), ChainError> {
let started = Instant::now();
info!("executing ESTART work unit");

let mut work = WorkContext::load::<D>(state, genesis)?;

work.commit::<D>(state, archive, slot)?;

debug!("ESTART work unit committed");
info!(elapsed =? started.elapsed(), "ESTART work unit committed");

Ok(())
}
7 changes: 4 additions & 3 deletions crates/cardano/src/ewrap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
sync::Arc, time::Instant,
};

use dolos_core::{
batch::WorkDeltas, config::CardanoConfig, BlockSlot, ChainError, Domain, EntityKey, Genesis,
};
use pallas::ledger::primitives::conway::DRep;
use tracing::{debug, info, instrument};
use tracing::{info, instrument};

use crate::{
rewards::RewardMap, rupd::RupdWork, AccountState, CardanoDelta, CardanoEntity, CardanoLogic,
Expand Down Expand Up @@ -156,13 +156,14 @@ pub fn execute<D: Domain>(
genesis: Arc<Genesis>,
rewards: RewardMap<RupdWork>,
) -> Result<(), ChainError> {
let started = Instant::now();
info!("executing EWRAP work unit");

let mut boundary = BoundaryWork::load::<D>(state, genesis, rewards)?;

boundary.commit::<D>(state, archive)?;

debug!("EWRAP work unit committed");
info!(elapsed =? started.elapsed(), "EWRAP work unit committed");

Ok(())
}
4 changes: 2 additions & 2 deletions crates/cardano/src/genesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ pub fn bootstrap_utxos<D: Domain>(
let writer = state.start_writer()?;

let delta = crate::utxoset::compute_origin_delta(genesis);
writer.apply_utxoset(&delta)?;
writer.apply_utxoset(&delta, false)?;

let delta = crate::utxoset::build_custom_utxos_delta(config)?;
writer.apply_utxoset(&delta)?;
writer.apply_utxoset(&delta, false)?;

writer.commit()?;

Expand Down
5 changes: 4 additions & 1 deletion crates/cardano/src/rewards/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, marker::PhantomData};
use std::{collections::HashMap, marker::PhantomData, time::Instant};

use dolos_core::ChainError;
use pallas::ledger::primitives::StakeCredential;
Expand Down Expand Up @@ -331,6 +331,7 @@ pub trait RewardsContext {

pub fn define_rewards<C: RewardsContext>(ctx: &C) -> Result<RewardMap<C>, ChainError> {
let mut map = RewardMap::<C>::new(ctx.incentives().clone());
let start = Instant::now();

for pool in ctx.iter_all_pools() {
let pool_params = ctx.pool_params(pool);
Expand Down Expand Up @@ -419,6 +420,8 @@ pub fn define_rewards<C: RewardsContext>(ctx: &C) -> Result<RewardMap<C>, ChainE
}
}

tracing::info!(elapsed =? start.elapsed(), "finished rewards calculation");

Ok(map)
}

Expand Down
8 changes: 7 additions & 1 deletion crates/cardano/src/rupd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::{collections::{HashMap, HashSet}, time::Instant};

use dolos_core::{
ArchiveStore, ArchiveWriter, BlockSlot, ChainError, ChainPoint, Domain, EntityKey, Genesis,
Expand Down Expand Up @@ -111,6 +111,7 @@ fn log_work<D: Domain>(
rewards: &RewardMap<RupdWork>,
archive: &D::Archive,
) -> Result<(), ChainError> {
let started = Instant::now();
let Some((_, epoch)) = work.relevant_epochs() else {
return Ok(());
};
Expand Down Expand Up @@ -160,6 +161,8 @@ fn log_work<D: Domain>(

writer.commit()?;

info!(elapsed =? started.elapsed(), "log_work finished");

Ok(())
}

Expand All @@ -170,6 +173,7 @@ pub fn execute<D: Domain>(
slot: BlockSlot,
genesis: &Genesis,
) -> Result<RewardMap<RupdWork>, ChainError> {
let started = Instant::now();
info!(slot, "executing rupd work unit");

let work = RupdWork::load::<D>(state, genesis)?;
Expand All @@ -181,5 +185,7 @@ pub fn execute<D: Domain>(
// the time being for simplicity.
log_work::<D>(&work, &rewards, archive)?;

info!(elapsed =? started.elapsed(), "finished rupd work unit");

Ok(rewards)
}
2 changes: 1 addition & 1 deletion crates/core/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl<C: ChainLogic> WorkBatch<C> {
// this into the entity system.
for block in self.blocks.iter() {
if let Some(utxo_delta) = &block.utxo_delta {
writer.apply_utxoset(utxo_delta)?;
writer.apply_utxoset(utxo_delta, false)?;
}
}

Expand Down
12 changes: 10 additions & 2 deletions crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, marker::PhantomData, ops::Range};
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::{ChainError, ChainPoint, Domain, TxoRef, UtxoMap, UtxoSet, UtxoSetDelta};
use crate::{ChainError, ChainPoint, Domain, EraCbor, TxoRef, UtxoMap, UtxoSet, UtxoSetDelta};

pub const KEY_SIZE: usize = 32;

Expand Down Expand Up @@ -275,7 +275,10 @@ pub trait StateWriter: Sized + Send + Sync {

fn delete_entity(&self, ns: Namespace, key: &EntityKey) -> Result<(), StateError>;

fn apply_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>;
fn apply_utxoset(&self, delta: &UtxoSetDelta, defer_indexes: bool) -> Result<(), StateError>;

/// Apply utxoset delta to indexes only.
fn index_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>;

#[allow(clippy::double_must_use)]
#[must_use]
Expand Down Expand Up @@ -314,6 +317,7 @@ pub trait StateWriter: Sized + Send + Sync {
}

pub trait StateStore: Sized + Send + Sync + Clone {
type UtxoIter: Iterator<Item = Result<(TxoRef, EraCbor), StateError>>;
type EntityIter: Iterator<Item = Result<(EntityKey, EntityValue), StateError>>;
type EntityValueIter: Iterator<Item = Result<EntityValue, StateError>>;
type Writer: StateWriter;
Expand Down Expand Up @@ -396,6 +400,10 @@ pub trait StateStore: Sized + Send + Sync + Clone {

// TODO: generalize UTxO Set into generic entity system

fn iter_utxos(&self) -> Result<Self::UtxoIter, StateError>;

fn amount_of_utxos(&self) -> Result<u64, StateError>;

fn get_utxos(&self, refs: Vec<TxoRef>) -> Result<UtxoMap, StateError>;

fn get_utxo_by_address(&self, address: &[u8]) -> Result<UtxoSet, StateError>;
Expand Down
29 changes: 25 additions & 4 deletions crates/redb3/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::{collections::HashMap, path::Path, sync::Arc};

use dolos_core::{
ChainPoint, EntityKey, EntityValue, Namespace, StateError, StateSchema, TxoRef, UtxoMap,
UtxoSet,
ChainPoint, EntityKey, EntityValue, Namespace, StateError, StateSchema, TxoRef, UtxoMap, UtxoSet
};

use redb::{
Expand All @@ -13,7 +12,7 @@ use tracing::warn;

mod utxoset;

use crate::{build_tables, Error, Table};
use crate::{build_tables, state::utxoset::UtxosIterator, Error, Table};

impl From<Error> for StateError {
fn from(error: Error) -> Self {
Expand Down Expand Up @@ -215,8 +214,17 @@ impl dolos_core::StateWriter for StateWriter {
Ok(())
}

fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> {
fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta, defer_indexes: bool) -> Result<(), StateError> {
utxoset::UtxosTable::apply(&self.wx, delta)?;

if !defer_indexes {
utxoset::FilterIndexes::apply(&self.wx, delta)?;
}

Ok(())
}

fn index_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> {
utxoset::FilterIndexes::apply(&self.wx, delta)?;

Ok(())
Expand All @@ -230,6 +238,7 @@ impl dolos_core::StateWriter for StateWriter {
}

impl dolos_core::StateStore for StateStore {
type UtxoIter = UtxosIterator;
type EntityIter = EntityIter;
type EntityValueIter = EntityValueIter;
type Writer = StateWriter;
Expand Down Expand Up @@ -307,6 +316,18 @@ impl dolos_core::StateStore for StateStore {
Ok(out)
}

fn iter_utxos(&self) -> Result<Self::UtxoIter, StateError> {
let rx = self.db().begin_read().map_err(Error::from)?;

Ok(utxoset::UtxosTable::iter(&rx)?)
}

fn amount_of_utxos(&self) -> Result<u64, StateError> {
let rx = self.db().begin_read().map_err(Error::from)?;

Ok(utxoset::UtxosTable::len(&rx)?)
}

fn get_utxos(&self, refs: Vec<TxoRef>) -> Result<UtxoMap, StateError> {
// exit early before opening a read tx in case there's nothing to fetch
if refs.is_empty() {
Expand Down
12 changes: 8 additions & 4 deletions crates/redb3/src/state/utxoset.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dolos_core::{EraCbor, TxoRef, UtxoMap, UtxoSetDelta};
use dolos_core::{EraCbor, StateError, TxoRef, UtxoMap, UtxoSetDelta};
use pallas::ledger::{addresses::ShelleyDelegationPart, traverse::MultiEraOutput};
use redb::{
MultimapTableDefinition, Range, ReadTransaction, ReadableDatabase, ReadableTable as _,
Expand All @@ -19,10 +19,10 @@ type UtxosValue = (u16, &'static [u8]);
pub struct UtxosIterator(Range<'static, UtxosKey, UtxosValue>);

impl Iterator for UtxosIterator {
type Item = Result<(TxoRef, EraCbor), ::redb::StorageError>;
type Item = Result<(TxoRef, EraCbor), StateError>;

fn next(&mut self) -> Option<Self::Item> {
let x = self.0.next()?;
let x = self.0.next()?.map_err(|err| StateError::InternalStoreError(err.to_string()));

let x = x.map(|(k, v)| {
let (hash, idx) = k.value();
Expand Down Expand Up @@ -50,13 +50,17 @@ impl UtxosTable {
Ok(())
}

#[allow(unused)]
pub fn iter(rx: &ReadTransaction) -> Result<UtxosIterator, Error> {
let table = rx.open_table(UtxosTable::DEF)?;
let range = table.range::<UtxosKey>(..)?;
Ok(UtxosIterator(range))
}

pub fn len(rx: &ReadTransaction) -> Result<u64, Error> {
let table = rx.open_table(UtxosTable::DEF)?;
Ok(table.len()?)
}

pub fn get_sparse(rx: &ReadTransaction, refs: Vec<TxoRef>) -> Result<UtxoMap, Error> {
let table = rx.open_table(Self::DEF)?;
let mut out = HashMap::new();
Expand Down
47 changes: 47 additions & 0 deletions src/bin/dolos/doctor/build_indexes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::{sync::Arc};

use dolos_core::config::RootConfig;
use itertools::Itertools;
use miette::{Context, IntoDiagnostic};

use dolos::prelude::*;

use crate::feedback::Feedback;

#[derive(Debug, clap::Args)]
pub struct Args {
#[arg(short, long, default_value_t = 500)]
pub chunk: usize,
}

#[tokio::main]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Remove #[tokio::main] attribute from non-main function.

The #[tokio::main] attribute is designed to transform the main() entry point to set up a Tokio runtime. Using it on a regular async function that is called by the actual main function is incorrect and may cause compilation errors or unexpected behavior. Since this function is called from the CLI dispatcher (which already has a runtime), this attribute should be removed.

🔎 Proposed fix
-#[tokio::main]
 pub async fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[tokio::main]
pub async fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> {
🤖 Prompt for AI Agents
In src/bin/dolos/doctor/build_indexes.rs around line 17, the function is
incorrectly annotated with #[tokio::main]; remove that attribute so the function
is a plain async function (e.g., async fn build_indexes(...)) or a sync fn as
appropriate, and ensure callers in the CLI dispatcher await the async function
(or spawn it) under the existing runtime; keep the function signature and return
type consistent (propagate Result types) and do not create a new runtime here.

pub async fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> {
//crate::common::setup_tracing(&config.logging)?;

let progress = feedback.slot_progress_bar();
progress.set_message("building indexes");

let domain = crate::common::setup_domain(config).await?;

progress.set_length(domain.state.amount_of_utxos().into_diagnostic().context("getting amount of utxos")?);

let remaining = domain
.state.iter_utxos()
.into_diagnostic()
.context("iterating over utxos")?;

for chunk in remaining.chunks(args.chunk).into_iter() {
let produced_utxo = chunk.into_iter().map(|x| {
let ( k, v ) = x.into_diagnostic().context("decoding utxoset")?;
Ok((k, Arc::new(v)))
}).collect::<miette::Result<_>>()?;
let utxoset = UtxoSetDelta {produced_utxo, ..Default::default()};

let writer = domain.state.start_writer().into_diagnostic().context("starting writer")?;
writer.index_utxoset(&utxoset).into_diagnostic().context("indexing")?;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

progress.inc(args.chunk as u64);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Progress increment may overcount on the last chunk.

The progress increments by args.chunk regardless of how many items were actually processed. On the final iteration, the chunk may contain fewer items than args.chunk, causing the progress bar to potentially exceed 100%.

🔎 Proposed fix

Track the actual chunk size:

     for chunk in remaining.chunks(args.chunk).into_iter() {
-        let produced_utxo = chunk.into_iter().map(|x| {
+        let chunk_items: Vec<_> = chunk.collect();
+        let chunk_len = chunk_items.len();
+        let produced_utxo = chunk_items.into_iter().map(|x| {
             let ( k, v ) = x.into_diagnostic().context("decoding utxoset")?;
             Ok((k, Arc::new(v)))
         }).collect::<miette::Result<_>>()?;
         let utxoset = UtxoSetDelta {produced_utxo, ..Default::default()};

         let writer = domain.state.start_writer().into_diagnostic().context("starting writer")?;
         writer.index_utxoset(&utxoset).into_diagnostic().context("indexing")?;
+        writer.commit().into_diagnostic().context("committing indexes")?;

-        progress.inc(args.chunk as u64);
+        progress.inc(chunk_len as u64);
     }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/bin/dolos/doctor/build_indexes.rs around line 43, the
progress.inc(args.chunk as u64) call always adds the configured chunk size which
can overcount on the final iteration; replace that with the actual number of
items processed for this iteration (e.g., compute actual_count =
processed_items.len() or the number of records returned/handled for the chunk,
convert to u64) and call progress.inc(actual_count), optionally clamping to the
remaining total to avoid exceeding 100%.

}

Ok(())
}
5 changes: 5 additions & 0 deletions src/bin/dolos/doctor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use dolos_core::config::RootConfig;

use crate::feedback::Feedback;

mod build_indexes;
mod catchup_stores;
mod reset_wal;
mod rollback;
Expand Down Expand Up @@ -32,6 +33,9 @@ pub enum Command {

/// manually updates an entity in the state
UpdateEntity(update_entity::Args),

/// catch up store data from WAL records
BuildIndexes(build_indexes::Args),
Comment on lines +37 to +38
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix the doc comment.

The doc comment "catch up store data from WAL records" is incorrect and appears to be copy-pasted from the CatchupStores command. Based on the implementation in build_indexes.rs, this command builds UTXO indexes by iterating over existing UTXOs in chunks.

🔎 Suggested fix
-    /// catch up store data from WAL records
+    /// builds UTXO indexes in batches
     BuildIndexes(build_indexes::Args),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// catch up store data from WAL records
BuildIndexes(build_indexes::Args),
/// builds UTXO indexes in batches
BuildIndexes(build_indexes::Args),
🤖 Prompt for AI Agents
In src/bin/dolos/doctor/mod.rs around lines 37 to 38, the doc comment "catch up
store data from WAL records" is incorrect for the BuildIndexes variant; update
the comment to accurately describe the command’s behavior (it builds UTXO
indexes by iterating over existing UTXOs in chunks and constructing indexes) —
replace the incorrect copy-pasted comment with a concise description like "Build
UTXO indexes by iterating over existing UTXOs in chunks."

}

#[derive(Debug, Parser)]
Expand All @@ -42,6 +46,7 @@ pub struct Args {

pub fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> {
match &args.command {
Command::BuildIndexes(x) => build_indexes::run(config, x, feedback)?,
Command::CatchupStores(x) => catchup_stores::run(config, x, feedback)?,
Command::ResetWal(x) => reset_wal::run(config, x, feedback)?,
Command::WalIntegrity(x) => wal_integrity::run(config, x)?,
Expand Down
2 changes: 1 addition & 1 deletion src/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where
let utxo_undo = dolos_cardano::utxoset::compute_undo_delta(blockd, &inputs)
.map_err(dolos_core::ChainError::from)?;

writer.apply_utxoset(&utxo_undo)?;
writer.apply_utxoset(&utxo_undo, false)?;

// TODO: we should differ notifications until the we commit the writers
self.notify_tip(TipEvent::Undo(point.clone(), block));
Expand Down
Loading