diff --git a/notebooks/adhoc/coingecko/adhoc_price_data_run.ipynb b/notebooks/adhoc/coingecko/adhoc_price_data_run.ipynb index b1238fea73b..3c7806677c8 100644 --- a/notebooks/adhoc/coingecko/adhoc_price_data_run.ipynb +++ b/notebooks/adhoc/coingecko/adhoc_price_data_run.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -20,19 +20,6 @@ "from op_analytics.coreutils.request import new_session" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# 1. Load chain metadata to get token IDs\n", - "from op_analytics.datapipeline.chains.load import load_chain_metadata\n", - "\n", - "# Load chain metadata\n", - "chain_metadata = load_chain_metadata()\n" - ] - }, { "cell_type": "code", "execution_count": null, @@ -174,8 +161,9 @@ "source": [ "with write_to_prod():\n", " # Run the full pipeline, including extra tokens\n", - " result = execute_pull(days=365, extra_token_ids_file=extra_token_ids_file, include_top_tokens=25, fetch_metadata=True)\n", + " result = execute_pull(days=365, extra_token_ids_file=extra_token_ids_file, include_top_tokens=0, fetch_metadata=False, skip_existing_partitions=False)\n", " # result = execute_pull(days=365, fetch_metadata=True)\n", + " # result = execute_pull(days=365, skip_existing_partitions=True, fetch_metadata=False, token_id='ethereum')\n", " #Metadata Only\n", " # result = execute_metadata_pull(extra_token_ids_file=extra_token_ids_file, include_top_tokens=25)\n", " # result = execute_metadata_pull()" @@ -198,6 +186,30 @@ "source": [ "print(result)" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 1. 
Load chain metadata to get token IDs\n", + "from op_analytics.datapipeline.chains.load import load_chain_metadata\n", + "\n", + "# Load chain metadata\n", + "chain_metadata = load_chain_metadata()\n", + "\n", + "# Get unique non-null CoinGecko API keys\n", + "token_ids = (\n", + " chain_metadata.filter(pl.col(\"cgt_coingecko_api\").is_not_null())\n", + " .select(\"cgt_coingecko_api\")\n", + " .unique()\n", + " .to_series()\n", + " .to_list()\n", + ")\n", + "\n", + "print(f\"Found {len(token_ids)} unique tokens with CoinGecko API keys\")" + ] } ], "metadata": { diff --git a/notebooks/adhoc/defillama/defillama_price_puller_example.ipynb b/notebooks/adhoc/defillama/defillama_price_puller_example.ipynb new file mode 100644 index 00000000000..4addd1ae8e1 --- /dev/null +++ b/notebooks/adhoc/defillama/defillama_price_puller_example.ipynb @@ -0,0 +1,316 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "vscode": { + "languageId": "raw" + } + }, + "source": [ + "# DeFiLlama Price Puller Example\n", + "\n", + "This notebook demonstrates how to use the new DeFiLlama price puller functionality.\n", + "\n", + "The DeFiLlama price puller supports:\n", + "- Current prices\n", + "- Historical prices with flexible date ranges\n", + "- First recorded prices\n", + "- Both CoinGecko slugs and chain:address format\n", + "- Custom token lists from files\n", + "- Gas tokens from chain metadata\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.append('../../..')\n", + "\n", + "from src.op_analytics.datasources.defillama.tokenprice.execute import (\n", + " execute_pull_current,\n", + " execute_pull_historical,\n", + " execute_pull_first_prices,\n", + ")\n", + "from src.op_analytics.datasources.defillama.tokenprice.price_data import DefiLlamaTokenPrices\n", + "from op_analytics.datapipeline.chains.tokens import get_token_ids_from_metadata\n", + "from op_analytics.coreutils.request 
import new_session\n", + "from datetime import datetime, timedelta\n", + "import polars as pl\n", + "\n", + "# Optional: Set up logging\n", + "import logging\n", + "logging.basicConfig(level=logging.INFO)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "vscode": { + "languageId": "raw" + } + }, + "source": [ + "## 1. Get Token IDs from Chain Metadata\n", + "\n", + "This will get both CoinGecko slugs and gas tokens from the chain metadata:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[2m2025-07-09 15:33:46\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mloaded vault from .env file \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mvault.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m32\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:33:46\u001b[0m [\u001b[32m\u001b[1mdebug \u001b[0m] \u001b[1mloaded vault: 28 items \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mvault.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m79\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:33:46\u001b[0m [\u001b[32m\u001b[1mdebug \u001b[0m] \u001b[1mFound vault variable GOOGLE_SERVICE_ACCOUNT (has JSON key)\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgcpauth.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m18\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:33:48\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mLoaded OP chains metadata from /Users/michaelsilberling/Documents/GitHub/op-analytics/op_chains_tracking/inputs/chain_metadata_raw.csv\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mload.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m87\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:33:48\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1m[REPO vs. 
GSHEETS] ERROR: Chain Metadata is different\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mload.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m99\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:33:48\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mfound_token_ids \u001b[0m \u001b[36mcoingecko_count\u001b[0m=\u001b[35m12\u001b[0m \u001b[36mcount\u001b[0m=\u001b[35m27\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mtokens.py\u001b[0m \u001b[36mgas_token_count\u001b[0m=\u001b[35m15\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m58\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "Found 27 token IDs from chain metadata\n", + "First 10 token IDs: ['automata', 'SYS', 'RSS3', 'GSWIFT', 'ETH', 'celo', 'mantle', 'frxETH', 'rss3', 'hashkey-ecopoints']\n" + ] + } + ], + "source": [ + "# Get token IDs from chain metadata\n", + "token_ids = get_token_ids_from_metadata()\n", + "print(f\"Found {len(token_ids)} token IDs from chain metadata\")\n", + "print(\"First 10 token IDs:\", token_ids[:10])\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "vscode": { + "languageId": "raw" + } + }, + "source": [ + "## 2. 
Direct API Usage (Without Writing to GCS)\n", + "\n", + "Use the price data class directly for analysis:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[2m2025-07-09 15:33:49\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mfetched_current_prices \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mprice_data.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m150\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m \u001b[36mtoken_count\u001b[0m=\u001b[35m3\u001b[0m\n", + "Fetched 3 current prices\n", + "shape: (3, 4)\n", + "┌─────────────────────────────────┬────────────┬───────────┬────────────────────────────┐\n", + "│ token_id ┆ dt ┆ price_usd ┆ last_updated │\n", + "│ --- ┆ --- ┆ --- ┆ --- │\n", + "│ str ┆ str ┆ f64 ┆ str │\n", + "╞═════════════════════════════════╪════════════╪═══════════╪════════════════════════════╡\n", + "│ bitcoin ┆ 2025-07-09 ┆ 109718.0 ┆ 2025-07-09T15:33:49.638084 │\n", + "│ ethereum ┆ 2025-07-09 ┆ 2723.41 ┆ 2025-07-09T15:33:49.638084 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-09 ┆ 0.999902 ┆ 2025-07-09T15:33:49.638084 │\n", + "└─────────────────────────────────┴────────────┴───────────┴────────────────────────────┘\n", + "\u001b[2m2025-07-09 15:33:51\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mfetched_historical_prices \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mprice_data.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m305\u001b[0m \u001b[36mparams\u001b[0m=\u001b[35m{'start': 1751484829, 'span': 7}\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m \u001b[36mtoken_count\u001b[0m=\u001b[35m3\u001b[0m\n", + "\\nFetched 21 historical prices\n", + "shape: (5, 4)\n", + "┌─────────────────────────────────┬────────────┬───────────┬────────────────────────────┐\n", + "│ token_id ┆ dt ┆ price_usd ┆ last_updated │\n", + "│ --- ┆ --- ┆ --- ┆ --- │\n", + "│ str ┆ str ┆ f64 ┆ str │\n", + 
"╞═════════════════════════════════╪════════════╪═══════════╪════════════════════════════╡\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-02 ┆ 0.999864 ┆ 2025-07-09T15:33:51.155860 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-03 ┆ 0.999901 ┆ 2025-07-09T15:33:51.155870 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-04 ┆ 0.999899 ┆ 2025-07-09T15:33:51.155876 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-05 ┆ 0.999806 ┆ 2025-07-09T15:33:51.155880 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-06 ┆ 0.9999 ┆ 2025-07-09T15:33:51.155883 │\n", + "└─────────────────────────────────┴────────────┴───────────┴────────────────────────────┘\n", + "\\nHistorical prices sorted by token and date:\n", + "shape: (21, 4)\n", + "┌─────────────────────────────────┬────────────┬───────────────┬────────────────────────────┐\n", + "│ token_id ┆ dt ┆ price_usd ┆ last_updated │\n", + "│ --- ┆ --- ┆ --- ┆ --- │\n", + "│ str ┆ str ┆ f64 ┆ str │\n", + "╞═════════════════════════════════╪════════════╪═══════════════╪════════════════════════════╡\n", + "│ bitcoin ┆ 2025-07-02 ┆ 109564.775607 ┆ 2025-07-09T15:33:51.155895 │\n", + "│ bitcoin ┆ 2025-07-03 ┆ 109889.0 ┆ 2025-07-09T15:33:51.155899 │\n", + "│ bitcoin ┆ 2025-07-04 ┆ 107341.0 ┆ 2025-07-09T15:33:51.155902 │\n", + "│ bitcoin ┆ 2025-07-05 ┆ 108050.0 ┆ 2025-07-09T15:33:51.155906 │\n", + "│ bitcoin ┆ 2025-07-06 ┆ 108494.0 ┆ 2025-07-09T15:33:51.155912 │\n", + "│ … ┆ … ┆ … ┆ … │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-04 ┆ 0.999899 ┆ 2025-07-09T15:33:51.155876 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-05 ┆ 0.999806 ┆ 2025-07-09T15:33:51.155880 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-06 ┆ 0.9999 ┆ 2025-07-09T15:33:51.155883 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-07 ┆ 0.999898 ┆ 2025-07-09T15:33:51.155887 │\n", + "│ ethereum:0xa0b86991c6218b36c1d… ┆ 2025-07-08 ┆ 0.999865 ┆ 2025-07-09T15:33:51.155891 │\n", + 
"└─────────────────────────────────┴────────────┴───────────────┴────────────────────────────┘\n", + "\\nPrice summary:\n", + "shape: (3, 7)\n", + "┌──────────────────┬───────────┬───────────┬───────────────┬─────────────┬────────────┬────────────┐\n", + "│ token_id ┆ min_price ┆ max_price ┆ avg_price ┆ data_points ┆ min_dt ┆ max_dt │\n", + "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n", + "│ str ┆ f64 ┆ f64 ┆ f64 ┆ u32 ┆ str ┆ str │\n", + "╞══════════════════╪═══════════╪═══════════╪═══════════════╪═════════════╪════════════╪════════════╡\n", + "│ ethereum:0xa0b86 ┆ 0.999806 ┆ 0.999901 ┆ 0.999876 ┆ 7 ┆ 2025-07-02 ┆ 2025-07-08 │\n", + "│ 991c6218b36c1d… ┆ ┆ ┆ ┆ ┆ ┆ │\n", + "│ bitcoin ┆ 107341.0 ┆ 109889.0 ┆ 108644.110801 ┆ 7 ┆ 2025-07-02 ┆ 2025-07-08 │\n", + "│ ethereum ┆ 2478.73 ┆ 2609.7 ┆ 2549.291429 ┆ 7 ┆ 2025-07-02 ┆ 2025-07-08 │\n", + "└──────────────────┴───────────┴───────────┴───────────────┴─────────────┴────────────┴────────────┘\n" + ] + } + ], + "source": [ + "session = new_session()\n", + "\n", + "# Get current prices for multiple tokens\n", + "tokens = [\"bitcoin\", \"ethereum\", \"ethereum:0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48\"]\n", + "price_data = DefiLlamaTokenPrices.fetch_prices_current(tokens, session=session)\n", + "\n", + "print(f\"Fetched {len(price_data)} current prices\")\n", + "print(price_data.df.head())\n", + "\n", + "# Get historical prices for the last 7 days \n", + "# This uses start timestamp + span=days to get exactly 7 daily data points from the start date\n", + "price_data = DefiLlamaTokenPrices.fetch_prices_by_days(\n", + " token_ids=tokens,\n", + " days=7,\n", + " session=session\n", + ")\n", + "\n", + "print(f\"\\\\nFetched {len(price_data)} historical prices\")\n", + "print(price_data.df.head())\n", + "\n", + "# Sort by token_id and date to see the progression\n", + "print(\"\\\\nHistorical prices sorted by token and date:\")\n", + "historical_sorted = price_data.df.sort([\"token_id\", \"dt\"])\n", + 
"print(historical_sorted)\n", + "\n", + "# Basic analysis\n", + "print(\"\\\\nPrice summary:\")\n", + "print(price_data.df.group_by(\"token_id\").agg([\n", + " pl.col(\"price_usd\").min().alias(\"min_price\"),\n", + " pl.col(\"price_usd\").max().alias(\"max_price\"),\n", + " pl.col(\"price_usd\").mean().alias(\"avg_price\"),\n", + " pl.col(\"dt\").count().alias(\"data_points\"),\n", + " pl.col(\"dt\").min().alias(\"min_dt\"),\n", + " pl.col(\"dt\").max().alias(\"max_dt\"),\n", + "]))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[2m2025-07-09 15:38:27\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mfound_token_ids \u001b[0m \u001b[36mcoingecko_count\u001b[0m=\u001b[35m12\u001b[0m \u001b[36mcount\u001b[0m=\u001b[35m27\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mtokens.py\u001b[0m \u001b[36mgas_token_count\u001b[0m=\u001b[35m15\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m58\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:38:27\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mfinal_token_ids \u001b[0m \u001b[36mcount\u001b[0m=\u001b[35m27\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mtokens.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m154\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:38:27\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mhistorical_price_pull_start \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mexecute.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m136\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m \u001b[36mtoken_count\u001b[0m=\u001b[35m27\u001b[0m\n", + "\u001b[2m2025-07-09 15:38:27\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mfetching_historical_prices \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mexecute.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m144\u001b[0m 
\u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m \u001b[36mtoken_count\u001b[0m=\u001b[35m27\u001b[0m\n", + "\u001b[2m2025-07-09 15:40:58\u001b[0m [\u001b[31m\u001b[1merror \u001b[0m] \u001b[1mretrying exception HTTPSConnectionPool(host='pro-api.llama.fi', port=443): Max retries exceeded with url: /slPjmq113xSROlwRStlysKOhP0coMlgQkcPd0lgLMV3sL316g8l8CQ/coins/chart/coingecko:automata,coingecko:SYS,coingecko:RSS3,coingecko:GSWIFT,coingecko:ETH,coingecko:celo,coingecko:mantle,coingecko:frxETH,coingecko:rss3,coingecko:hashkey-ecopoints,coingecko:frax-ether,coingecko:BNB,coingecko:ATA,coingecko:unknown,coingecko:syscoin,coingecko:BNRY,coingecko:gameswift,coingecko:binary-holdings,coingecko:SEXY,coingecko:settled-ethxy-token,coingecko:CELO,coingecko:binancecoin,coingecko:HSK,coingecko:MNT,coingecko:clearpool,coingecko:UNITE,coingecko:CPOOL?start=1720553907&span=365 (Caused by ResponseError('too many 504 error responses'))\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mrequest.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m30\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n", + "\u001b[2m2025-07-09 15:40:58\u001b[0m [\u001b[33m\u001b[1mwarning \u001b[0m] \u001b[1mstamina.retry_scheduled \u001b[0m \u001b[36margs\u001b[0m=\u001b[35m()\u001b[0m \u001b[36mcallable\u001b[0m=\u001b[35m''\u001b[0m \u001b[36mcaused_by\u001b[0m=\u001b[35m'RetryError(MaxRetryError(\"HTTPSConnectionPool(host=\\'pro-api.llama.fi\\', port=443): Max retries exceeded with url: 
/slPjmq113xSROlwRStlysKOhP0coMlgQkcPd0lgLMV3sL316g8l8CQ/coins/chart/coingecko:automata,coingecko:SYS,coingecko:RSS3,coingecko:GSWIFT,coingecko:ETH,coingecko:celo,coingecko:mantle,coingecko:frxETH,coingecko:rss3,coingecko:hashkey-ecopoints,coingecko:frax-ether,coingecko:BNB,coingecko:ATA,coingecko:unknown,coingecko:syscoin,coingecko:BNRY,coingecko:gameswift,coingecko:binary-holdings,coingecko:SEXY,coingecko:settled-ethxy-token,coingecko:CELO,coingecko:binancecoin,coingecko:HSK,coingecko:MNT,coingecko:clearpool,coingecko:UNITE,coingecko:CPOOL?start=1720553907&span=365 (Caused by ResponseError(\\'too many 504 error responses\\'))\"))'\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35m_structlog.py\u001b[0m \u001b[36mkwargs\u001b[0m=\u001b[35m{}\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m21\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m \u001b[36mretry_num\u001b[0m=\u001b[35m1\u001b[0m \u001b[36mwait_for\u001b[0m=\u001b[35m10.22\u001b[0m \u001b[36mwaited_so_far\u001b[0m=\u001b[35m0.0\u001b[0m\n", + "\u001b[2m2025-07-09 15:41:08\u001b[0m [\u001b[33m\u001b[1mwarning \u001b[0m] \u001b[1mretry attempt https://pro-api.llama.fi/slPjmq113xSROlwRStlysKOhP0coMlgQkcPd0lgLMV3sL316g8l8CQ/coins/chart/coingecko:automata,coingecko:SYS,coingecko:RSS3,coingecko:GSWIFT,coingecko:ETH,coingecko:celo,coingecko:mantle,coingecko:frxETH,coingecko:rss3,coingecko:hashkey-ecopoints,coingecko:frax-ether,coingecko:BNB,coingecko:ATA,coingecko:unknown,coingecko:syscoin,coingecko:BNRY,coingecko:gameswift,coingecko:binary-holdings,coingecko:SEXY,coingecko:settled-ethxy-token,coingecko:CELO,coingecko:binancecoin,coingecko:HSK,coingecko:MNT,coingecko:clearpool,coingecko:UNITE,coingecko:CPOOL\u001b[0m \u001b[36mattempt\u001b[0m=\u001b[35m2\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mrequest.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m75\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m43292\u001b[0m\n" + ] + } + ], + "source": [ + "# Overall run\n", + "from 
op_analytics.coreutils.partitioned.dailydatawrite import write_to_prod\n", + "\n", + "# Path to your config file\n", + "extra_token_ids_file = \"../../../src/op_analytics/datasources/coingecko/config/extra_token_ids.txt\"\n", + "with write_to_prod():\n", + " result = execute_pull_historical(days=365)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "vscode": { + "languageId": "raw" + } + }, + "source": [ + "## Summary\n", + "\n", + "The DeFiLlama price puller provides:\n", + "\n", + "1. **Three main execution modes**: `current`, `historical`, and `first`\n", + "2. **Flexible token identification**: CoinGecko slugs, chain:address format, custom files\n", + "3. **Automatic chain metadata integration**: Pulls gas tokens and CoinGecko IDs from your chain metadata\n", + "4. **Full DeFiLlama API parameter support**: timestamps, periods, spans, search width\n", + "5. **GCS integration**: Writes data to your existing data pipeline\n", + "6. 
**Direct API access**: Use the price data classes for analysis without writing to GCS\n", + "\n", + "### Command Line Usage:\n", + "\n", + "```bash\n", + "# Current prices\n", + "python -m src.op_analytics.datasources.defillama.tokenprice.execute --mode current\n", + "\n", + "# Historical prices (last 30 days)\n", + "python -m src.op_analytics.datasources.defillama.tokenprice.execute --mode historical --days 30\n", + "\n", + "# Custom historical with timestamps\n", + "python -m src.op_analytics.datasources.defillama.tokenprice.execute --mode historical --start-timestamp 1704067200 --period 1h\n", + "\n", + "# Single token\n", + "python -m src.op_analytics.datasources.defillama.tokenprice.execute --mode current --token-id bitcoin\n", + "\n", + "# With custom token file\n", + "python -m src.op_analytics.datasources.defillama.tokenprice.execute --mode current --extra-token-ids-file /path/to/tokens.txt\n", + "```\n", + "\n", + "### Key Features:\n", + "\n", + "- **Support for both formats**: CoinGecko slugs (e.g., \"bitcoin\") and chain:address format (e.g., \"ethereum:0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48\")\n", + "- **Automatic gas token detection**: Pulls gas tokens from your chain metadata\n", + "- **Flexible time ranges**: Use days, timestamps, or first recorded prices\n", + "- **Custom token lists**: Load tokens from CSV or TXT files\n", + "- **Skip existing partitions**: Efficient backfilling with `--skip-existing-partitions`\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/notebooks/adhoc/dune/dune_uni_lm.ipynb b/notebooks/adhoc/dune/dune_uni_lm.ipynb index 
941ab9c3cdd..07709e1b347 100644 --- a/notebooks/adhoc/dune/dune_uni_lm.ipynb +++ b/notebooks/adhoc/dune/dune_uni_lm.ipynb @@ -30,7 +30,7 @@ "\n", "from op_analytics.datasources.dune.unichain_lm import DuneUniLMSummary\n", "\n", - "# result = DuneUniLMSummary.fetch()" + "result = DuneUniLMSummary.fetch()" ] }, { @@ -44,7 +44,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -68,7 +68,7 @@ "\n", "from op_analytics.coreutils.partitioned.location import DataLocation\n", "\n", - "# read_df = Dune.UNI_LM_2025.read_polars(min_date=\"2025-05-07\", max_date=\"2025-05-11\", location=DataLocation.LOCAL)" + "read_df = Dune.UNI_LM_2025.read_polars(min_date=\"2025-05-07\", max_date=\"2025-05-11\", location=DataLocation.LOCAL)" ] }, { diff --git a/op_chains_tracking/inputs/chain_metadata_raw.csv b/op_chains_tracking/inputs/chain_metadata_raw.csv index ac55df43b53..1381a2821c0 100644 --- a/op_chains_tracking/inputs/chain_metadata_raw.csv +++ b/op_chains_tracking/inputs/chain_metadata_raw.csv @@ -16,7 +16,7 @@ campaign,Camp Network,,,,modern,Standard,,,,yes,,,https://www.campnetwork.xyz/,, ozean,Ozean,,,,,Standard,,,,yes,Caldera,,https://ozean.finance/,,,,ozean,,CPOOL,clearpool,L2,2,ethereum,ethereum,,,,,,,,,,,,,, soneium,Soneium,1868,5E5E5F,1/14/25,modern,Standard,12/2/24,,,yes,Self-Hosted,https://rpc.soneium.org,https://soneium.org/,https://soneium.blockscout.com/,,Soneium,soneium,,ETH,,L2,2,ethereum,ethereum,0x7A8Ed66B319911A0F3E7288BDdAB30d9c0C875c3,soneium,,soneium,,,,0x6776BE80dBAda6A02B5F2095cF13734ac303B8d1,0x008dC74CecC9dedA8595B2Fe210cE5979F0BfA8e,0x400c164C4a8cA84385B70EEd6eB03ea847c8E1b8,,0xeb9bf100225c214Efc3E7C651ebbaDcF85177607,0x88e529A6ccd302c948689Cd5156C83D4614FAE92,0x512A3d2c7a43BD9261d2B8E8C9c70D4bd4D503C0 op,OP Mainnet,10,FF0420,11/11/21,modern,OP Mainnet,6/23/21,,6/23/21,no,OP Mainnet,https://mainnet.optimism.io,https://optimism.io,https://explorer.optimism.io/,,"Optimism,OP 
Mainnet","optimism,op-mainnet",optimism,ETH,,L2,2,ethereum,ethereum,0x229047fed2591dbec1eF1118d64F7aF3dB9EB290,op,op_sepolia,optimism,optimism,optimism,optimism,0x6887246668a3b87F54DeB3b94Ba47a6f63F32985,0xff00000000000000000000000000000000000010,0x473300df21D047806A082244b417f96b32f13A33,0xdfe97868233d1aa22e815a266982f2cf17685a27,0x99C9fc46f92E8a1c0deC1b1747d010903E884bE1,,0xe5965Ab5962eDc7477C8520243A95517CD252fA9 -fraxtal,Fraxtal,252,979797,3/11/24,modern,Non-Standard,2/1/24,,,yes,Self-Hosted,https://rpc.frax.com,https://www.frax.com/,,,Fraxtal,fraxtal,,"{0:frxETH,19571245:frax}","{0:frax-ether,19571245:frax-share}",L2,2,fraxtalda,ethereum,0x34a9f273cbD847d49c3De015FC26c3E66825f8b2,fraxtal,,frax,,,frax,0x6017f75108f251a488B045A7ce2a7C15b179d1f2,0xfF000000000000000000000000000000000420fC,0xFb90465f3064fF63FC460F01A6307eC73d64bc50,0x66CC916Ed5C6C2FA97014f7D1cD141528Ae171e4,0x34C0bD5877A5Ee7099D0f5688D65F4bB9158BDE2,, +fraxtal,Fraxtal,252,979797,3/11/24,modern,Non-Standard,2/1/24,,,yes,Self-Hosted,https://rpc.frax.com,https://www.frax.com/,,,Fraxtal,fraxtal,,"{0:""frxETH"",19571245:""FRAX""}","{0:""frax-ether"",19571245:""frax-share""}",L2,2,fraxtalda,ethereum,0x34a9f273cbD847d49c3De015FC26c3E66825f8b2,fraxtal,,frax,,,frax,0x6017f75108f251a488B045A7ce2a7C15b179d1f2,0xfF000000000000000000000000000000000420fC,0xFb90465f3064fF63FC460F01A6307eC73d64bc50,0x66CC916Ed5C6C2FA97014f7D1cD141528Ae171e4,0x34C0bD5877A5Ee7099D0f5688D65F4bB9158BDE2,, redstone,Redstone,690,f34242,5/1/24,modern,Non-Standard,5/1/24,,,yes,Self-Hosted,https://rpc.redstonechain.com,https://redstone.xyz/,,,Redstone,redstone,redstone,ETH,,L2,2,op-plasma,ethereum,0x8f2428F7189c0d92D1c4a5358903A8c80Ec6a69D,redstone,,redstone,,,,0xA31cb9Bc414601171D4537580f98F66C03aECd43,0xff00000000000000000000000000000000000690,,0xa426A052f657AEEefc298b3B5c35a470e4739d69,0xc473ca7E02af24c129c2eEf51F2aDf0411c1Df69,, 
cyber,Cyber,7560,00D82F,5/15/24,modern,Non-Standard,5/15/24,,,yes,AltLayer,https://cyber.alt.technology/,https://cyber.co/,,,Cyber,cyber,,ETH,,L2,2,eigenda,ethereum,0x5D1F4bbaF6D484fA9D5D9705f92dE6063bff6055,cyber,,cyber,,,cyber,0xf0748C52EDC23135d9845CDFB91279Cf61ee14b4,0xfF00000000000000000000000000000000001d88,0xF2987f0A626c8D29dFB2E0A21144ca3026d6F1E1,0xa669A743b065828682eE16109273F5CFeF5e676d,0x12a580c05466eefb2c467C6b115844cDaF55B255,, kroma,Kroma,255,7fe342,9/6/23,modern,Non-Standard,5/29/24,,,yes,Self-Hosted,https://api.kroma.network/,https://kroma.network/,,,Kroma,kroma,,ETH,,L2,2,ethereum,ethereum,,kroma,,kroma,,,,0x41b8cD6791De4D8f9E0eaF7861aC506822AdcE12,0xfF00000000000000000000000000000000000255,0x,0x180c77aE51a9c505a43A2C7D81f8CE70cacb93A6,0x827962404D7104202C5aaa6b929115C8211d9596,, diff --git a/reference_data/historical_dumps/backfill_market_data.ipynb b/reference_data/historical_dumps/backfill_market_data.ipynb index 55e60e01b93..31a76bdf64a 100644 --- a/reference_data/historical_dumps/backfill_market_data.ipynb +++ b/reference_data/historical_dumps/backfill_market_data.ipynb @@ -2,9 +2,29 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "ERROR:yfinance:Failed to get ticker 'ETH-USD' reason: Expecting value: line 1 column 1 (char 0)\n", + "ERROR:yfinance:ETH-USD: No timezone found, symbol may be delisted\n" + ] + }, + { + "ename": "AttributeError", + "evalue": "'Index' object has no attribute 'tz_convert'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[4], line 18\u001b[0m\n\u001b[1;32m 14\u001b[0m hist \u001b[38;5;241m=\u001b[39m 
eth\u001b[38;5;241m.\u001b[39mhistory(period\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mmax\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 16\u001b[0m \u001b[38;5;66;03m# Create a DataFrame with date and average daily price\u001b[39;00m\n\u001b[1;32m 17\u001b[0m df \u001b[38;5;241m=\u001b[39m pd\u001b[38;5;241m.\u001b[39mDataFrame({\n\u001b[0;32m---> 18\u001b[0m \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mtimestamp\u001b[39m\u001b[38;5;124m'\u001b[39m: \u001b[43mhist\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mindex\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtz_convert\u001b[49m(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mUTC\u001b[39m\u001b[38;5;124m'\u001b[39m)\u001b[38;5;241m.\u001b[39mtz_localize(\u001b[38;5;28;01mNone\u001b[39;00m), \u001b[38;5;66;03m# Convert to UTC then remove timezone info\u001b[39;00m\n\u001b[1;32m 19\u001b[0m \u001b[38;5;124m'\u001b[39m\u001b[38;5;124meth_usd\u001b[39m\u001b[38;5;124m'\u001b[39m: (hist[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mHigh\u001b[39m\u001b[38;5;124m'\u001b[39m] \u001b[38;5;241m+\u001b[39m hist[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mLow\u001b[39m\u001b[38;5;124m'\u001b[39m]) \u001b[38;5;241m/\u001b[39m \u001b[38;5;241m2\u001b[39m\n\u001b[1;32m 20\u001b[0m })\n\u001b[1;32m 22\u001b[0m \u001b[38;5;66;03m# Reset index to make Date a column\u001b[39;00m\n\u001b[1;32m 23\u001b[0m df \u001b[38;5;241m=\u001b[39m df\u001b[38;5;241m.\u001b[39mreset_index(drop\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mTrue\u001b[39;00m)\n", + "\u001b[0;31mAttributeError\u001b[0m: 'Index' object has no attribute 'tz_convert'" + ] + } + ], "source": [ "import pandas as pd\n", "import yfinance as yf\n", diff --git a/src/op_analytics/coreutils/request.py b/src/op_analytics/coreutils/request.py index 79877a3a771..8df2a649059 100644 --- a/src/op_analytics/coreutils/request.py +++ b/src/op_analytics/coreutils/request.py @@ -1,8 +1,8 @@ import time +import re from urllib3.util.retry import Retry 
import requests -import stamina from requests.adapters import HTTPAdapter from typing import Any from op_analytics.coreutils.logger import structlog @@ -17,6 +17,40 @@ ) +def mask_api_key_in_text(text: str) -> str: + """ + Mask API keys in any text (error messages, URLs, etc.). + + This function looks for common API key patterns and masks them for secure logging. + + Args: + text: Text that may contain API keys + + Returns: + Text with API keys masked + """ + # Pattern to find DeFiLlama API keys + defillama_pattern = r"(https://pro-api\.llama\.fi/)([^/\s]+)" + + def replace_defillama_match(match): + base_url, api_key = match.groups() + if len(api_key) > 8: + masked_key = api_key[:4] + "*" * (len(api_key) - 8) + api_key[-4:] + else: + masked_key = "*" * len(api_key) + return f"{base_url}{masked_key}" + + # Apply DeFiLlama masking + text = re.sub(defillama_pattern, replace_defillama_match, text) + + # Add more patterns here for other APIs as needed + # Example for generic API keys in URLs: + # generic_pattern = r"(https?://[^/]+/)([a-zA-Z0-9]{20,})" + # text = re.sub(generic_pattern, replace_generic_match, text) + + return text + + def new_session() -> requests.Session: session = requests.Session() adapter = HTTPAdapter(max_retries=DEFAULT_RETRY_STRATEGY) @@ -26,11 +60,6 @@ def new_session() -> requests.Session: return session -def retry_logger(exc: Exception) -> bool: - log.error(f"retrying exception {exc}") - return True - - def get_data( session: requests.Session, url: str, @@ -62,17 +91,18 @@ def get_data( timeout=timeout, ) - # Retry on exceptions. 
- for attempt in stamina.retry_context( - on=retry_logger, - attempts=retry_attempts, - timeout=retries_timeout, - wait_initial=retries_wait_initial, - wait_max=retries_wait_max, - ): - with attempt: - if attempt.num > 1: - log.warning(f"retry attempt {url}", attempt=attempt.num) + # Custom retry logic to avoid stamina's API key exposure + last_exception = None + wait_time = retries_wait_initial + total_wait_time = 0 + + for attempt in range(1, retry_attempts + 1): + try: + if attempt > 1: + # Mask API keys in the URL before logging retry attempts + masked_url = mask_api_key_in_text(url) + log.warning(f"retry attempt {masked_url}", attempt=attempt) + return _get_data( session=session, url=url, @@ -82,6 +112,49 @@ def get_data( timeout=timeout, ) + except Exception as e: + last_exception = e + + # Mask API keys in the exception message before logging + masked_error = mask_api_key_in_text(str(e)) + log.error(f"retrying exception {masked_error}") + + # If this is the last attempt, don't wait + if attempt == retry_attempts: + break + + # Check if we've exceeded the total timeout + if total_wait_time >= retries_timeout: + log.error( + "retry timeout exceeded", + total_wait_time=total_wait_time, + retries_timeout=retries_timeout, + ) + break + + # Wait before the next attempt + actual_wait = min(wait_time, retries_wait_max) + if total_wait_time + actual_wait > retries_timeout: + actual_wait = retries_timeout - total_wait_time + + if actual_wait > 0: + log.warning( + f"waiting {actual_wait}s before retry", attempt=attempt, wait_time=actual_wait + ) + time.sleep(actual_wait) + total_wait_time += actual_wait + + # Exponential backoff + wait_time *= 2 + + # If we get here, all attempts failed + if last_exception: + # Mask API keys in the final exception message + masked_error = mask_api_key_in_text(str(last_exception)) + raise type(last_exception)(masked_error) from last_exception + else: + raise Exception("All retry attempts failed") + def _get_data( session: 
requests.Session, @@ -97,8 +170,12 @@ def _get_data( resp.raise_for_status() if resp.status_code != 200: - raise Exception(f"status={resp.status_code}, url={url!r}") + # Mask API keys in the URL before including in exception + masked_url = mask_api_key_in_text(url) + raise Exception(f"status={resp.status_code}, url={masked_url!r}") if emit_log: - log.info(f"Fetched from {url}: {time.time() - start:.2f} seconds") + # Mask API keys in the URL before logging + masked_url = mask_api_key_in_text(url) + log.info(f"Fetched from {masked_url}: {time.time() - start:.2f} seconds") return resp.json() diff --git a/src/op_analytics/datapipeline/chains/tokens.py b/src/op_analytics/datapipeline/chains/tokens.py new file mode 100644 index 00000000000..78c6f4b0e0a --- /dev/null +++ b/src/op_analytics/datapipeline/chains/tokens.py @@ -0,0 +1,276 @@ +""" +Token utilities for chain metadata. + +This module provides functions to extract token IDs from chain metadata +for use across different price data sources (CoinGecko, DeFiLlama, etc.). +""" + +from typing import List +import os +import csv +import json + +import polars as pl + +from op_analytics.coreutils.logger import structlog +from .load import load_chain_metadata + +log = structlog.get_logger() + + +def get_token_ids_from_metadata() -> List[str]: + """ + Get list of token IDs from the chain metadata. + + Extracts both CoinGecko API IDs and gas token IDs from the chain metadata. + Handles both simple string format and dictionary format for cgt_coingecko_api. 
+ + Returns: + List of token IDs (could be CoinGecko slugs or chain:address format) + """ + # Load chain metadata + chain_metadata = load_chain_metadata() + + # Get token IDs from chain metadata - check both coingecko and gas_token fields + token_ids: List[str] = [] + + # Get CoinGecko IDs - handle both string and dictionary formats + coingecko_rows = ( + chain_metadata.filter(pl.col("cgt_coingecko_api").is_not_null()) + .select("cgt_coingecko_api") + .unique() + .to_series() + .to_list() + ) + + for api_value in coingecko_rows: + if api_value: + # Try to parse as JSON dictionary first + try: + parsed_dict = json.loads(api_value) + if isinstance(parsed_dict, dict): + # Extract all values from the dictionary (these are the coingecko API IDs) + token_ids.extend(parsed_dict.values()) + log.info( + "parsed_dictionary_format", + api_value=api_value, + extracted_ids=list(parsed_dict.values()), + ) + else: + # If it's not a dict, treat as simple string + token_ids.append(api_value) + except (json.JSONDecodeError, TypeError): + # If JSON parsing fails, treat as simple string + token_ids.append(api_value) + + # Remove duplicates + token_ids = list(set(token_ids)) + + log.info("found_token_ids", count=len(token_ids)) + return token_ids + + +def read_token_ids_from_file(filepath: str) -> List[str]: + """ + Read token IDs from a CSV or TXT file. + + For CSV files, looks for a 'token_id' column. If not found, treats the first + column as token IDs. For TXT files, assumes one token ID per line. 
+ + Args: + filepath: Path to the file containing token IDs + + Returns: + List of token IDs + + Raises: + FileNotFoundError: If the file doesn't exist + ValueError: If the file extension is not supported + """ + if not os.path.exists(filepath): + raise FileNotFoundError(f"File not found: {filepath}") + + token_ids: set[str] = set() + ext = os.path.splitext(filepath)[1].lower() + + if ext == ".txt": + with open(filepath, "r") as f: + for line in f: + tid = line.strip() + if tid: + token_ids.add(tid) + elif ext == ".csv": + with open(filepath, newline="") as csvfile: + # First try to read as CSV with headers + reader = csv.DictReader(csvfile) + if reader.fieldnames and "token_id" in reader.fieldnames: + for row in reader: + tid = row["token_id"].strip() + if tid: + token_ids.add(tid) + else: + # fallback: treat as single-column CSV + csvfile.seek(0) + # Use a separate context to avoid type conflicts + with open(filepath, newline="") as fallback_file: + fallback_reader = csv.reader(fallback_file) + for row in fallback_reader: # type: ignore[assignment] + if row: # Check if row is not empty + tid = row[0].strip() + if tid and tid != "token_id": + token_ids.add(tid) + else: + raise ValueError(f"Unsupported file extension: {ext}") + + return list(token_ids) + + +def get_token_ids_from_metadata_and_file( + extra_token_ids_file: str | None = None, + include_top_tokens: int = 0, + top_tokens_fetcher=None, +) -> List[str]: + """ + Get unique list of token IDs from chain metadata and an optional file. 
+ + Args: + extra_token_ids_file: Optional path to file with extra token IDs + include_top_tokens: Number of top tokens by market cap to include (0 for none) + top_tokens_fetcher: Optional function to fetch top tokens (e.g., from CoinGecko) + + Returns: + List of unique token IDs + """ + token_ids: set[str] = set(get_token_ids_from_metadata()) + + if extra_token_ids_file: + extra_ids = read_token_ids_from_file(extra_token_ids_file) + token_ids.update(extra_ids) + + if include_top_tokens > 0 and top_tokens_fetcher is not None: + try: + top_token_ids = top_tokens_fetcher(limit=include_top_tokens) + token_ids.update(top_token_ids) + log.info("Added top tokens by market cap", count=len(top_token_ids)) + except Exception as e: + log.error("Failed to fetch top tokens by market cap", error=str(e)) + # Continue without top tokens if there's an error + + result = list(token_ids) + log.info("final_token_ids", count=len(result)) + return result + + +def get_coingecko_token_for_block(chain_name: str, block_number: int) -> str | None: + """ + Get the appropriate CoinGecko token ID for a specific chain and block number. + + For chains with changing gas tokens, this function will return the correct + token ID based on the block number. 
+ + Args: + chain_name: Name of the chain (e.g., "fraxtal") + block_number: Block number to check + + Returns: + CoinGecko token ID string, or None if not found + """ + # Load chain metadata + chain_metadata = load_chain_metadata() + + # Find the row for this chain + chain_row = ( + chain_metadata.filter(pl.col("chain_name") == chain_name) + .select("cgt_coingecko_api") + .to_series() + .to_list() + ) + + if not chain_row or not chain_row[0]: + return None + + api_value = chain_row[0] + + # Try to parse as JSON dictionary + try: + parsed_dict = json.loads(api_value) + if isinstance(parsed_dict, dict): + # Convert keys to integers for comparison + block_ranges = {int(k): v for k, v in parsed_dict.items()} + + # Find the appropriate token ID based on block number + # Get all starting blocks <= current block, then take the maximum + valid_starts = [start for start in block_ranges.keys() if start <= block_number] + if valid_starts: + latest_start = max(valid_starts) + return block_ranges[latest_start] + else: + return None + else: + # If it's not a dict, return as simple string + return api_value + except (json.JSONDecodeError, TypeError): + # If JSON parsing fails, return as simple string + return api_value + + +def get_coingecko_token_ids() -> List[str]: + """ + Get only CoinGecko token IDs from chain metadata. + Handles both string and dictionary formats. 
+ + Returns: + List of CoinGecko token IDs + """ + chain_metadata = load_chain_metadata() + + token_ids: List[str] = [] + coingecko_rows = ( + chain_metadata.filter(pl.col("cgt_coingecko_api").is_not_null()) + .select("cgt_coingecko_api") + .unique() + .to_series() + .to_list() + ) + + for api_value in coingecko_rows: + if api_value: + # Try to parse as JSON dictionary first + try: + parsed_dict = json.loads(api_value) + if isinstance(parsed_dict, dict): + # Extract all values from the dictionary + token_ids.extend(parsed_dict.values()) + else: + # If it's not a dict, treat as simple string + token_ids.append(api_value) + except (json.JSONDecodeError, TypeError): + # If JSON parsing fails, treat as simple string + token_ids.append(api_value) + + # Remove duplicates + token_ids = list(set(token_ids)) + + log.info("found_coingecko_token_ids", count=len(token_ids)) + return token_ids + + +def get_gas_token_ids() -> List[str]: + """ + Get only gas token IDs from chain metadata. + + Returns: + List of gas token IDs + """ + chain_metadata = load_chain_metadata() + + token_ids = ( + chain_metadata.filter(pl.col("gas_token").is_not_null()) + .select("gas_token") + .unique() + .to_series() + .to_list() + ) + + log.info("found_gas_token_ids", count=len(token_ids)) + return token_ids diff --git a/src/op_analytics/datasources/coingecko/execute.py b/src/op_analytics/datasources/coingecko/execute.py index ee11f4cc0aa..04785dd79bd 100644 --- a/src/op_analytics/datasources/coingecko/execute.py +++ b/src/op_analytics/datasources/coingecko/execute.py @@ -2,9 +2,7 @@ Execute CoinGecko price data collection. 
""" -from typing import List, Dict, Any -import os -import csv +from typing import Dict, Any import polars as pl @@ -14,7 +12,9 @@ from op_analytics.coreutils.partitioned.dailydatautils import dt_summary from op_analytics.coreutils.request import new_session from op_analytics.coreutils.time import now_dt -from op_analytics.datapipeline.chains.load import load_chain_metadata +from op_analytics.datapipeline.chains.tokens import ( + get_token_ids_from_metadata_and_file, +) from .dataaccess import CoinGecko from .price_data import CoinGeckoDataSource @@ -23,9 +23,6 @@ log = structlog.get_logger() -PRICE_TABLE_LAST_N_DAYS = 90 - - PRICE_DF_SCHEMA = { "token_id": pl.String, "dt": pl.String, @@ -56,99 +53,22 @@ } -def get_token_ids_from_metadata() -> List[str]: +def get_coingecko_top_tokens_fetcher(session): """ - Get list of CoinGecko token IDs from the chain metadata. - - Returns: - List of CoinGecko token IDs - """ - # Load chain metadata - chain_metadata = load_chain_metadata() - - # Get token IDs from chain metadata - token_ids = ( - chain_metadata.filter(pl.col("cgt_coingecko_api").is_not_null()) - .select("cgt_coingecko_api") - .unique() - .to_series() - .to_list() - ) - - log.info("found_token_ids", count=len(token_ids)) - return token_ids - - -def read_token_ids_from_file(filepath: str) -> list[str]: - """ - Read token IDs from a CSV or TXT file. Assumes one token_id per line or a column named 'token_id'. 
- """ - if not os.path.exists(filepath): - raise FileNotFoundError(f"File not found: {filepath}") - token_ids: set[str] = set() - ext = os.path.splitext(filepath)[1].lower() - if ext == ".txt": - with open(filepath, "r") as f: - for line in f: - tid = line.strip() - if tid: - token_ids.add(tid) - elif ext == ".csv": - with open(filepath, newline="") as csvfile: - # First try to read as CSV with headers - reader = csv.DictReader(csvfile) - if reader.fieldnames and "token_id" in reader.fieldnames: - for row in reader: - tid = row["token_id"].strip() - if tid: - token_ids.add(tid) - else: - # fallback: treat as single-column CSV - csvfile.seek(0) - # Use a separate context to avoid type conflicts - with open(filepath, newline="") as fallback_file: - fallback_reader = csv.reader(fallback_file) - for row in fallback_reader: # type: ignore[assignment] - if row: # Check if row is not empty - tid = row[0].strip() - if tid and tid != "token_id": - token_ids.add(tid) - else: - raise ValueError(f"Unsupported file extension: {ext}") - return list(token_ids) - - -def get_token_ids_from_metadata_and_file( - extra_token_ids_file: str | None = None, include_top_tokens: int = 0 -) -> list[str]: - """ - Get unique list of CoinGecko token IDs from chain metadata and an optional file. + Create a top tokens fetcher function for CoinGecko. 
Args: - extra_token_ids_file: Optional path to file with extra token IDs - include_top_tokens: Number of top tokens by market cap to include (0 for none) - """ - token_ids: set[str] = set(get_token_ids_from_metadata()) + session: HTTP session to use for requests - if extra_token_ids_file: - extra_ids = read_token_ids_from_file(extra_token_ids_file) - token_ids.update(extra_ids) + Returns: + Function that fetches top tokens by market cap + """ - if include_top_tokens > 0: - # Fetch top tokens by market cap - session = new_session() + def fetcher(limit: int) -> list[str]: data_source = CoinGeckoDataSource(session=session) - try: - top_token_ids = data_source.get_top_tokens_by_market_cap(limit=include_top_tokens) - token_ids.update(top_token_ids) - log.info("Added top tokens by market cap", count=len(top_token_ids)) - except Exception as e: - log.error("Failed to fetch top tokens by market cap", error=str(e)) - # Continue without top tokens if there's an error + return data_source.get_top_tokens_by_market_cap(limit=limit) - result = list(token_ids) - log.info("final_token_ids", count=len(result)) - return result + return fetcher def _fetch_and_write_metadata( @@ -251,7 +171,14 @@ def execute_metadata_pull( log.info("single_token_mode", token_id=token_id) else: # Use normal token collection logic - token_ids = get_token_ids_from_metadata_and_file(extra_token_ids_file, include_top_tokens) + top_tokens_fetcher = ( + get_coingecko_top_tokens_fetcher(session) if include_top_tokens > 0 else None + ) + token_ids = get_token_ids_from_metadata_and_file( + extra_token_ids_file=extra_token_ids_file, + include_top_tokens=include_top_tokens, + top_tokens_fetcher=top_tokens_fetcher, + ) log.info("metadata_pull_start", token_count=len(token_ids)) @@ -300,7 +227,14 @@ def execute_pull( log.info("single_token_mode", token_id=token_id) else: # Use normal token collection logic - token_ids = get_token_ids_from_metadata_and_file(extra_token_ids_file, include_top_tokens) + 
top_tokens_fetcher = ( + get_coingecko_top_tokens_fetcher(session) if include_top_tokens > 0 else None + ) + token_ids = get_token_ids_from_metadata_and_file( + extra_token_ids_file=extra_token_ids_file, + include_top_tokens=include_top_tokens, + top_tokens_fetcher=top_tokens_fetcher, + ) log.info("price_pull_start", token_count=len(token_ids)) @@ -383,9 +317,7 @@ def execute_pull( # Write prices log.info("writing_price_data", original_rows=len(price_df)) - filtered_price_df = most_recent_dates( - price_df, n_dates=PRICE_TABLE_LAST_N_DAYS, date_column="dt" - ) + filtered_price_df = most_recent_dates(price_df, n_dates=days, date_column="dt") log.info( "filtered_price_data", filtered_rows=len(filtered_price_df), diff --git a/src/op_analytics/datasources/coingecko/price_data.py b/src/op_analytics/datasources/coingecko/price_data.py index 2f4da084ae9..829bb4cdb6a 100644 --- a/src/op_analytics/datasources/coingecko/price_data.py +++ b/src/op_analytics/datasources/coingecko/price_data.py @@ -193,6 +193,10 @@ def get_token_prices( "days": str(days), "interval": "daily", } + from urllib.parse import urlencode + + full_url = f"{url}?{urlencode(params)}" + print(f"Requesting URL: {full_url}") try: response = self.session.get(url, params=params) diff --git a/src/op_analytics/datasources/defillama/dataaccess.py b/src/op_analytics/datasources/defillama/dataaccess.py index 60bddf5266c..33c7582f172 100644 --- a/src/op_analytics/datasources/defillama/dataaccess.py +++ b/src/op_analytics/datasources/defillama/dataaccess.py @@ -43,3 +43,6 @@ class DefiLlama(DailyDataset): # Token Mappings TOKEN_MAPPINGS = "dim_token_mappings_v1" PROTOCOL_CATEGORY_MAPPINGS = "dim_protocol_category_mappings_v1" + + # Token Prices + TOKEN_PRICES = "token_prices_v1" diff --git a/src/op_analytics/datasources/defillama/tokenprice/__init__.py b/src/op_analytics/datasources/defillama/tokenprice/__init__.py new file mode 100644 index 00000000000..0519ecba6ea --- /dev/null +++ 
b/src/op_analytics/datasources/defillama/tokenprice/__init__.py @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/op_analytics/datasources/defillama/tokenprice/execute.py b/src/op_analytics/datasources/defillama/tokenprice/execute.py new file mode 100644 index 00000000000..158a03bc386 --- /dev/null +++ b/src/op_analytics/datasources/defillama/tokenprice/execute.py @@ -0,0 +1,548 @@ +""" +Execute DeFiLlama token price data collection. +""" + +from typing import Dict, Any + +import polars as pl + +from op_analytics.coreutils.bigquery.write import most_recent_dates +from op_analytics.coreutils.logger import structlog +from op_analytics.coreutils.misc import raise_for_schema_mismatch +from op_analytics.coreutils.partitioned.dailydatautils import dt_summary +from op_analytics.coreutils.request import new_session +from op_analytics.datapipeline.chains.tokens import get_token_ids_from_metadata_and_file + +from ..dataaccess import DefiLlama +from .price_data import DefiLlamaTokenPrices, TOKEN_PRICES_SCHEMA, mask_api_key_in_text + +log = structlog.get_logger() + + +def execute_pull_current( + extra_token_ids_file: str | None = None, + token_id: str | None = None, + show_progress: bool = False, + batch_size: int = 50, +) -> Dict[str, Any] | None: + """ + Execute DeFiLlama current price data pull. 
+ + Args: + extra_token_ids_file: Optional path to file with extra token IDs + token_id: Optional single token ID to process (if provided, ignores other token sources) + show_progress: Whether to show progress logging for token processing + batch_size: Number of tokens to process per batch (when show_progress=True) + + Returns: + Summary of the price fetch operation + """ + try: + session = new_session() + + # Get list of token IDs + if token_id is not None: + # Use single token if provided + token_ids = [token_id] + log.info("single_token_mode", token_id=token_id) + else: + # Use normal token collection logic + token_ids = get_token_ids_from_metadata_and_file( + extra_token_ids_file=extra_token_ids_file, + include_top_tokens=0, + top_tokens_fetcher=None, + ) + + log.info("current_price_pull_start", token_count=len(token_ids)) + + if not token_ids: + log.error("no_token_ids_found") + return None + + # Fetch current prices + try: + log.info( + "fetching_current_prices", token_count=len(token_ids), show_progress=show_progress + ) + + if show_progress: + price_data = DefiLlamaTokenPrices.fetch_prices_current_with_progress( + token_ids, session=session, batch_size=batch_size + ) + else: + price_data = DefiLlamaTokenPrices.fetch_prices_current(token_ids, session=session) + + if price_data.is_empty(): + log.warning("no_price_data_returned") + return None + + log.info("fetched_current_prices", row_count=len(price_data)) + + except Exception as e: + log.error("failed_to_fetch_current_prices", error=mask_api_key_in_text(str(e))) + return None + + # Schema assertions + raise_for_schema_mismatch( + actual_schema=price_data.df.schema, + expected_schema=TOKEN_PRICES_SCHEMA, + ) + + # Write prices (current prices are typically just one day) + log.info("writing_current_price_data", rows=len(price_data.df)) + + DefiLlama.TOKEN_PRICES.write( + dataframe=price_data.df, + sort_by=["token_id"], + ) + log.info("wrote_current_price_data") + + # Create BigQuery external table + 
DefiLlama.TOKEN_PRICES.create_bigquery_external_table() + log.info("created_price_external_table") + + # Return summary information + return { + "price_df": dt_summary(price_data.df), + } + + except Exception as e: + log.error("execute_pull_current_failed", error=mask_api_key_in_text(str(e))) + return None + + +def execute_pull_historical( + days: int = 30, + extra_token_ids_file: str | None = None, + token_id: str | None = None, + start_timestamp: int | None = None, + end_timestamp: int | None = None, + span: int = 0, + period: str = "1d", + search_width: str | None = None, + skip_existing_partitions: bool = False, + show_progress: bool = False, + batch_size: int = 50, + time_chunk_days: int = 7, +) -> Dict[str, Any] | None: + """ + Execute DeFiLlama historical price data pull. + + Args: + days: Number of days of historical data to fetch (ignored if start_timestamp is provided) + extra_token_ids_file: Optional path to file with extra token IDs + token_id: Optional single token ID to process (if provided, ignores other token sources) + start_timestamp: Unix timestamp of earliest data point (optional) + end_timestamp: Unix timestamp of latest data point (optional) + span: Number of data points returned, defaults to 0 (all available) + period: Duration between data points (e.g., '1d', '1h', '1w') + search_width: Time range on either side to find price data + skip_existing_partitions: Whether to skip writing partitions that already exist + show_progress: Whether to show progress logging for token processing + batch_size: Number of tokens to process per batch (when show_progress=True) + time_chunk_days: Number of days per API call chunk (default: 7) + + Returns: + Summary of the price fetch operation + """ + try: + session = new_session() + + # Get list of token IDs + if token_id is not None: + # Use single token if provided + token_ids = [token_id] + log.info("single_token_mode", token_id=token_id) + else: + # Use normal token collection logic + token_ids = 
get_token_ids_from_metadata_and_file( + extra_token_ids_file=extra_token_ids_file, + include_top_tokens=0, + top_tokens_fetcher=None, + ) + + log.info( + "historical_price_pull_start", + token_count=len(token_ids), + time_chunk_days=time_chunk_days, + ) + + if not token_ids: + log.error("no_token_ids_found") + return None + + # Fetch historical prices + try: + log.info( + "fetching_historical_prices", + token_count=len(token_ids), + show_progress=show_progress, + time_chunk_days=time_chunk_days, + ) + + if start_timestamp is not None or end_timestamp is not None: + # Use explicit timestamps + if show_progress: + price_data = DefiLlamaTokenPrices.fetch_prices_historical_with_progress( + token_ids=token_ids, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + span=span, + period=period, + search_width=search_width, + session=session, + batch_size=batch_size, + time_chunk_days=time_chunk_days, + ) + else: + price_data = DefiLlamaTokenPrices.fetch_prices_historical( + token_ids=token_ids, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + span=span, + period=period, + search_width=search_width, + session=session, + time_chunk_days=time_chunk_days, + ) + else: + # Use days parameter + if show_progress: + # For days-based fetching, we need to calculate the timestamp parameters + from datetime import datetime, timedelta + + start_date = datetime.now() - timedelta(days=days) + start_timestamp = int(start_date.timestamp()) + + price_data = DefiLlamaTokenPrices.fetch_prices_historical_with_progress( + token_ids=token_ids, + start_timestamp=start_timestamp, + span=days, + period="1d", + session=session, + batch_size=batch_size, + time_chunk_days=time_chunk_days, + ) + else: + price_data = DefiLlamaTokenPrices.fetch_prices_by_days( + token_ids=token_ids, + days=days, + session=session, + time_chunk_days=time_chunk_days, + ) + + if price_data.is_empty(): + log.warning("no_price_data_returned") + return None + + log.info( + 
"fetched_historical_prices", + row_count=len(price_data.df), + date_range=f"{str(price_data.df['dt'].min())} to {str(price_data.df['dt'].max())}", + unique_dates=price_data.df["dt"].n_unique(), + ) + + except Exception as e: + log.error("failed_to_fetch_historical_prices", error=mask_api_key_in_text(str(e))) + return None + + # Filter out existing partitions if requested + if skip_existing_partitions and not price_data.df.is_empty(): + from op_analytics.coreutils.partitioned.dataaccess import init_data_access + from op_analytics.coreutils.partitioned.dailydatawrite import ( + determine_location, + MARKERS_TABLE, + ) + from op_analytics.coreutils.partitioned.output import ExpectedOutput + + # Get unique dates from the fetched data + unique_dates = price_data.df["dt"].unique().to_list() + + # Check which dates already have complete markers + client = init_data_access() + location = determine_location() + + existing_dates = [] + for dt in unique_dates: + # Construct the expected output marker path for this date + expected_output = ExpectedOutput( + root_path=DefiLlama.TOKEN_PRICES.root_path, + file_name="out.parquet", + marker_path=f"{dt}/{DefiLlama.TOKEN_PRICES.root_path}", + ) + + if client.marker_exists( + data_location=location, + marker_path=expected_output.marker_path, + markers_table=MARKERS_TABLE, + ): + existing_dates.append(dt) + + if existing_dates: + # Filter out existing dates from the price data + original_count = len(price_data.df) + price_data.df = price_data.df.filter(~pl.col("dt").is_in(existing_dates)) + + log.info( + "filtered_existing_partitions", + original_rows=original_count, + filtered_rows=len(price_data.df), + skipped_dates=len(existing_dates), + skipped_date_list=existing_dates, + ) + + if price_data.df.is_empty(): + log.info("all_partitions_already_exist") + return { + "price_df": None, + } + + # Schema assertions + raise_for_schema_mismatch( + actual_schema=price_data.df.schema, + expected_schema=TOKEN_PRICES_SCHEMA, + ) + + # Write 
prices + log.info("writing_historical_price_data", rows=len(price_data.df)) + + # Use the requested days parameter for filtering + filtered_price_df = most_recent_dates(price_data.df, n_dates=days, date_column="dt") + log.info( + "filtered_price_data", + filtered_rows=len(filtered_price_df), + date_range=f"{str(filtered_price_df['dt'].min())} to {str(filtered_price_df['dt'].max())}", + ) + + DefiLlama.TOKEN_PRICES.write( + dataframe=filtered_price_df, + sort_by=["token_id"], + ) + log.info("wrote_historical_price_data") + + # Create BigQuery external table + DefiLlama.TOKEN_PRICES.create_bigquery_external_table() + log.info("created_price_external_table") + + # Return summary information + return { + "price_df": dt_summary(price_data.df), + } + + except Exception as e: + log.error("execute_pull_historical_failed", error=mask_api_key_in_text(str(e))) + return None + + +def execute_pull_first_prices( + extra_token_ids_file: str | None = None, + token_id: str | None = None, + show_progress: bool = False, + batch_size: int = 50, +) -> Dict[str, Any] | None: + """ + Execute DeFiLlama first recorded price data pull. 
+ + Args: + extra_token_ids_file: Optional path to file with extra token IDs + token_id: Optional single token ID to process (if provided, ignores other token sources) + show_progress: Whether to show progress logging for token processing + batch_size: Number of tokens to process per batch (when show_progress=True) + + Returns: + Summary of the price fetch operation + """ + try: + session = new_session() + + # Get list of token IDs + if token_id is not None: + # Use single token if provided + token_ids = [token_id] + log.info("single_token_mode", token_id=token_id) + else: + # Use normal token collection logic + token_ids = get_token_ids_from_metadata_and_file( + extra_token_ids_file=extra_token_ids_file, + include_top_tokens=0, + top_tokens_fetcher=None, + ) + + log.info("first_price_pull_start", token_count=len(token_ids)) + + if not token_ids: + log.error("no_token_ids_found") + return None + + # Fetch first prices + try: + log.info( + "fetching_first_prices", token_count=len(token_ids), show_progress=show_progress + ) + + if show_progress: + price_data = DefiLlamaTokenPrices.fetch_first_prices_with_progress( + token_ids, session=session, batch_size=batch_size + ) + else: + price_data = DefiLlamaTokenPrices.fetch_first_prices(token_ids, session=session) + + if price_data.is_empty(): + log.warning("no_price_data_returned") + return None + + log.info("fetched_first_prices", row_count=len(price_data)) + + except Exception as e: + log.error("failed_to_fetch_first_prices", error=mask_api_key_in_text(str(e))) + return None + + # Schema assertions + raise_for_schema_mismatch( + actual_schema=price_data.df.schema, + expected_schema=TOKEN_PRICES_SCHEMA, + ) + + # Write prices + log.info("writing_first_price_data", rows=len(price_data.df)) + + DefiLlama.TOKEN_PRICES.write( + dataframe=price_data.df, + sort_by=["token_id"], + ) + log.info("wrote_first_price_data") + + # Create BigQuery external table + DefiLlama.TOKEN_PRICES.create_bigquery_external_table() + 
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Fetch DeFiLlama token price data")
    parser.add_argument(
        "--mode",
        choices=["current", "historical", "first"],
        default="historical",
        help="Mode of price fetching: current, historical, or first",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=30,
        help="Number of days of historical data to fetch (for historical mode)",
    )
    parser.add_argument(
        "--extra-token-ids-file",
        type=str,
        default=None,
        help="Optional path to a file (csv or txt) with extra token IDs to include",
    )
    parser.add_argument(
        "--token-id",
        type=str,
        default=None,
        help="Optional single token ID to process (if provided, ignores other token sources)",
    )
    parser.add_argument(
        "--start-timestamp",
        type=int,
        default=None,
        help="Unix timestamp of earliest data point (for historical mode)",
    )
    parser.add_argument(
        "--end-timestamp",
        type=int,
        default=None,
        help="Unix timestamp of latest data point (for historical mode)",
    )
    parser.add_argument(
        "--span",
        type=int,
        default=0,
        help="Number of data points returned (for historical mode)",
    )
    parser.add_argument(
        "--period",
        type=str,
        default="1d",
        help="Duration between data points (for historical mode)",
    )
    parser.add_argument(
        "--search-width",
        type=str,
        default=None,
        help="Time range on either side to find price data (for historical mode)",
    )
    parser.add_argument(
        "--skip-existing-partitions",
        action="store_true",
        help="Skip writing partitions that already exist (for historical mode)",
    )
    parser.add_argument(
        "--show-progress",
        action="store_true",
        help="Show progress logging for token processing",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=50,
        help="Number of tokens to process per batch (when --show-progress is enabled)",
    )
    parser.add_argument(
        "--time-chunk-days",
        type=int,
        default=7,
        help="Number of days per API call chunk (default: 7, helps avoid timeouts on large requests)",
    )
    args = parser.parse_args()

    # Arguments shared by all three execution modes.
    common_kwargs = dict(
        extra_token_ids_file=args.extra_token_ids_file,
        token_id=args.token_id,
        show_progress=args.show_progress,
        batch_size=args.batch_size,
    )

    try:
        if args.mode == "historical":
            result = execute_pull_historical(
                days=args.days,
                start_timestamp=args.start_timestamp,
                end_timestamp=args.end_timestamp,
                span=args.span,
                period=args.period,
                search_width=args.search_width,
                skip_existing_partitions=args.skip_existing_partitions,
                time_chunk_days=args.time_chunk_days,
                **common_kwargs,
            )
        elif args.mode == "current":
            result = execute_pull_current(**common_kwargs)
        else:  # args.mode == "first" (argparse choices guarantee this)
            result = execute_pull_first_prices(**common_kwargs)

        if result is not None:
            log.info("completed_price_processing")
        else:
            log.error("failed_price_processing")

    except Exception as e:
        # API keys may appear in exception text (URLs); mask before logging.
        log.error("main_execution_failed", error=mask_api_key_in_text(str(e)))
def format_token_id_for_defillama(token_id: str) -> str:
    """
    Format token ID for DeFiLlama API.

    DeFiLlama requires CoinGecko slugs to be prefixed with 'coingecko:'.
    Chain:address format tokens are used as-is.

    Args:
        token_id: Token ID (either CoinGecko slug or chain:address format)

    Returns:
        Properly formatted token ID for DeFiLlama
    """
    # A ":" means the id already carries a chain prefix (chain:address);
    # bare CoinGecko slugs need the "coingecko:" namespace.
    has_chain_prefix = ":" in token_id
    return token_id if has_chain_prefix else f"coingecko:{token_id}"
def mask_api_key_in_text(text: str) -> str:
    """
    Mask API keys in any text (error messages, URLs, etc.).

    Args:
        text: Text that may contain API keys

    Returns:
        Text with API keys masked
    """

    def _mask(key: str) -> str:
        # Long keys keep the first and last 4 characters for identification;
        # short keys are fully starred out.
        if len(key) <= 8:
            return "*" * len(key)
        return key[:4] + "*" * (len(key) - 8) + key[-4:]

    # The path segment right after the host is the API key in DeFiLlama
    # Pro URLs; replace it wherever such a URL appears in the text.
    return re.sub(
        r"(https://pro-api\.llama\.fi/)([^/\s]+)",
        lambda m: m.group(1) + _mask(m.group(2)),
        text,
    )
+ + Args: + session: HTTP session + url: URL to fetch + **kwargs: Additional arguments to pass to get_data + + Returns: + Response data + + Raises: + Exception with masked URL + """ + try: + return get_data(session, url, **kwargs) + except Exception as e: + # Mask the API key in the exception message + error_message = mask_api_key_in_text(str(e)) + + # Create a new exception with the masked message + new_exception = type(e)(error_message) + raise new_exception from e + + +@dataclass +class DefiLlamaTokenPrices: + """Token price data from DeFiLlama Pro API.""" + + df: pl.DataFrame + + @classmethod + def fetch_prices_current( + cls, + token_ids: list[str], + session: requests.Session | None = None, + ) -> "DefiLlamaTokenPrices": + """Fetch current prices for tokens. + + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + session: Optional requests session + + Returns: + DefiLlamaTokenPrices instance with current price data + """ + session = session or new_session() + api_key = env_get("DEFILLAMA_API_KEY") + + # Format token IDs for DeFiLlama + formatted_tokens = [format_token_id_for_defillama(tid) for tid in token_ids] + tokens_str = ",".join(formatted_tokens) + + url = COINS_PRICES_CURRENT_ENDPOINT.format(api_key=api_key, coins=tokens_str) + + try: + response = safe_get_data( + session, + url, + retry_attempts=3, + emit_log=False, # Don't emit logs because the key is in the URL + ) + + log.info("fetched_current_prices", token_count=len(token_ids)) + + # Parse response into records + records = [] + current_time = datetime.now().isoformat() + + if "coins" in response: + for formatted_token_id, price_info in response["coins"].items(): + if price_info and "price" in price_info: + # Map back from formatted token ID to original token ID + original_token_id = formatted_token_id + if formatted_token_id.startswith("coingecko:"): + original_token_id = formatted_token_id[ + 10: + ] # Remove "coingecko:" prefix + + records.append( + { + "token_id": 
original_token_id, + "dt": current_time[:10], # YYYY-MM-DD format + "price_usd": float(price_info["price"]), + "last_updated": current_time, + } + ) + + df = pl.DataFrame(records, schema=TOKEN_PRICES_SCHEMA, strict=False) + return cls(df=df) + + except Exception as e: + log.error("failed_to_fetch_current_prices", error=str(e)) + return cls(df=pl.DataFrame([], schema=TOKEN_PRICES_SCHEMA)) + + @classmethod + def fetch_prices_at_timestamp( + cls, + token_ids: list[str], + timestamp: int, + session: requests.Session | None = None, + ) -> "DefiLlamaTokenPrices": + """Fetch prices at a specific timestamp. + + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + timestamp: Unix timestamp to get prices for + session: Optional requests session + + Returns: + DefiLlamaTokenPrices instance with price data at the specified timestamp + """ + session = session or new_session() + api_key = env_get("DEFILLAMA_API_KEY") + + # Format token IDs for DeFiLlama + formatted_tokens = [format_token_id_for_defillama(tid) for tid in token_ids] + tokens_str = ",".join(formatted_tokens) + + url = COINS_PRICES_HISTORICAL_ENDPOINT.format( + api_key=api_key, timestamp=timestamp, coins=tokens_str + ) + + try: + response = safe_get_data( + session, + url, + retry_attempts=3, + emit_log=False, # Don't emit logs because the key is in the URL + ) + + log.info("fetched_prices_at_timestamp", token_count=len(token_ids), timestamp=timestamp) + + # Parse response into records + records = [] + + if "coins" in response: + for formatted_token_id, price_info in response["coins"].items(): + if price_info and "price" in price_info: + # Map back from formatted token ID to original token ID + original_token_id = formatted_token_id + if formatted_token_id.startswith("coingecko:"): + original_token_id = formatted_token_id[ + 10: + ] # Remove "coingecko:" prefix + + records.append( + { + "token_id": original_token_id, + "dt": dt_fromepoch(timestamp), + "price_usd": 
float(price_info["price"]), + "last_updated": datetime.now().isoformat(), + } + ) + + df = pl.DataFrame(records, schema=TOKEN_PRICES_SCHEMA, strict=False) + return cls(df=df) + + except Exception as e: + log.error("failed_to_fetch_prices_at_timestamp", error=str(e)) + return cls(df=pl.DataFrame([], schema=TOKEN_PRICES_SCHEMA)) + + @classmethod + def fetch_prices_historical( + cls, + token_ids: list[str], + start_timestamp: int | None = None, + end_timestamp: int | None = None, + span: int = 0, + period: str = "1d", + search_width: str | None = None, + session: requests.Session | None = None, + time_chunk_days: int = 7, + ) -> "DefiLlamaTokenPrices": + """Fetch historical prices for tokens. + + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + start_timestamp: Unix timestamp of earliest data point (optional) + end_timestamp: Unix timestamp of latest data point (optional) + span: Number of data points returned, defaults to 0 (all available) + period: Duration between data points (e.g., '1d', '1h', '1w') + search_width: Time range on either side to find price data + session: Optional requests session + time_chunk_days: Number of days per API call chunk (default: 7) + + Returns: + DefiLlamaTokenPrices instance with historical price data + """ + session = session or new_session() + + # If we have a span and it's large, or if we have a large time range, chunk the requests + should_chunk = False + if start_timestamp and end_timestamp: + days_range = (end_timestamp - start_timestamp) / (24 * 3600) + should_chunk = days_range > time_chunk_days + elif span > time_chunk_days: + should_chunk = True + + if should_chunk: + log.info( + "chunking_historical_request", + token_count=len(token_ids), + time_chunk_days=time_chunk_days, + span=span, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + ) + + return cls._fetch_prices_historical_chunked( + token_ids=token_ids, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + 
span=span, + period=period, + search_width=search_width, + session=session, + time_chunk_days=time_chunk_days, + ) + + # For small requests, use the original single-call approach + return cls._fetch_prices_historical_single( + token_ids=token_ids, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + span=span, + period=period, + search_width=search_width, + session=session, + ) + + @classmethod + def _fetch_prices_historical_single( + cls, + token_ids: list[str], + start_timestamp: int | None = None, + end_timestamp: int | None = None, + span: int = 0, + period: str = "1d", + search_width: str | None = None, + session: requests.Session | None = None, + ) -> "DefiLlamaTokenPrices": + """Fetch historical prices for tokens in a single API call (original implementation).""" + session = session or new_session() + api_key = env_get("DEFILLAMA_API_KEY") + + # Format token IDs for DeFiLlama + formatted_tokens = [format_token_id_for_defillama(tid) for tid in token_ids] + tokens_str = ",".join(formatted_tokens) + + # Build URL with parameters + url = COINS_CHART_ENDPOINT.format(api_key=api_key, coins=tokens_str) + + params: dict[str, int | str] = {} + if start_timestamp is not None: + params["start"] = start_timestamp + # Only include end_timestamp if start_timestamp is not provided + # DeFiLlama API doesn't support both start and end parameters together + if end_timestamp is not None and start_timestamp is None: + params["end"] = end_timestamp + if span > 0: + params["span"] = span + if period != "1d": + params["period"] = period + if search_width is not None: + params["searchWidth"] = search_width + + try: + response = safe_get_data( + session, + url, + params=params, + retry_attempts=3, + emit_log=False, # Don't emit logs because the key is in the URL + ) + + log.info("fetched_historical_prices_single", token_count=len(token_ids), params=params) + + # Parse response into records + records = [] + + if "coins" in response: + for formatted_token_id, price_data 
in response["coins"].items(): + if price_data and "prices" in price_data: + # Map back from formatted token ID to original token ID + original_token_id = formatted_token_id + if formatted_token_id.startswith("coingecko:"): + original_token_id = formatted_token_id[ + 10: + ] # Remove "coingecko:" prefix + + for price_point in price_data["prices"]: + if ( + price_point + and "timestamp" in price_point + and "price" in price_point + ): + records.append( + { + "token_id": original_token_id, + "dt": dt_fromepoch(price_point["timestamp"]), + "price_usd": float(price_point["price"]), + "last_updated": datetime.now().isoformat(), + } + ) + + df = pl.DataFrame(records, schema=TOKEN_PRICES_SCHEMA, strict=False) + return cls(df=df) + + except Exception as e: + log.error( + "failed_to_fetch_historical_prices_single", error=mask_api_key_in_text(str(e)) + ) + return cls(df=pl.DataFrame([], schema=TOKEN_PRICES_SCHEMA)) + + @classmethod + def _fetch_prices_historical_chunked( + cls, + token_ids: list[str], + start_timestamp: int | None = None, + end_timestamp: int | None = None, + span: int = 0, + period: str = "1d", + search_width: str | None = None, + session: requests.Session | None = None, + time_chunk_days: int = 7, + ) -> "DefiLlamaTokenPrices": + """Fetch historical prices for tokens using time-based chunking.""" + session = session or new_session() + + # Calculate the actual time range to chunk + span_days: int = int(span) if span > 0 else 0 + if start_timestamp and end_timestamp: + # Use provided timestamps + actual_start = start_timestamp + actual_end = end_timestamp + total_days = (actual_end - actual_start) / (24 * 3600) + elif start_timestamp and span > 0: + # Start timestamp + span days + actual_start = start_timestamp + actual_end = start_timestamp + (span_days * 24 * 3600) + total_days = float(span_days) + elif end_timestamp and span > 0: + # End timestamp - span days + actual_start = end_timestamp - (span_days * 24 * 3600) + actual_end = end_timestamp + total_days 
= float(span_days) + else: + # Default to last 30 days if no clear range + actual_end = int(datetime.now().timestamp()) + actual_start = actual_end - (30 * 24 * 3600) + total_days = 30 + + # Create time chunks using only start timestamps + time_chunks = [] + + current_start = actual_start + remaining_days = total_days + + while remaining_days > 0: + # For each chunk, we'll use start timestamp + span (days to fetch) + chunk_days = min(time_chunk_days, remaining_days) + time_chunks.append((current_start, chunk_days)) + + # Move to next chunk: add chunk_days + 1 day buffer to avoid overlap issues + current_start += int((chunk_days + 1) * 24 * 3600) + remaining_days -= chunk_days + + log.info( + "processing_time_chunks", + total_chunks=len(time_chunks), + chunk_size_days=time_chunk_days, + total_days_requested=total_days, + ) + + # Fetch data for each time chunk + all_records = [] + for chunk_num, (chunk_start, chunk_span) in enumerate(time_chunks, 1): + log.info( + "fetching_time_chunk", + chunk=chunk_num, + total_chunks=len(time_chunks), + chunk_start=dt_fromepoch(chunk_start), + chunk_span_days=chunk_span, + ) + + try: + # Use only start timestamp and span for this chunk + chunk_data = cls._fetch_prices_historical_single( + token_ids=token_ids, + start_timestamp=chunk_start, + end_timestamp=None, # Don't use end timestamp + span=int(chunk_span), + period=period, + search_width=search_width, + session=session, + ) + + if not chunk_data.is_empty(): + # Convert to records to combine later + chunk_records = chunk_data.df.to_dicts() + all_records.extend(chunk_records) + + log.info( + "completed_time_chunk", + chunk=chunk_num, + total_chunks=len(time_chunks), + prices_fetched=len(chunk_data.df) if not chunk_data.is_empty() else 0, + ) + + except Exception as e: + log.error( + "failed_time_chunk", + chunk=chunk_num, + total_chunks=len(time_chunks), + error=mask_api_key_in_text(str(e)), + ) + continue + + log.info("completed_chunked_historical_fetch", 
total_prices=len(all_records)) + + # Combine all records and remove duplicates + if all_records: + df = pl.DataFrame(all_records, schema=TOKEN_PRICES_SCHEMA, strict=False) + # Remove duplicates based on token_id and dt + df = df.unique(subset=["token_id", "dt"]) + # Sort by token_id and dt + df = df.sort(["token_id", "dt"]) + + # Filter to the original requested time range to remove excess data from buffer + if start_timestamp and end_timestamp: + start_date = dt_fromepoch(start_timestamp) + end_date = dt_fromepoch(end_timestamp) + df = df.filter((pl.col("dt") >= start_date) & (pl.col("dt") <= end_date)) + else: + df = pl.DataFrame([], schema=TOKEN_PRICES_SCHEMA) + + return cls(df=df) + + @classmethod + def fetch_prices_historical_with_progress( + cls, + token_ids: list[str], + start_timestamp: int | None = None, + end_timestamp: int | None = None, + span: int = 0, + period: str = "1d", + search_width: str | None = None, + session: requests.Session | None = None, + batch_size: int = 50, + time_chunk_days: int = 7, + ) -> "DefiLlamaTokenPrices": + """ + Fetch historical prices for tokens with progress logging and time-based chunking. 
+ + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + start_timestamp: Unix timestamp of earliest data point (optional) + end_timestamp: Unix timestamp of latest data point (optional) + span: Number of data points returned, defaults to 0 (all available) + period: Duration between data points (e.g., '1d', '1h', '1w') + search_width: Time range on either side to find price data + session: Optional requests session + batch_size: Number of tokens to process per batch + time_chunk_days: Number of days per API call chunk (default: 7) + + Returns: + DefiLlamaTokenPrices instance with historical price data + """ + session = session or new_session() + + # Determine if we need time chunking + should_chunk = False + if start_timestamp and end_timestamp: + days_range = (end_timestamp - start_timestamp) / (24 * 3600) + should_chunk = days_range > time_chunk_days + elif span > time_chunk_days: + should_chunk = True + + if should_chunk: + log.info( + "using_time_chunking_with_progress", + token_count=len(token_ids), + time_chunk_days=time_chunk_days, + batch_size=batch_size, + ) + + return cls._fetch_prices_historical_chunked_with_progress( + token_ids=token_ids, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + span=span, + period=period, + search_width=search_width, + session=session, + batch_size=batch_size, + time_chunk_days=time_chunk_days, + ) + + # For small requests, use the original batch-based approach + return cls._fetch_prices_historical_batched_only( + token_ids=token_ids, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + span=span, + period=period, + search_width=search_width, + session=session, + batch_size=batch_size, + ) + + @classmethod + def _fetch_prices_historical_batched_only( + cls, + token_ids: list[str], + start_timestamp: int | None = None, + end_timestamp: int | None = None, + span: int = 0, + period: str = "1d", + search_width: str | None = None, + session: requests.Session | None = 
None, + batch_size: int = 50, + ) -> "DefiLlamaTokenPrices": + """Fetch historical prices using only token batching (original progress implementation).""" + session = session or new_session() + api_key = env_get("DEFILLAMA_API_KEY") + + all_records = [] + total_tokens = len(token_ids) + + # Process tokens in batches + for i in range(0, total_tokens, batch_size): + batch_tokens = token_ids[i : i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (total_tokens + batch_size - 1) // batch_size + + log.info( + "fetching_historical_prices_batch", + batch=batch_num, + total_batches=total_batches, + tokens_in_batch=len(batch_tokens), + progress=f"{min(i + batch_size, total_tokens)}/{total_tokens}", + ) + + # Format token IDs for DeFiLlama + formatted_tokens = [format_token_id_for_defillama(tid) for tid in batch_tokens] + tokens_str = ",".join(formatted_tokens) + + # Build URL with parameters + url = COINS_CHART_ENDPOINT.format(api_key=api_key, coins=tokens_str) + + params: dict[str, int | str] = {} + if start_timestamp is not None: + params["start"] = start_timestamp + if end_timestamp is not None: + params["end"] = end_timestamp + if span > 0: + params["span"] = span + if period != "1d": + params["period"] = period + if search_width is not None: + params["searchWidth"] = search_width + + try: + response = safe_get_data( + session, + url, + params=params, + retry_attempts=3, + emit_log=False, # Don't emit logs because the key is in the URL + ) + + if "coins" in response: + for formatted_token_id, price_data in response["coins"].items(): + if price_data and "prices" in price_data: + # Map back from formatted token ID to original token ID + original_token_id = formatted_token_id + if formatted_token_id.startswith("coingecko:"): + original_token_id = formatted_token_id[ + 10: + ] # Remove "coingecko:" prefix + + for price_point in price_data["prices"]: + if ( + price_point + and "timestamp" in price_point + and "price" in price_point + ): + 
all_records.append( + { + "token_id": original_token_id, + "dt": dt_fromepoch(price_point["timestamp"]), + "price_usd": float(price_point["price"]), + "last_updated": datetime.now().isoformat(), + } + ) + + log.info( + "completed_historical_prices_batch", + batch=batch_num, + total_batches=total_batches, + prices_fetched=len(response.get("coins", {})), + ) + + except Exception as e: + log.error( + "failed_to_fetch_historical_prices_batch", + batch=batch_num, + total_batches=total_batches, + error=mask_api_key_in_text(str(e)), + ) + continue + + log.info("completed_historical_prices_fetch", total_prices=len(all_records)) + + df = pl.DataFrame(all_records, schema=TOKEN_PRICES_SCHEMA, strict=False) + return cls(df=df) + + @classmethod + def _fetch_prices_historical_chunked_with_progress( + cls, + token_ids: list[str], + start_timestamp: int | None = None, + end_timestamp: int | None = None, + span: int = 0, + period: str = "1d", + search_width: str | None = None, + session: requests.Session | None = None, + batch_size: int = 50, + time_chunk_days: int = 7, + ) -> "DefiLlamaTokenPrices": + """Fetch historical prices using both time chunking and token batching with progress logging.""" + session = session or new_session() + + # Calculate the actual time range to chunk + span_days: int = int(span) if span > 0 else 0 + if start_timestamp and end_timestamp: + actual_start = start_timestamp + actual_end = end_timestamp + total_days = (actual_end - actual_start) / (24 * 3600) + elif start_timestamp and span > 0: + actual_start = start_timestamp + actual_end = start_timestamp + (span_days * 24 * 3600) + total_days = float(span_days) + elif end_timestamp and span > 0: + actual_start = end_timestamp - (span_days * 24 * 3600) + actual_end = end_timestamp + total_days = float(span_days) + else: + actual_end = int(datetime.now().timestamp()) + actual_start = actual_end - (30 * 24 * 3600) + total_days = 30 + + # Create time chunks using only start timestamps + time_chunks = [] + + 
current_start = actual_start + remaining_days = total_days + + while remaining_days > 0: + # For each chunk, we'll use start timestamp + span (days to fetch) + chunk_days = min(time_chunk_days, remaining_days) + time_chunks.append((current_start, chunk_days)) + + # Move to next chunk: add chunk_days + 1 day buffer to avoid overlap issues + current_start += int((chunk_days + 1) * 24 * 3600) + remaining_days -= chunk_days + + log.info( + "processing_time_chunks_with_progress", + total_time_chunks=len(time_chunks), + chunk_size_days=time_chunk_days, + total_days_requested=total_days, + token_count=len(token_ids), + batch_size=batch_size, + ) + + # Fetch data for each time chunk + all_records = [] + for chunk_num, (chunk_start, chunk_span) in enumerate(time_chunks, 1): + log.info( + "processing_time_chunk_with_batches", + time_chunk=chunk_num, + total_time_chunks=len(time_chunks), + chunk_start=dt_fromepoch(chunk_start), + chunk_span_days=chunk_span, + ) + + try: + # Use batched approach for this time chunk + chunk_data = cls._fetch_prices_historical_batched_only( + token_ids=token_ids, + start_timestamp=chunk_start, + end_timestamp=None, # Don't use end timestamp + span=int(chunk_span), + period=period, + search_width=search_width, + session=session, + batch_size=batch_size, + ) + + if not chunk_data.is_empty(): + chunk_records = chunk_data.df.to_dicts() + all_records.extend(chunk_records) + + log.info( + "completed_time_chunk_with_batches", + time_chunk=chunk_num, + total_time_chunks=len(time_chunks), + prices_fetched=len(chunk_data.df) if not chunk_data.is_empty() else 0, + ) + + except Exception as e: + log.error( + "failed_time_chunk_with_batches", + time_chunk=chunk_num, + total_time_chunks=len(time_chunks), + error=mask_api_key_in_text(str(e)), + ) + continue + + log.info("completed_chunked_historical_fetch_with_progress", total_prices=len(all_records)) + + # Combine all records and remove duplicates + if all_records: + df = pl.DataFrame(all_records, 
schema=TOKEN_PRICES_SCHEMA, strict=False) + # Remove duplicates based on token_id and dt + df = df.unique(subset=["token_id", "dt"]) + # Sort by token_id and dt + df = df.sort(["token_id", "dt"]) + + # Filter to the original requested time range to remove excess data from buffer + if start_timestamp and end_timestamp: + start_date = dt_fromepoch(start_timestamp) + end_date = dt_fromepoch(end_timestamp) + df = df.filter((pl.col("dt") >= start_date) & (pl.col("dt") <= end_date)) + else: + df = pl.DataFrame([], schema=TOKEN_PRICES_SCHEMA) + + return cls(df=df) + + @classmethod + def fetch_prices_by_days( + cls, + token_ids: list[str], + days: int = 30, + session: requests.Session | None = None, + time_chunk_days: int = 7, + ) -> "DefiLlamaTokenPrices": + """Fetch historical prices for the last N days. + + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + days: Number of days of historical data to fetch + session: Optional requests session + time_chunk_days: Number of days per API call chunk (default: 7) + + Returns: + DefiLlamaTokenPrices instance with historical price data + """ + # Calculate start timestamp for N days ago + start_date = datetime.now() - timedelta(days=days) + start_timestamp = int(start_date.timestamp()) + + return cls.fetch_prices_historical( + token_ids=token_ids, + start_timestamp=start_timestamp, + span=days, # Return daily data points for the specified number of days + period="1d", + session=session, + time_chunk_days=time_chunk_days, + ) + + @classmethod + def fetch_first_prices( + cls, + token_ids: list[str], + session: requests.Session | None = None, + ) -> "DefiLlamaTokenPrices": + """Fetch the first recorded prices for tokens. 
+ + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + session: Optional requests session + + Returns: + DefiLlamaTokenPrices instance with first price data + """ + session = session or new_session() + api_key = env_get("DEFILLAMA_API_KEY") + + # Format token IDs for DeFiLlama + formatted_tokens = [format_token_id_for_defillama(tid) for tid in token_ids] + tokens_str = ",".join(formatted_tokens) + + # Use chart endpoint with early start timestamp and span=1 to get first price + url = COINS_CHART_ENDPOINT.format(api_key=api_key, coins=tokens_str) + + # Set parameters to get the earliest available data point + params = { + "start": 1, # Very early timestamp (Jan 1, 1970) + "span": 1, # Only get the first data point + } + + try: + response = safe_get_data( + session, + url, + params=params, + retry_attempts=3, + emit_log=False, # Don't emit logs because the key is in the URL + ) + + log.info("fetched_first_prices", token_count=len(token_ids)) + + # Parse response into records + records = [] + + if "coins" in response: + for formatted_token_id, price_data in response["coins"].items(): + if price_data and "prices" in price_data and price_data["prices"]: + # Map back from formatted token ID to original token ID + original_token_id = formatted_token_id + if formatted_token_id.startswith("coingecko:"): + original_token_id = formatted_token_id[ + 10: + ] # Remove "coingecko:" prefix + + # Get the first (earliest) price point + first_price = price_data["prices"][0] + if first_price and "timestamp" in first_price and "price" in first_price: + records.append( + { + "token_id": original_token_id, + "dt": dt_fromepoch(first_price["timestamp"]), + "price_usd": float(first_price["price"]), + "last_updated": datetime.now().isoformat(), + } + ) + + df = pl.DataFrame(records, schema=TOKEN_PRICES_SCHEMA, strict=False) + return cls(df=df) + + except Exception as e: + log.error("failed_to_fetch_first_prices", error=str(e)) + return cls(df=pl.DataFrame([], 
schema=TOKEN_PRICES_SCHEMA)) + + @classmethod + def fetch_prices_current_with_progress( + cls, + token_ids: list[str], + session: requests.Session | None = None, + batch_size: int = 50, + ) -> "DefiLlamaTokenPrices": + """ + Fetch current prices for tokens with progress logging. + + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + session: Optional requests session + batch_size: Number of tokens to process per batch + + Returns: + DefiLlamaTokenPrices instance with current price data + """ + session = session or new_session() + api_key = env_get("DEFILLAMA_API_KEY") + + all_records = [] + total_tokens = len(token_ids) + + # Process tokens in batches + for i in range(0, total_tokens, batch_size): + batch_tokens = token_ids[i : i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (total_tokens + batch_size - 1) // batch_size + + log.info( + "fetching_current_prices_batch", + batch=batch_num, + total_batches=total_batches, + tokens_in_batch=len(batch_tokens), + progress=f"{min(i + batch_size, total_tokens)}/{total_tokens}", + ) + + # Format token IDs for DeFiLlama + formatted_tokens = [format_token_id_for_defillama(tid) for tid in batch_tokens] + tokens_str = ",".join(formatted_tokens) + + url = COINS_PRICES_CURRENT_ENDPOINT.format(api_key=api_key, coins=tokens_str) + + try: + response = safe_get_data( + session, + url, + retry_attempts=3, + emit_log=False, # Don't emit logs because the key is in the URL + ) + + current_time = datetime.now().isoformat() + + if "coins" in response: + for formatted_token_id, price_info in response["coins"].items(): + if price_info and "price" in price_info: + # Map back from formatted token ID to original token ID + original_token_id = formatted_token_id + if formatted_token_id.startswith("coingecko:"): + original_token_id = formatted_token_id[ + 10: + ] # Remove "coingecko:" prefix + + all_records.append( + { + "token_id": original_token_id, + "dt": current_time[:10], # YYYY-MM-DD format 
+ "price_usd": float(price_info["price"]), + "last_updated": current_time, + } + ) + + log.info( + "completed_current_prices_batch", + batch=batch_num, + total_batches=total_batches, + prices_fetched=len(response.get("coins", {})), + ) + + except Exception as e: + log.error( + "failed_to_fetch_current_prices_batch", + batch=batch_num, + total_batches=total_batches, + error=mask_api_key_in_text(str(e)), + ) + continue + + log.info("completed_current_prices_fetch", total_prices=len(all_records)) + + df = pl.DataFrame(all_records, schema=TOKEN_PRICES_SCHEMA, strict=False) + return cls(df=df) + + @classmethod + def fetch_first_prices_with_progress( + cls, + token_ids: list[str], + session: requests.Session | None = None, + batch_size: int = 50, + ) -> "DefiLlamaTokenPrices": + """ + Fetch the first recorded prices for tokens with progress logging. + + Args: + token_ids: List of token IDs (coingecko slugs or chain:address format) + session: Optional requests session + batch_size: Number of tokens to process per batch + + Returns: + DefiLlamaTokenPrices instance with first price data + """ + session = session or new_session() + api_key = env_get("DEFILLAMA_API_KEY") + + all_records = [] + total_tokens = len(token_ids) + + # Process tokens in batches + for i in range(0, total_tokens, batch_size): + batch_tokens = token_ids[i : i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (total_tokens + batch_size - 1) // batch_size + + log.info( + "fetching_first_prices_batch", + batch=batch_num, + total_batches=total_batches, + tokens_in_batch=len(batch_tokens), + progress=f"{min(i + batch_size, total_tokens)}/{total_tokens}", + ) + + # Format token IDs for DeFiLlama + formatted_tokens = [format_token_id_for_defillama(tid) for tid in batch_tokens] + tokens_str = ",".join(formatted_tokens) + + # Use chart endpoint with early start timestamp and span=1 to get first price + url = COINS_CHART_ENDPOINT.format(api_key=api_key, coins=tokens_str) + + # Set parameters to 
    def is_empty(self) -> bool:
        """Check if the dataframe is empty.

        Returns True when no price rows were collected (e.g. every fetch
        failed or the API returned no data).
        """
        return self.df.is_empty()

    def __len__(self) -> int:
        """Return the number of records (price rows) held by this instance."""
        return len(self.df)