Introduce a pool-indexer to provide univ3 liquidity #4349
Conversation
- Split `api/uniswap_v3.rs` into `pools.rs` and `ticks.rs` submodules
with shared helpers (`internal_error`, `parse_hex_address`) in `mod.rs`
- Add `{network}` path segment to routes; handlers return 404 for unknown networks
- Add `token0`/`token1` query params for symbol-based pool search
(partial, case-insensitive, order-independent for pairs)
- Extract `search_pools` and `list_pools` as focused internal helpers
- Document all public structs, fields, and handlers
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
# Conflicts:
#	Cargo.lock
#	contracts/generated/contracts-facade/Cargo.toml
#	contracts/generated/contracts-facade/src/lib.rs
#	contracts/src/main.rs
Reminder: Please update the DB Readme and comment whether migrations are reversible (include rollback scripts if applicable).
Is there a way to split this PR into multiple parts and move in smaller iterations?

I am open to ideas.
Code Review
This pull request introduces a new pool-indexer crate to serve as a data source for Uniswap V3 liquidity, providing an alternative to the subgraph. It includes database schema updates, a cold-seeding mechanism for bootstrapping from on-chain data, and an API for querying pool states and ticks. A critical issue was identified regarding the tick query limit, which may cause incorrect price estimations by returning incomplete tick data.
jmg-duarte left a comment:

1st round, didn't get close to finishing yet
```rust
let seeded_block = if let Some(subgraph_url) = network.subgraph_url.as_ref() {
    crate::subgraph_seeder::seed(
        &db,
        network.name.as_str(),
        network.chain_id,
        factory.address,
        subgraph_url,
        network.seed_block,
    )
    .await
    .expect("subgraph seeding failed")
} else {
    crate::cold_seeder::cold_seed(
        &db,
        network.name.as_str(),
        network.chain_id,
        provider,
        factory.address,
        factory.deployment_block,
        network.seed_block,
    )
    .await
    .expect("cold seeding failed")
};
indexer
    .catch_up(seeded_block)
    .await
    .expect("catch-up indexing failed");
```
this is begging for a separate function
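A minimal sketch of that extraction, assuming both seeders return `anyhow::Result<u64>` (the helper name `seed_network` and the `Factory` parameter type are hypothetical; everything else keeps the names from the quoted snippet):

```rust
// Hypothetical helper that owns the subgraph-vs-cold seeding decision.
async fn seed_network(
    db: &PgPool,
    network: &NetworkConfig,
    provider: AlloyProvider,
    factory: &Factory,
) -> anyhow::Result<u64> {
    if let Some(subgraph_url) = network.subgraph_url.as_ref() {
        crate::subgraph_seeder::seed(
            db,
            network.name.as_str(),
            network.chain_id,
            factory.address,
            subgraph_url,
            network.seed_block,
        )
        .await
    } else {
        crate::cold_seeder::cold_seed(
            db,
            network.name.as_str(),
            network.chain_id,
            provider,
            factory.address,
            factory.deployment_block,
            network.seed_block,
        )
        .await
    }
}
```

The call site then collapses to `let seeded_block = seed_network(&db, &network, provider, &factory).await.expect("seeding failed");`.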
```rust
fn spawn_api_task(set: &mut JoinSet<()>, state: Arc<AppState>, bind_address: SocketAddr) {
    let router = crate::api::router(state);
    set.spawn(async move { serve(router, bind_address).await });
}

fn spawn_network_task(set: &mut JoinSet<()>, db: PgPool, network: NetworkConfig) {
    set.spawn(async move {
        run_network_indexer(db, network).await;
    });
}
```
IMO these could be inlined, just to avoid the &mut out param awkwardness
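Inlined, that might look like the following (assuming `serve` and `run_network_indexer` are plain `async fn`s returning `()`, so their futures can be handed to `JoinSet::spawn` directly):

```rust
// At the call site, instead of the two &mut JoinSet helper functions:
set.spawn(serve(crate::api::router(state), bind_address));
set.spawn(run_network_indexer(db, network));
```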
```sql
factory BYTEA NOT NULL,
token0 BYTEA NOT NULL,
token1 BYTEA NOT NULL,
fee INT NOT NULL, -- hundredths of a basis point (500 = 0.05%, 3000 = 0.3%, 10000 = 1%)
```
is there a way of forcing this to always be positive?
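Postgres can enforce that with a CHECK constraint; a sketch against the quoted column (Uniswap V3 fee tiers are all strictly positive, so `> 0` should be safe):

```sql
-- Rejects zero/negative fees at the schema level.
fee INT NOT NULL CHECK (fee > 0), -- hundredths of a basis point
```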
```sql
block_number BIGINT NOT NULL,
sqrt_price_x96 NUMERIC NOT NULL, -- uint160
liquidity NUMERIC NOT NULL, -- uint128
tick INT NOT NULL,
```
is the tick here also an index? i find that super confusing so maybe some comments would help for future reference too
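If the column stores the pool's current tick from `slot0()` (the log-base-1.0001 price index, not a row index), a comment along these lines would settle it; a sketch, assuming that is indeed what the column holds:

```sql
-- Current tick from slot0(): the pool's price index (log_1.0001 of price),
-- not a database index.
tick INT NOT NULL,
```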
```rust
if let Err(err) =
    run_symbol_backfill_pass(&provider, &db, &network, chain_id, prefetch_concurrency).await
{
    tracing::warn!(?err, "token symbol backfill pass failed");
```
Suggested change:
```diff
-tracing::warn!(?err, "token symbol backfill pass failed");
+tracing::error!(?err, "token symbol backfill pass failed");
```
```rust
async fn fetch_decimals(provider: &AlloyProvider, token: Address) -> Option<u8> {
    ERC20::Instance::new(token, provider.clone())
        .decimals()
        .call()
        .await
        .ok()
}
```
imo this code could be inlined + the error should be logged
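One way to do both at once, using `Result::inspect_err` to log before discarding (the log message is illustrative):

```rust
// Inlined at the call site; the error is logged instead of silently dropped.
let decimals = ERC20::Instance::new(token, provider.clone())
    .decimals()
    .call()
    .await
    .inspect_err(|err| tracing::warn!(?err, ?token, "fetching token decimals failed"))
    .ok();
```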
```rust
amount0: alloy::primitives::U256::ZERO,
amount1: alloy::primitives::U256::ZERO,
```
more unnecessary qualified paths
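i.e., with an import at the top of the module:

```rust
use alloy::primitives::U256;

// ...
amount0: U256::ZERO,
amount1: U256::ZERO,
```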
```rust
let addresses: Vec<&[u8]> = pools.iter().map(|pool| pool.address.as_slice()).collect();
let token0s: Vec<&[u8]> = pools.iter().map(|pool| pool.token0.as_slice()).collect();
let token1s: Vec<&[u8]> = pools.iter().map(|pool| pool.token1.as_slice()).collect();
let fees: Vec<i32> = pools.iter().map(|pool| pool.fee.cast_signed()).collect();
let t0_decimals: Vec<Option<i16>> = pools
    .iter()
    .map(|pool| pool.token0_decimals.map(i16::from))
    .collect();
let t1_decimals: Vec<Option<i16>> = pools
    .iter()
    .map(|pool| pool.token1_decimals.map(i16::from))
    .collect();
let t0_symbols: Vec<Option<String>> = pools.iter().map(|p| p.token0_symbol.clone()).collect();
let t1_symbols: Vec<Option<String>> = pools.iter().map(|p| p.token1_symbol.clone()).collect();
let created_blocks: Vec<i64> = pools
    .iter()
    .map(|pool| pool.created_block.cast_signed())
    .collect();
```
Can't you use zip/unzip (whatever it is) to just do one loop and extract all vecs at once?
I don't understand what you mean.
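For reference, the pattern being suggested is probably `itertools::multiunzip`, which produces several `Vec`s in a single pass; a sketch over three of the columns, using the field names from the quoted code (the real code would widen the tuple to all eight columns):

```rust
use itertools::multiunzip;

// One iteration over `pools`, extracting three column vectors at once.
let (addresses, fees, created_blocks): (Vec<&[u8]>, Vec<i32>, Vec<i64>) =
    multiunzip(pools.iter().map(|pool| {
        (
            pool.address.as_slice(),
            pool.fee.cast_signed(),
            pool.created_block.cast_signed(),
        )
    }));
```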
```rust
let addresses: Vec<&[u8]> = states
    .iter()
    .map(|state| state.pool_address.as_slice())
    .collect();
let block_numbers: Vec<i64> = states
    .iter()
    .map(|state| state.block_number.cast_signed())
    .collect();
let sqrt_prices: Vec<BigDecimal> = states
    .iter()
    .map(|state| u160_to_big_decimal(&state.sqrt_price_x96))
    .collect();
let liquidities: Vec<BigDecimal> = states
    .iter()
    .map(|state| sql_u128(state.liquidity))
    .collect();
let ticks: Vec<i32> = states.iter().map(|state| state.tick).collect();
```
```rust
        .clone()
}

fn validate_networks(networks: &[NetworkConfig]) {
```
chain_id is trusted from config without being verified against the RPC. This validate_networks function checks for duplicate names / chain IDs / factories, but never calls eth_chainId against the configured RPC to confirm that chain_id matches.

Claude skill review: compare to the driver, which explicitly asserts this (ref. crates/driver/src/infra/config/file/load.rs:45-52). A misconfigured deployment with chain_id = 1 pointed at an Arbitrum RPC URL would silently index Arbitrum pool/state events into the mainnet partition of the shared DB, polluting the data the driver later reads for mainnet auctions.

Action: at startup (likely in build_provider or just before spawning the network task), call provider.get_chain_id().await and assert it equals network.chain_id; panic on mismatch.
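A sketch of that assertion (alloy's `Provider::get_chain_id` is the relevant call; exact placement is a judgment call):

```rust
// Fail fast if the configured chain_id disagrees with the RPC endpoint.
let rpc_chain_id = provider
    .get_chain_id()
    .await
    .expect("failed to fetch chain id from RPC");
assert_eq!(
    rpc_chain_id,
    network.chain_id,
    "network {} configured with chain_id {} but RPC reports {rpc_chain_id}",
    network.name.as_str(),
    network.chain_id,
);
```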
This file mixes four largely independent responsibilities in one file: the polling driver (UniswapV3Indexer), the per-event reducer (LogAccumulator), the symbol-backfill task, and the bisecting log fetcher (bisecting_get_logs, is_range_too_large). Can we split along responsibility, e.g. indexer/uniswap_v3/{indexer,accumulator,symbol_backfill,logs}.rs?
This PR is a good example of where some decisions should have been made in an RFC, to avoid spending a lot of time rewriting code. A new long-running indexer is the most expensive option. It's defensible on chains with no subgraph (Ink, Plasma), but the PR doesn't weigh that one case against other (cheaper) options.

The PR is also too big. This should have been at least three PRs: introduce the trait with the existing subgraph as the only impl, add the pool-indexer service with no driver integration, add the driver client and config plumbing. Each is independently reviewable, mergeable, and revertable.

The two mock contracts go through the contracts-crate codegen pipeline, producing ~3000 LOC of generated bindings, two new Cargo crates, and facade updates, just to back ~30 lines of Solidity. An inline alloy::sol! macro with embedded bytecode would do the same job in one file. The status-quo pattern is consistent with Counter.sol and friends, but for two tiny test fixtures the consistency tax is high. Also worth noting: the mock pool has no slot0(), so the cold-seed path can't be tested against it, and indeed all four e2e tests pre-seed the checkpoint to bypass seeding entirely.

Finally, a new long-running service needs dashboards, alerts, and runbooks. The PR doesn't say what happens on indexer_lag_blocks > X, who owns cold-seed runs when adding a new chain, or what the rollback story looks like if the indexer falls behind mid-auction.
```rust
token0: Token {
    id: pool.token0.id,
    decimals: pool.token0.decimals.unwrap_or(0),
},
token1: Token {
    id: pool.token1.id,
    decimals: pool.token1.decimals.unwrap_or(0),
},
```
unwrap_or(0) treats missing decimals as 0. The indexer schema allows token{0,1}_decimals NULL when the cold-seeder's decimals() call fails, and there's no decimals-backfill task. As I read it, the subgraph never produced NULL, so this looks like a regression. With decimals=0 solver price math would be off by 10^18. Either filter such pools out client-side, or fix the indexer to never persist NULL.
Suggested change:
```diff
 token0: Token {
     id: pool.token0.id,
-    decimals: pool.token0.decimals.unwrap_or(0),
+    decimals: pool.token0.decimals.context("missing token0 decimals")?,
 },
 token1: Token {
     id: pool.token1.id,
-    decimals: pool.token1.decimals.unwrap_or(0),
+    decimals: pool.token1.decimals.context("missing token1 decimals")?,
 },
```
```rust
async fn fetch_pool_liquidity(provider: &AlloyProvider, pool: Address, block: u64) -> Option<u128> {
    contracts::UniswapV3Pool::Instance::new(pool, provider.clone())
        .liquidity()
        .block(block.into())
        .call()
        .await
        .ok()
}

async fn fetch_decimals(provider: &AlloyProvider, token: Address) -> Option<u8> {
    ERC20::Instance::new(token, provider.clone())
        .decimals()
        .call()
        .await
        .ok()
}
```
Both fetch_pool_liquidity and fetch_decimals end with .ok(). As I read it, an RPC blip on a Mint/Burn block leaves liquidity stale until the next Swap. A decimals() failure at discovery leaves the pool with NULL decimals forever (which feeds the bug above). At least log a warn!. Better, bubble the error so the chunk retries.
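A sketch of the bubbling variant for the first helper (the second is analogous), assuming `anyhow` is already a dependency:

```rust
use anyhow::Context;

// Propagating the failure lets the surrounding chunk retry instead of
// silently persisting stale liquidity.
async fn fetch_pool_liquidity(
    provider: &AlloyProvider,
    pool: Address,
    block: u64,
) -> anyhow::Result<u128> {
    contracts::UniswapV3Pool::Instance::new(pool, provider.clone())
        .liquidity()
        .block(block.into())
        .call()
        .await
        .with_context(|| format!("fetching liquidity for pool {pool} at block {block}"))
}
```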
```rust
async fn backfill_symbols(
    provider: AlloyProvider,
    db: sqlx::PgPool,
    network: NetworkName,
    chain_id: u64,
    prefetch_concurrency: usize,
    poll_interval: std::time::Duration,
) -> ! {
    loop {
        if let Err(err) =
            run_symbol_backfill_pass(&provider, &db, &network, chain_id, prefetch_concurrency).await
        {
            tracing::warn!(?err, "token symbol backfill pass failed");
        }
        tokio::time::sleep(poll_interval).await;
    }
}
```
Symbols get a dedicated retry loop with an empty-string sentinel. Decimals are fetched once at pool discovery and never retried. A transient RPC failure would leave the pool with NULL decimals permanently. Same backfill pattern would help here.
This is on purpose. Symbols are there only for debugging. I'd rather drop the symbols altogether than put more work into their reliability.
```rust
pub(crate) fn is_range_too_large(err: &anyhow::Error) -> bool {
    err.chain().any(|e| {
        let msg = e.to_string().to_lowercase();
        // Alchemy: "query exceeds max block range 10000"
        msg.contains("max block range")
            // OVH: "request timed out" — the server cuts off oversized queries
            // instead of rejecting with a size error, so bisecting on timeout
            // eventually lands on a tractable range.
            || msg.contains("timed out")
    })
}
```
is_range_too_large substring-matches "timed out". That catches transient client/network timeouts unrelated to range size. Worst case is log(range) wasted bisection RPC calls before bottoming out at to == from and returning the original error. Consider scoping to the OVH error specifically (HTTP status code, JSON-RPC error code, or a more precise phrase).
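A sketch of a tighter check, downcasting to alloy's transport error so only genuine JSON-RPC error responses match (the exact phrases, and whether OVH really answers with a JSON-RPC error rather than a transport timeout, are assumptions to verify against real provider payloads):

```rust
use alloy::transports::{RpcError, TransportErrorKind};

// Hypothetical tightening: only JSON-RPC error *responses* count, so a
// client-side or network timeout no longer triggers bisection.
pub(crate) fn is_range_too_large(err: &anyhow::Error) -> bool {
    err.chain().any(|e| {
        let Some(RpcError::ErrorResp(payload)) =
            e.downcast_ref::<RpcError<TransportErrorKind>>()
        else {
            return false;
        };
        let msg = payload.message.to_lowercase();
        // Alchemy: "query exceeds max block range 10000"
        msg.contains("max block range")
            // OVH: oversized queries come back as a JSON-RPC error with
            // this phrase (assumed), not as a transport-level timeout.
            || msg.contains("request timed out")
    })
}
```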
```rust
async fn seed(self) -> Result<u64> {
    info!(
        block = self.snapshot_block,
        "seeding pool-indexer from subgraph"
    );

    let pool_ids = self.seed_pools().await?;
    let total_ticks = self.seed_ticks(&pool_ids).await?;

    info!(
        block = self.snapshot_block,
        pools = pool_ids.len(),
        ticks = total_ticks,
        "seeding complete"
    );
    Ok(self.snapshot_block)
}
```
No consistency check between subgraph data and on-chain truth. IIUC a poisoned or lagging subgraph would silently seed wrong data, and the live indexer would replay events on top of bad state. Cheap mitigation: spot-check N random pools' slot0() + liquidity() against the seeded state right after seeding, before the first checkpoint write.
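A sketch of that spot-check (`random_seeded_pools` and the row fields are hypothetical; `slot0()`/`liquidity()` are the standard pool getters):

```rust
// Hypothetical post-seed verification: compare N random seeded pools
// against on-chain state at the snapshot block, before the first
// checkpoint write.
async fn spot_check_seed(
    db: &PgPool,
    provider: &AlloyProvider,
    snapshot_block: u64,
    sample_size: usize,
) -> anyhow::Result<()> {
    for pool in random_seeded_pools(db, sample_size).await? {
        let instance = contracts::UniswapV3Pool::Instance::new(pool.address, provider.clone());
        let slot0 = instance.slot0().block(snapshot_block.into()).call().await?;
        let liquidity = instance.liquidity().block(snapshot_block.into()).call().await?;
        anyhow::ensure!(
            slot0.sqrtPriceX96 == pool.sqrt_price_x96 && liquidity == pool.liquidity,
            "seeded state for pool {} disagrees with chain at block {snapshot_block}",
            pool.address,
        );
    }
    Ok(())
}
```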
The new pool-indexer service is ~94% of the non-contracts Rust code. The splitting you are suggesting wouldn't help much at all.
```rust
/// too many. Only applies when using the subgraph source.
pub max_pools_per_tick_query: usize,
```
Sounds like this param should be extracted to a separate config struct together with the subgraph url.
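Something along these lines (the struct and its field grouping are hypothetical):

```rust
/// Hypothetical grouping of the subgraph-only knobs; `url` moves in from
/// wherever it currently lives in the config.
#[derive(Debug, Clone)]
pub struct SubgraphConfig {
    pub url: Url,
    /// Pools per tick query; queries are split to avoid fetching too many.
    pub max_pools_per_tick_query: usize,
}
```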
```rust
// `Url::join` replaces the last path segment unless the base ends in
// a `/`. Build `<service-root>/api/v1/<network>/uniswap/v3/` once so
// every `path()` call behaves like "append".
let prefix = format!("api/v1/{}/uniswap/v3/", chain.slug());
let with_trailing_slash = if base_url.path().ends_with('/') {
    base_url
} else {
    let mut u = base_url;
    let p = format!("{}/", u.path());
    u.set_path(&p);
    u
};
let base_url = with_trailing_slash
    .join(&prefix)
    .with_context(|| format!("joining {prefix} onto {with_trailing_slash}"))?;
Ok(Self { base_url, http })
```
Wouldn't that do the same trick?
Suggested change:
```diff
-// `Url::join` replaces the last path segment unless the base ends in
-// a `/`. Build `<service-root>/api/v1/<network>/uniswap/v3/` once so
-// every `path()` call behaves like "append".
-let prefix = format!("api/v1/{}/uniswap/v3/", chain.slug());
-let with_trailing_slash = if base_url.path().ends_with('/') {
-    base_url
-} else {
-    let mut u = base_url;
-    let p = format!("{}/", u.path());
-    u.set_path(&p);
-    u
-};
-let base_url = with_trailing_slash
-    .join(&prefix)
-    .with_context(|| format!("joining {prefix} onto {with_trailing_slash}"))?;
-Ok(Self { base_url, http })
+let path = format!(
+    "{}/api/v1/{}/uniswap/v3/",
+    base_url.path().trim_end_matches('/'),
+    chain.slug(),
+);
+base_url.set_path(&path);
+Ok(Self { base_url, http })
```
Description
Introduces a pool-indexer service that indexes Uniswap V3 pool state from on-chain events and serves it over HTTP, plus the driver-side plumbing to use it as a drop-in replacement for the Uniswap V3 subgraph. The goal is to remove the driver's request-path dependency on external subgraphs.
How it works at a high level. Pool-indexer maintains a Postgres snapshot of every Uniswap V3 pool on the factories it's configured to watch. The lifecycle is:

1. Seed: bootstrap pool and tick state from the subgraph when a URL is configured, otherwise cold-seed from on-chain data, and record the seeded block.
2. Index: catch up from the seeded block and keep polling in chunks (… writers filter unknown pools out instead). Within a single chunk transaction: new pools are inserted, Swap/Initialize events update pool state (they contain sqrtPrice/liquidity/tick in the event payload — free state), Mint/Burn adjust tick liquidity_net and trigger a pool.liquidity() refresh if no Swap was seen in the same chunk, and the checkpoint advances.
3. Backfill: token symbols are filled in by a separate recurring background pass.
Driver integration. Behind a new V3PoolDataSource trait, the existing UniV3SubgraphClient and a new PoolIndexerClient are interchangeable. Config grows an optional pool-indexer-url; when set, it's used instead of the subgraph. The rest of the driver's V3 machinery (checkpoint, event replay, merge on new blocks) is unchanged — pool-indexer just plugs in as the source of truth for initial + on-demand pool snapshots. Per-network switchover is one config line, fully backward compatible.
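To make the seam concrete, the swap point might look roughly like this (only the names `V3PoolDataSource`, `UniV3SubgraphClient`, and `PoolIndexerClient` come from the PR; the method, its signature, and the config fields are illustrative):

```rust
// Illustrative shape of the data-source seam described above.
#[async_trait::async_trait]
pub trait V3PoolDataSource: Send + Sync {
    /// Fetch current pool snapshots for the given tokens.
    async fn pools(&self, tokens: &[Address]) -> anyhow::Result<Vec<PoolSnapshot>>;
}

// Both clients implement the trait, so config alone decides which one
// the driver talks to.
fn data_source(config: &Config) -> Box<dyn V3PoolDataSource> {
    match &config.pool_indexer_url {
        Some(url) => Box::new(PoolIndexerClient::new(url.clone())),
        None => Box::new(UniV3SubgraphClient::new(config.subgraph_url.clone())),
    }
}
```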
How to test
Added E2E tests.
I also deployed this to mainnet staging, did a swap, and baseline participated. I ran a bunch of smoke tests comparing the results to the subgraph results, and they were identical.