Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/op_analytics/dagster/assets/sugar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dagster import (
OpExecutionContext,
asset,
)


@asset(group_name="sugar")
def sugar_daily(context: OpExecutionContext) -> None:
    """Pull daily Sugar protocol data.

    Fetches and processes daily Sugar protocol metrics and stores them in our data warehouse.
    The data includes key protocol metrics like TVL, volume, and other relevant statistics.
    """
    import asyncio

    from op_analytics.datasources.sugar import execute

    # execute_pull is a coroutine function (async def). The original code called
    # it without awaiting, which returned an un-run coroutine object and never
    # actually executed the pull. Run it to completion on a fresh event loop.
    result = asyncio.run(execute.execute_pull())

    # The dagster log manager follows the stdlib logging API, which does not
    # accept arbitrary keyword arguments (the original `result=result` kwarg
    # would raise a TypeError), so interpolate the summary into the message.
    context.log.info(f"Sugar daily pull completed: {result}")
Empty file.
4 changes: 4 additions & 0 deletions src/op_analytics/datasources/sugar/chain_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from sugar.chains import BaseChain, OPChain
Comment thread
dioptx marked this conversation as resolved.
Outdated

# So far, sugar-sdk only supports BaseChain and OPChain.
# Chains the Sugar pipeline ingests; extend this list as sugar-sdk adds more chains.
chain_list = [BaseChain, OPChain]
Comment thread
dioptx marked this conversation as resolved.
Outdated
Empty file.
60 changes: 60 additions & 0 deletions src/op_analytics/datasources/sugar/chains/chains.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import Any, Dict, List, Tuple

from op_analytics.coreutils.logger import structlog
from op_analytics.datasources.sugar.prices.dynamic_prices import fetch_prices_with_retry

log = structlog.get_logger()


async def fetch_chain_data(
    chain_cls: type,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Fetch chain data (tokens, pools, prices) for a given chain class.

    Args:
        chain_cls: A Sugar chain class (e.g. OPChain or BaseChain).
            Assumed to be usable as an async context manager — TODO confirm
            against the sugar-sdk chain implementations.

    Returns:
        A tuple with three lists:
        - tokens_data: List of dictionaries for listed tokens.
        - pools_data: List of dictionaries for liquidity pools.
        - prices_data: List of dictionaries for token prices.
    """
    chain_name = chain_cls.__name__

    async with chain_cls() as chain:
        listed_tokens = await chain.get_all_tokens(listed_only=True)
        log.info(f"{chain_name}: Fetched {len(listed_tokens)} tokens.")

        # Flatten token objects into plain dicts for the dataframe writer.
        tokens_data = []
        for token in listed_tokens:
            tokens_data.append(
                {
                    "token_address": token.token_address,
                    "symbol": token.symbol,
                    "decimals": token.decimals,
                    "listed": token.listed,
                }
            )

        all_pools = await chain.get_pools()
        log.info(f"{chain_name}: Fetched {len(all_pools)} liquidity pools.")

        # Flatten pool objects, resolving token0/token1 to symbols when present.
        pools_data = []
        for pool in all_pools:
            pools_data.append(
                {
                    "lp": pool.lp,
                    "factory": pool.factory,
                    "symbol": pool.symbol,
                    "is_stable": pool.is_stable,
                    "total_supply": pool.total_supply,
                    "decimals": pool.decimals,
                    "token0": pool.token0.symbol if pool.token0 else None,
                    "token1": pool.token1.symbol if pool.token1 else None,
                    "pool_fee": pool.pool_fee,
                }
            )

        # Prices are fetched in batches with out-of-gas retry handling.
        prices_data = await fetch_prices_with_retry(chain, listed_tokens, initial_batch_size=40)
        log.info(f"{chain_name}: Fetched prices for {len(prices_data)} tokens.")

        return tokens_data, pools_data, prices_data
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General advice: if a function returns a tuple, that usually means you should break it apart into separate functions; if that is not possible, create a dataclass instead, so you return a single object with attribute names that callers can use to understand what is being returned.

7 changes: 7 additions & 0 deletions src/op_analytics/datasources/sugar/dataaccess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from op_analytics.coreutils.partitioned.dailydata import DailyDataset


class SugarDataAccess(DailyDataset):
    """Daily partitioned datasets produced by the Sugar ingestion pipeline."""

    # Listed-token metadata (token_address, symbol, decimals, listed).
    TOKENS = "sugar_tokens_v1"

    # Liquidity pool attributes (lp, factory, symbol, fee, supply, token pair).
    POOLS = "sugar_liquidity_pools_v1"

    # Token prices returned by the batched price-fetch helpers.
    PRICES = "sugar_prices_v1"
67 changes: 67 additions & 0 deletions src/op_analytics/datasources/sugar/execute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Sugar data ingestion pipeline.

This module pulls Sugar protocol data from all chains and writes the tokens, pools,
and prices to partitioned datasets in ClickHouse or GCS (depending on the configuration).
"""

import polars as pl
from typing import Dict, Any, List

from op_analytics.coreutils.logger import structlog
from op_analytics.coreutils.partitioned.dailydatautils import dt_summary
from op_analytics.datasources.sugar.dataaccess import SugarDataAccess
from op_analytics.datasources.sugar.chain_list import chain_list
from op_analytics.datasources.sugar.chains.chains import fetch_chain_data

log = structlog.get_logger()


async def _collect_data() -> Dict[str, List[Dict[str, Any]]]:
    """
    Collects tokens, pools, and prices data from each configured chain (OPChain, BaseChain).

    Returns:
        Dictionary containing three lists, keyed by "tokens", "pools", and "prices".
    """
    combined: Dict[str, List[Dict[str, Any]]] = {"tokens": [], "pools": [], "prices": []}

    for chain_cls in chain_list:
        chain_tokens, chain_pools, chain_prices = await fetch_chain_data(chain_cls)
        combined["tokens"] += chain_tokens
        combined["pools"] += chain_pools
        combined["prices"] += chain_prices

    return combined


def _write_data(
    data: List[Dict[str, Any]],
    dataset: SugarDataAccess,
    data_type: str,
) -> Dict[str, Any]:
    """
    Writes data to the dataset and returns a summary for logging.

    Args:
        data: Row dictionaries to persist.
        dataset: Target SugarDataAccess dataset to write to.
        data_type: Short label ("tokens", "pools", "prices") used in summary keys and logs.

    Returns:
        A one-entry summary dict keyed "<data_type>_df".
    """
    frame = pl.DataFrame(data)
    dataset.write(frame)

    result = {f"{data_type}_df": dt_summary(frame)}
    log.info(f"Sugar {data_type} ingestion completed", summary=result)
    return result


async def execute_pull() -> Dict[str, Any]:
    """
    Main Sugar ingestion entrypoint.

    Fetches the data from all chains, writes to configured datasets,
    and returns a summary dictionary.
    """
    all_data = await _collect_data()

    # Write each data type in a fixed order and merge the per-type summaries.
    summary: Dict[str, Any] = {}
    for data_type, dataset in (
        ("tokens", SugarDataAccess.TOKENS),
        ("pools", SugarDataAccess.POOLS),
        ("prices", SugarDataAccess.PRICES),
    ):
        summary.update(_write_data(all_data[data_type], dataset, data_type))

    log.info("Sugar ingestion completed", summary=summary)
    return summary
Empty file.
57 changes: 57 additions & 0 deletions src/op_analytics/datasources/sugar/pools/pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import List
Comment thread
dioptx marked this conversation as resolved.
Outdated

from sugar.pool import LiquidityPool
from op_analytics.coreutils.logger import structlog

log = structlog.get_logger()


async def fetch_pool_data(chain) -> List[LiquidityPool]:
    """
    Fetch raw pool data without calling get_prices. We build a token mapping
    from get_all_tokens directly, then map them into LiquidityPool objects.

    Handles pagination with retries on out-of-gas errors by halving the batch
    size; raises once the minimum batch size (1) still fails.
    """
    listed_tokens = await chain.get_all_tokens(listed_only=True)
    tokens_map = {token.token_address: token for token in listed_tokens}

    raw_pools: List[Any] = []
    offset = 0
    limit = chain.settings.pool_page_size

    while True:
        try:
            batch = await chain.sugar.functions.all(limit, offset).call()
        except Exception as exc:
            error_str = str(exc)
            if "out of gas" not in error_str:
                log.error("Unexpected error fetching pools", error=error_str)
                raise
            if limit <= 1:
                # Already at the smallest possible page; nothing left to try.
                log.error("Failed to fetch pools with minimum batch size", error=error_str)
                raise
            new_limit = max(1, limit // 2)
            log.warning(
                "Reducing batch size due to out of gas error",
                old_size=limit,
                new_size=new_limit,
            )
            limit = new_limit
            # Retry the same offset with the smaller page size.
            continue

        raw_pools.extend(batch)
        log.info(
            "Fetched pool batch",
            offset=offset,
            batch_size=len(batch),
            total_pools=len(raw_pools),
        )
        # A short page means we reached the end of the pool list.
        if len(batch) < limit:
            break
        offset += limit

    result = [LiquidityPool.from_tuple(p, tokens_map) for p in raw_pools if p is not None]
    log.info("Pool data fetch completed", total_pools=len(result))
    return result
Empty file.
49 changes: 49 additions & 0 deletions src/op_analytics/datasources/sugar/prices/dynamic_prices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import List, Any

from op_analytics.coreutils.logger import structlog

log = structlog.get_logger()


async def fetch_prices_with_retry(
    chain: Any, tokens: List[Any], initial_batch_size: int = 40
) -> List[Any]:
    """
    Fetch prices for a list of tokens using dynamic batch sizing.

    Retries with a reduced batch size if known errors occur ("out of gas" or
    the 0x3445e17c revert selector); once the batch size reaches 1, a failing
    token is skipped instead of retried. None prices are filtered out.
    """
    collected: List[Any] = []
    position = 0
    batch_size = initial_batch_size
    total = len(tokens)

    while position < total:
        chunk = tokens[position : position + batch_size]
        try:
            chunk_prices = await chain.get_prices(chunk)
        except Exception as exc:
            error_str = str(exc)
            if "out of gas" not in error_str and "0x3445e17c" not in error_str:
                log.error("Unexpected error fetching prices", error=error_str)
                raise
            if batch_size > 1:
                log.warning(
                    "Reducing batch size due to error",
                    error=error_str,
                    old_size=batch_size,
                    new_size=batch_size // 2,
                )
                batch_size = max(1, batch_size // 2)
            else:
                # Minimum batch size still fails: skip this single token.
                log.error("Skipping token due to persistent error", error=error_str)
                position += 1
            continue

        collected.extend(chunk_prices)
        log.info(
            "Fetched token prices",
            start=position,
            end=position + batch_size,
            total=total,
        )
        position += batch_size

    return [price for price in collected if price is not None]