-
Notifications
You must be signed in to change notification settings - Fork 49
Enhance DomainAdapter to retrieve missing UTXOs from archive store #962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,16 @@ | ||
| pub mod storage; | ||
|
|
||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
|
|
||
| use dolos_cardano::CardanoLogic; | ||
| use dolos_core::{ | ||
| archive::ArchiveStore as _, | ||
| config::{StorageConfig, SyncConfig}, | ||
| indexes::IndexStore as _, | ||
| *, | ||
| }; | ||
| use pallas::ledger::traverse::MultiEraBlock; | ||
|
|
||
| pub use storage::{ | ||
| ArchiveStoreBackend, IndexStoreBackend, MempoolBackend, StateStoreBackend, WalStoreBackend, | ||
|
|
@@ -63,39 +67,6 @@ impl DomainAdapter { | |
| tracing::info!("domain adapter: graceful shutdown complete"); | ||
| Ok(()) | ||
| } | ||
|
|
||
| pub fn get_historical_utxos( | ||
| &self, | ||
| refs: &[pallas::interop::utxorpc::TxoRef], | ||
| ) -> Option<pallas::interop::utxorpc::UtxoMap> { | ||
| if refs.is_empty() { | ||
| return Some(Default::default()); | ||
| } | ||
|
|
||
| let mut result = std::collections::HashMap::new(); | ||
| let refs_set: std::collections::HashSet<_> = | ||
| refs.iter().copied().map(TxoRef::from).collect(); | ||
|
|
||
| let iter = self.wal().iter_logs(None, None).ok()?; | ||
| for (_, log) in iter.rev() { | ||
| for (txo_ref, era_cbor) in &log.inputs { | ||
| if refs_set.contains(txo_ref) { | ||
| let era = era_cbor.0.try_into().expect("era out of range"); | ||
| result.insert(txo_ref.clone().into(), (era, era_cbor.1.clone())); | ||
| } | ||
| } | ||
|
|
||
| if result.len() == refs.len() { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| if result.is_empty() { | ||
| None | ||
| } else { | ||
| Some(result) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Domain for DomainAdapter { | ||
|
|
@@ -180,18 +151,58 @@ impl pallas::interop::utxorpc::LedgerContext for DomainAdapter { | |
| &self, | ||
| refs: &[pallas::interop::utxorpc::TxoRef], | ||
| ) -> Option<pallas::interop::utxorpc::UtxoMap> { | ||
| let refs: Vec<_> = refs.iter().map(|x| TxoRef::from(*x)).collect(); | ||
| let dolos_refs: Vec<TxoRef> = refs.iter().map(|x| TxoRef::from(*x)).collect(); | ||
| let mut result: pallas::interop::utxorpc::UtxoMap = | ||
| dolos_core::StateStore::get_utxos(self.state(), dolos_refs) | ||
| .ok()? | ||
| .into_iter() | ||
| .map(|(k, v)| { | ||
| let era = v.0.try_into().expect("era out of range"); | ||
| (k.into(), (era, v.1.clone())) | ||
| }) | ||
| .collect(); | ||
|
|
||
| let missing: Vec<_> = refs.iter().filter(|r| !result.contains_key(r)).collect(); | ||
| if missing.is_empty() { | ||
| return Some(result); | ||
| } | ||
|
|
||
| let some = dolos_core::StateStore::get_utxos(self.state(), refs) | ||
| .ok()? | ||
| .into_iter() | ||
| .map(|(k, v)| { | ||
| let era = v.0.try_into().expect("era out of range"); | ||
| (k.into(), (era, v.1.clone())) | ||
| }) | ||
| .collect(); | ||
|
|
||
| Some(some) | ||
| let mut by_tx: HashMap<Vec<u8>, Vec<&pallas::interop::utxorpc::TxoRef>> = HashMap::new(); | ||
| for txo_ref in &missing { | ||
| by_tx.entry(txo_ref.0.to_vec()).or_default().push(txo_ref); | ||
| } | ||
|
|
||
| for (tx_hash_bytes, txo_refs) in by_tx { | ||
| let Ok(Some(slot)) = self.indexes().slot_by_tx_hash(&tx_hash_bytes) else { | ||
| continue; | ||
| }; | ||
| let Ok(Some(block_bytes)) = self.archive().get_block_by_slot(&slot) else { | ||
| continue; | ||
| }; | ||
| let Ok(block) = MultiEraBlock::decode(&block_bytes) else { | ||
| continue; | ||
| }; | ||
|
|
||
| let block_txs = block.txs(); | ||
| let Some(tx) = block_txs | ||
| .iter() | ||
| .find(|tx| tx.hash().as_ref() == tx_hash_bytes.as_slice()) | ||
| else { | ||
| continue; | ||
| }; | ||
|
|
||
| let outputs = tx.outputs(); | ||
| let era = block.era(); | ||
|
|
||
| for txo_ref in txo_refs { | ||
| let Some(output) = outputs.get(txo_ref.1 as usize) else { | ||
| continue; | ||
| }; | ||
| result.insert(*txo_ref, (era, output.encode())); | ||
| } | ||
|
Comment on lines
+165
to
+202
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
printf '\n== LedgerContext implementations ==\n'
rg -nP --type=rust -C3 'impl\s+pallas::interop::utxorpc::LedgerContext\s+for\s+\w+|fn\s+get_utxos\s*\('
printf '\n== Historical lookup definitions / call sites ==\n'
rg -nP --type=rust -C3 '\bfn\s+get_historical_utxos\s*\(|(?<!fn\s)\bget_historical_utxos\s*\('
printf '\n== Non-definition get_utxos call sites ==\n'
rg -nP --type=rust -C3 '(?<!fn\s)\bget_utxos\s*\('Repository: txpipe/dolos Length of output: 13638 🏁 Script executed: # Check comments and context around the fallback code in DomainAdapter
sed -n '150,250p' src/adapters/mod.rs
# Look for any documentation or comments about why ArchiveStore fallback exists
rg -B5 -A5 'fn get_utxos' src/adapters/mod.rs
# Check if there are tests that verify this fallback behavior
rg -B3 -A3 'get_utxos.*ArchiveStore|ArchiveStore.*get_utxos|historical.*fallback|fallback.*historical' --type=rustRepository: txpipe/dolos Length of output: 3910 🏁 Script executed: # Check git history for when this fallback was added and any commit messages
git log --oneline -p -- src/adapters/mod.rs | grep -A 20 "get_utxos" | head -100
# Look for any documentation or design notes about LedgerContext
find . -name "*.md" -o -name "*.txt" | xargs grep -l "LedgerContext\|get_utxos\|historical.*utxo" 2>/dev/null | head -5
# Check if StateStore.get_utxos is designed to handle spent outputs (i.e., does it return None for spent?)
rg -B5 -A15 'fn get_utxos.*StateStore|impl.*StateStore.*{' crates/core/src/state.rs | head -80
# Look for comments or documentation explaining StateStore behavior
rg -B5 -A10 'pub trait StateStore|// StateStore|StateStore.*current' crates/core/src/state.rsRepository: txpipe/dolos Length of output: 1875 🏁 Script executed: # Search for StateStore trait definition and documentation
rg -A30 'pub trait StateStore' crates/core/src/state.rs
# Look for comments in the StateStore implementation files
rg 'StateStore.*current|current.*state|unspent|spent' crates/core/src/state.rs -B2 -A2
# Check AGENTS.md or other docs that were mentioned in learnings
cat AGENTS.md 2>/dev/null | grep -A10 -B10 "StateStore\|current.*state\|historical"
# Look for whether get_utxos is expected to return spent outputs or only current state
rg -B5 -A5 'spent|unspent' src/adapters/mod.rs crates/cardano/src/estart/loading.rsRepository: txpipe/dolos Length of output: 10288 Confirm: this fallback should not return historical/spent outputs. The fallback mechanism in The separate
The fallback as currently written allows spent outputs to leak into the current-state API. 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| Some(result) | ||
| } | ||
|
|
||
| fn get_slot_timestamp(&self, slot: u64) -> Option<u64> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t collapse archive/index failures into “not found”.
This fallback currently returns a partial
Some(result)when the index lookup, archive read, or block decode errors out. That makes backend corruption/transient outages indistinguishable from a genuine missing ref.Proposed fix
for (tx_hash_bytes, txo_refs) in by_tx { - let Ok(Some(slot)) = self.indexes().slot_by_tx_hash(&tx_hash_bytes) else { - continue; - }; - let Ok(Some(block_bytes)) = self.archive().get_block_by_slot(&slot) else { - continue; - }; - let Ok(block) = MultiEraBlock::decode(&block_bytes) else { - continue; - }; + let Some(slot) = self.indexes().slot_by_tx_hash(&tx_hash_bytes).ok()? else { + continue; + }; + let Some(block_bytes) = self.archive().get_block_by_slot(&slot).ok()? else { + continue; + }; + let block = MultiEraBlock::decode(&block_bytes).ok()?;🤖 Prompt for AI Agents