From 4df8173803c5e2036963e158e3cf92a3c318bf3c Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 12 Sep 2025 13:02:34 +0100 Subject: [PATCH 1/4] opt(indexer): remove block fetching for get events --- crates/indexer/engine/src/engine.rs | 37 ++++++- crates/indexer/fetcher/src/json_rpc.rs | 128 ++++++++++--------------- crates/indexer/fetcher/src/lib.rs | 1 - 3 files changed, 88 insertions(+), 78 deletions(-) diff --git a/crates/indexer/engine/src/engine.rs b/crates/indexer/engine/src/engine.rs index afa45f6e..82ae7bf3 100644 --- a/crates/indexer/engine/src/engine.rs +++ b/crates/indexer/engine/src/engine.rs @@ -343,9 +343,42 @@ impl Engine

{ pub async fn process_range(&mut self, range: &FetchRangeResult) -> Result<(), ProcessError> { let mut processed_blocks = HashSet::new(); + let mut block_timestamps = HashMap::new(); + + // Fetch timestamps for all blocks we need to process + let block_numbers: Vec = range.blocks.keys().copied().collect(); + if !block_numbers.is_empty() { + use starknet::core::types::{BlockId, requests::GetBlockWithTxHashesRequest, MaybePreConfirmedBlockWithTxHashes}; + use starknet::providers::{ProviderRequestData, ProviderResponseData}; + + let mut requests = Vec::new(); + for block_number in &block_numbers { + requests.push(ProviderRequestData::GetBlockWithTxHashes( + GetBlockWithTxHashesRequest { + block_id: BlockId::Number(*block_number), + }, + )); + } + + let results = self.provider.batch_requests(&requests).await?; + for (block_number, result) in block_numbers.iter().zip(results) { + match result { + ProviderResponseData::GetBlockWithTxHashes(block) => { + let timestamp = match block { + MaybePreConfirmedBlockWithTxHashes::Block(block) => block.timestamp, + _ => unreachable!(), + }; + block_timestamps.insert(*block_number, timestamp); + } + _ => unreachable!(), + } + } + } // Process all transactions in the chunk for (block_number, block) in &range.blocks { + let timestamp = *block_timestamps.get(block_number).expect("Block timestamp should exist"); + for (transaction_hash, tx) in &block.transactions { if tx.events.is_empty() { continue; @@ -357,7 +390,7 @@ impl Engine

{ *transaction_hash, tx.events.as_slice(), *block_number, - block.timestamp, + timestamp, &tx.transaction, ) .await?; @@ -365,7 +398,7 @@ impl Engine

{ // Process block if !processed_blocks.contains(&block_number) { - self.process_block(*block_number, block.timestamp).await?; + self.process_block(*block_number, timestamp).await?; processed_blocks.insert(block_number); } } diff --git a/crates/indexer/fetcher/src/json_rpc.rs b/crates/indexer/fetcher/src/json_rpc.rs index 576c4cfa..e6637224 100644 --- a/crates/indexer/fetcher/src/json_rpc.rs +++ b/crates/indexer/fetcher/src/json_rpc.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::time::Duration; @@ -93,7 +93,6 @@ impl Fetcher

{ let mut events = vec![]; let mut cursors = cursors.clone(); let mut blocks = BTreeMap::new(); - let mut block_numbers = BTreeSet::new(); let mut cursor_transactions = HashMap::new(); // Step 1: Create initial batch requests for events from all contracts @@ -141,75 +140,24 @@ impl Fetcher

{ counter!("torii_fetcher_events_fetched_total").increment(fetched_events.len() as u64); events.extend(fetched_events); - // Step 3: Collect unique block numbers from events and cursors - for event in &events { - block_numbers.insert(event.block_number.unwrap()); - } - for (_, cursor) in cursors.iter() { - if let Some(head) = cursor.head { - block_numbers.insert(head); - } - } - - // Step 4: Fetch block data (timestamps and transaction hashes) - let mut block_requests = Vec::new(); - counter!("torii_fetcher_blocks_to_fetch_total").increment(block_numbers.len() as u64); - for block_number in &block_numbers { - block_requests.push(ProviderRequestData::GetBlockWithTxHashes( - GetBlockWithTxHashesRequest { - block_id: BlockId::Number(*block_number), - }, - )); - } - - // Step 5: Execute block requests in batch and initialize blocks with transaction order - if !block_requests.is_empty() { - let block_results = self.chunked_batch_requests(&block_requests).await?; - for (block_number, result) in block_numbers.iter().zip(block_results) { - match result { - ProviderResponseData::GetBlockWithTxHashes(block) => { - let (timestamp, tx_hashes, block_hash) = match block { - MaybePreConfirmedBlockWithTxHashes::Block(block) => { - (block.timestamp, block.transactions, Some(block.block_hash)) - } - _ => unreachable!(), - }; - // Initialize block with transactions in the order provided by the block - let transactions = IndexMap::from_iter(tx_hashes.iter().map(|tx_hash| { - ( - *tx_hash, - FetchTransaction { - transaction: None, - events: vec![], - }, - ) - })); - - blocks.insert( - *block_number, - FetchRangeBlock { - block_hash, - timestamp, - transactions, - }, - ); - } - _ => unreachable!(), - } - } - } - - // Step 6: Assign events to their respective blocks and transactions + // Step 3: Assign events to their respective blocks and transactions for event in events { let block_number = event.block_number.unwrap(); - let block = blocks.get_mut(&block_number).expect("Block not found"); + // Create block entry if it doesn't exist + let block = blocks.entry(block_number).or_insert_with(|| FetchRangeBlock { + block_hash: None, + transactions: IndexMap::new(), + }); - // Push the event to the transaction + // Create transaction entry if it doesn't exist (no ordering) block .transactions - .get_mut(&event.transaction_hash) - .expect("Transaction should exist.") + .entry(event.transaction_hash) + .or_insert_with(|| FetchTransaction { + transaction: None, + events: vec![], + }) .events .push(Event { from_address: event.from_address, @@ -224,12 +172,7 @@ impl Fetcher

{ .insert(event.transaction_hash); } - // Step 7: Filter out transactions that don't have any events (not relevant to indexed contracts) - for (_, block) in blocks.iter_mut() { - block.transactions.retain(|_, tx| !tx.events.is_empty()); - } - - // Step 7: Fetch transaction details if enabled + // Step 4: Fetch transaction details if enabled if self.config.flags.contains(FetchingFlags::TRANSACTIONS) && !blocks.is_empty() { let mut transaction_requests = Vec::new(); let mut block_numbers_for_tx = Vec::new(); @@ -266,11 +209,46 @@ impl Fetcher

{ } } - // Step 8: Update cursor timestamps - for (_, cursor) in cursors.iter_mut() { + // Step 5: Update cursor timestamps - fetch only the head block timestamp + let mut head_blocks_to_fetch = HashSet::new(); + for (_, cursor) in cursors.iter() { if let Some(head) = cursor.head { - if let Some(block) = blocks.get(&head) { - cursor.last_block_timestamp = Some(block.timestamp); + head_blocks_to_fetch.insert(head); + } + } + + if !head_blocks_to_fetch.is_empty() { + let mut block_requests = Vec::new(); + for block_number in &head_blocks_to_fetch { + block_requests.push(ProviderRequestData::GetBlockWithTxHashes( + GetBlockWithTxHashesRequest { + block_id: BlockId::Number(*block_number), + }, + )); + } + + let block_results = self.chunked_batch_requests(&block_requests).await?; + let mut block_timestamps = HashMap::new(); + + for (block_number, result) in head_blocks_to_fetch.iter().zip(block_results) { + match result { + ProviderResponseData::GetBlockWithTxHashes(block) => { + let timestamp = match block { + MaybePreConfirmedBlockWithTxHashes::Block(block) => block.timestamp, + _ => unreachable!(), + }; + block_timestamps.insert(*block_number, timestamp); + } + _ => unreachable!(), + } + } + + // Update cursor timestamps + for (_, cursor) in cursors.iter_mut() { + if let Some(head) = cursor.head { + if let Some(timestamp) = block_timestamps.get(&head) { + cursor.last_block_timestamp = Some(*timestamp); + } } } } diff --git a/crates/indexer/fetcher/src/lib.rs b/crates/indexer/fetcher/src/lib.rs index fa48c0a2..ffe937db 100644 --- a/crates/indexer/fetcher/src/lib.rs +++ b/crates/indexer/fetcher/src/lib.rs @@ -49,7 +49,6 @@ pub struct FetchRangeBlock { // We check the parent hash of the pending block to the latest block // to see if we need to re fetch the pending block. pub block_hash: Option, - pub timestamp: u64, pub transactions: IndexMap, } From ef48822dfeb117d263aadce9724467b5bc6e029f Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 12 Sep 2025 13:11:24 +0100 Subject: [PATCH 2/4] fix test --- crates/indexer/engine/src/engine.rs | 14 +++++++++----- crates/indexer/fetcher/src/json_rpc.rs | 20 +++++++++----------- crates/indexer/fetcher/src/lib.rs | 4 ---- crates/indexer/fetcher/src/test.rs | 3 +-- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/crates/indexer/engine/src/engine.rs b/crates/indexer/engine/src/engine.rs index 82ae7bf3..b1e5bc03 100644 --- a/crates/indexer/engine/src/engine.rs +++ b/crates/indexer/engine/src/engine.rs @@ -348,9 +348,11 @@ impl Engine

{ // Fetch timestamps for all blocks we need to process let block_numbers: Vec = range.blocks.keys().copied().collect(); if !block_numbers.is_empty() { - use starknet::core::types::{BlockId, requests::GetBlockWithTxHashesRequest, MaybePreConfirmedBlockWithTxHashes}; + use starknet::core::types::{ + requests::GetBlockWithTxHashesRequest, BlockId, MaybePreConfirmedBlockWithTxHashes, + }; use starknet::providers::{ProviderRequestData, ProviderResponseData}; - + let mut requests = Vec::new(); for block_number in &block_numbers { requests.push(ProviderRequestData::GetBlockWithTxHashes( @@ -359,7 +361,7 @@ impl Engine

{ }, )); } - + let results = self.provider.batch_requests(&requests).await?; for (block_number, result) in block_numbers.iter().zip(results) { match result { @@ -377,8 +379,10 @@ impl Engine

{ // Process all transactions in the chunk for (block_number, block) in &range.blocks { - let timestamp = *block_timestamps.get(block_number).expect("Block timestamp should exist"); - + let timestamp = *block_timestamps + .get(block_number) + .expect("Block timestamp should exist"); + for (transaction_hash, tx) in &block.transactions { if tx.events.is_empty() { continue; diff --git a/crates/indexer/fetcher/src/json_rpc.rs b/crates/indexer/fetcher/src/json_rpc.rs index e6637224..05994bc9 100644 --- a/crates/indexer/fetcher/src/json_rpc.rs +++ b/crates/indexer/fetcher/src/json_rpc.rs @@ -144,14 +144,12 @@ impl Fetcher

{ for event in events { let block_number = event.block_number.unwrap(); - // Create block entry if it doesn't exist - let block = blocks.entry(block_number).or_insert_with(|| FetchRangeBlock { - block_hash: None, - transactions: IndexMap::new(), - }); - // Create transaction entry if it doesn't exist (no ordering) - block + blocks + .entry(block_number) + .or_insert_with(|| FetchRangeBlock { + transactions: IndexMap::new(), + }) .transactions .entry(event.transaction_hash) .or_insert_with(|| FetchTransaction { @@ -216,7 +214,7 @@ impl Fetcher

{ head_blocks_to_fetch.insert(head); } } - + if !head_blocks_to_fetch.is_empty() { let mut block_requests = Vec::new(); for block_number in &head_blocks_to_fetch { @@ -226,10 +224,10 @@ impl Fetcher

{ }, )); } - + let block_results = self.chunked_batch_requests(&block_requests).await?; let mut block_timestamps = HashMap::new(); - + for (block_number, result) in head_blocks_to_fetch.iter().zip(block_results) { match result { ProviderResponseData::GetBlockWithTxHashes(block) => { @@ -242,7 +240,7 @@ impl Fetcher

{ _ => unreachable!(), } } - + // Update cursor timestamps for (_, cursor) in cursors.iter_mut() { if let Some(head) = cursor.head { diff --git a/crates/indexer/fetcher/src/lib.rs b/crates/indexer/fetcher/src/lib.rs index ffe937db..1271d6d5 100644 --- a/crates/indexer/fetcher/src/lib.rs +++ b/crates/indexer/fetcher/src/lib.rs @@ -45,10 +45,6 @@ impl Default for FetcherConfig { #[derive(Debug, Clone)] pub struct FetchRangeBlock { - // For pending blocks, this is None. - // We check the parent hash of the pending block to the latest block - // to see if we need to re fetch the pending block. - pub block_hash: Option, pub transactions: IndexMap, } diff --git a/crates/indexer/fetcher/src/test.rs b/crates/indexer/fetcher/src/test.rs index 3f342fdb..86095595 100644 --- a/crates/indexer/fetcher/src/test.rs +++ b/crates/indexer/fetcher/src/test.rs @@ -491,8 +491,7 @@ async fn test_range_one_block() { // Expecting the block right after the cursor head + the chunk size. assert_eq!(result.range.blocks.len(), 2); - assert_eq!(torii_block.block_hash, Some(expected.block_hash)); - assert_eq!(torii_block.timestamp, expected.timestamp); + assert_eq!(torii_block.transactions.len(), expected.transactions.len()); // Verify all transactions are present and match for (torii_tx_hash, _torii_tx) in torii_block.transactions.iter() { From 2ae0b3edd9e11c3c116e5a761706a2df4f34eb4e Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 12 Sep 2025 13:19:44 +0100 Subject: [PATCH 3/4] assert block exists & 0 timestamp --- crates/indexer/engine/src/engine.rs | 41 ++------------------------ crates/indexer/fetcher/src/json_rpc.rs | 15 +++++----- crates/indexer/fetcher/src/lib.rs | 1 + 3 files changed, 11 insertions(+), 46 deletions(-) diff --git a/crates/indexer/engine/src/engine.rs b/crates/indexer/engine/src/engine.rs index b1e5bc03..afa45f6e 100644 --- a/crates/indexer/engine/src/engine.rs +++ b/crates/indexer/engine/src/engine.rs @@ -343,46 +343,9 @@ impl Engine

{ pub async fn process_range(&mut self, range: &FetchRangeResult) -> Result<(), ProcessError> { let mut processed_blocks = HashSet::new(); - let mut block_timestamps = HashMap::new(); - - // Fetch timestamps for all blocks we need to process - let block_numbers: Vec = range.blocks.keys().copied().collect(); - if !block_numbers.is_empty() { - use starknet::core::types::{ - requests::GetBlockWithTxHashesRequest, BlockId, MaybePreConfirmedBlockWithTxHashes, - }; - use starknet::providers::{ProviderRequestData, ProviderResponseData}; - - let mut requests = Vec::new(); - for block_number in &block_numbers { - requests.push(ProviderRequestData::GetBlockWithTxHashes( - GetBlockWithTxHashesRequest { - block_id: BlockId::Number(*block_number), - }, - )); - } - - let results = self.provider.batch_requests(&requests).await?; - for (block_number, result) in block_numbers.iter().zip(results) { - match result { - ProviderResponseData::GetBlockWithTxHashes(block) => { - let timestamp = match block { - MaybePreConfirmedBlockWithTxHashes::Block(block) => block.timestamp, - _ => unreachable!(), - }; - block_timestamps.insert(*block_number, timestamp); - } - _ => unreachable!(), - } - } - } // Process all transactions in the chunk for (block_number, block) in &range.blocks { - let timestamp = *block_timestamps - .get(block_number) - .expect("Block timestamp should exist"); - for (transaction_hash, tx) in &block.transactions { if tx.events.is_empty() { continue; @@ -394,7 +357,7 @@ impl Engine

{ *transaction_hash, tx.events.as_slice(), *block_number, - timestamp, + block.timestamp, &tx.transaction, ) .await?; @@ -402,7 +365,7 @@ impl Engine

{ // Process block if !processed_blocks.contains(&block_number) { - self.process_block(*block_number, timestamp).await?; + self.process_block(*block_number, block.timestamp).await?; processed_blocks.insert(block_number); } } diff --git a/crates/indexer/fetcher/src/json_rpc.rs b/crates/indexer/fetcher/src/json_rpc.rs index 05994bc9..09ac5575 100644 --- a/crates/indexer/fetcher/src/json_rpc.rs +++ b/crates/indexer/fetcher/src/json_rpc.rs @@ -148,6 +148,7 @@ impl Fetcher

{ blocks .entry(block_number) .or_insert_with(|| FetchRangeBlock { + timestamp: 0, transactions: IndexMap::new(), }) .transactions @@ -226,8 +227,6 @@ impl Fetcher

{ } let block_results = self.chunked_batch_requests(&block_requests).await?; - let mut block_timestamps = HashMap::new(); - for (block_number, result) in head_blocks_to_fetch.iter().zip(block_results) { match result { ProviderResponseData::GetBlockWithTxHashes(block) => { @@ -235,7 +234,10 @@ impl Fetcher

{ MaybePreConfirmedBlockWithTxHashes::Block(block) => block.timestamp, _ => unreachable!(), }; - block_timestamps.insert(*block_number, timestamp); + blocks + .get_mut(block_number) + .expect("Block should exist.") + .timestamp = timestamp; } _ => unreachable!(), } @@ -243,10 +245,9 @@ impl Fetcher

{ // Update cursor timestamps for (_, cursor) in cursors.iter_mut() { - if let Some(head) = cursor.head { - if let Some(timestamp) = block_timestamps.get(&head) { - cursor.last_block_timestamp = Some(*timestamp); - } + if let Some(head) = &cursor.head { + let timestamp = blocks.get(head).expect("Block should exist.").timestamp; + cursor.last_block_timestamp = Some(timestamp); } } } diff --git a/crates/indexer/fetcher/src/lib.rs b/crates/indexer/fetcher/src/lib.rs index 1271d6d5..6514ff70 100644 --- a/crates/indexer/fetcher/src/lib.rs +++ b/crates/indexer/fetcher/src/lib.rs @@ -45,6 +45,7 @@ impl Default for FetcherConfig { #[derive(Debug, Clone)] pub struct FetchRangeBlock { + pub timestamp: u64, pub transactions: IndexMap, } From f2d0a48d2fb7fb1d9a4f43b75fe5ab1a72c476df Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 12 Sep 2025 13:48:17 +0100 Subject: [PATCH 4/4] fix panic --- crates/indexer/fetcher/src/json_rpc.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/indexer/fetcher/src/json_rpc.rs b/crates/indexer/fetcher/src/json_rpc.rs index 09ac5575..69aa9e57 100644 --- a/crates/indexer/fetcher/src/json_rpc.rs +++ b/crates/indexer/fetcher/src/json_rpc.rs @@ -235,8 +235,11 @@ impl Fetcher

{ _ => unreachable!(), }; blocks - .get_mut(block_number) - .expect("Block should exist.") + .entry(*block_number) + .or_insert_with(|| FetchRangeBlock { + timestamp, + transactions: IndexMap::new(), + }) .timestamp = timestamp; } _ => unreachable!(),