diff --git a/notebooks/adhoc/token_transfers_dev.ipynb b/notebooks/adhoc/token_transfers_dev.ipynb new file mode 100644 index 00000000000..4ed7bcfda6a --- /dev/null +++ b/notebooks/adhoc/token_transfers_dev.ipynb @@ -0,0 +1,137 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prepare data reader for a given chain and date" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from op_analytics.coreutils.duckdb_inmem import init_client\n", + "from op_analytics.coreutils.partitioned.reader import DataReader\n", + "from op_analytics.coreutils.partitioned.location import DataLocation\n", + "from op_analytics.datapipeline.etl.intermediate.construct import construct_data_readers\n", + "\n", + "from op_analytics.datapipeline.models.compute.udfs import create_duckdb_macros\n", + "\n", + "\n", + "# Define the input data range.\n", + "read_batches: list[DataReader] = construct_data_readers(\n", + " chains=[\"op\"],\n", + " models=[\"token_transfers\"],\n", + " range_spec=\"@20241030:+1\",\n", + " read_from=DataLocation.GCS\n", + ")\n", + "\n", + "\n", + "# Select input for one date and build the intermediate model inputs.\n", + "batch = read_batches[0]\n", + "\n", + "\n", + "duckdb_client = init_client()\n", + "create_duckdb_macros(duckdb_client)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Run the model\n", + "\n", + "This automatically registers the model outputs as duckdb tables." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from op_analytics.datapipeline.models.compute.testutils import execute_model_in_memory\n", + "\n", + "\n", + "execute_model_in_memory(\n", + " duckdb_client=duckdb_client,\n", + " model=\"token_transfers\",\n", + " data_reader=batch,\n", + ")\n", + "\n", + "# The duckdb database will have the following:\n", + "# - input tables\n", + "# - views used by the model\n", + "# - model outputs\n", + "#\n", + "# You can use duckdb to inspect any of the above results.\n", + "duckdb_client.sql(\"SHOW TABLES\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Verify model results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "duckdb_client.sql(\"SELECT *, value_64/1e18 AS value_native FROM native_transfers_v1 LIMIT 10\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "duckdb_client.sql(\"SELECT * FROM erc20_transfers_v1 LIMIT 10\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### You can also convert the results to dataframes to inspect them in more familiar ways" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "duckdb_client.sql(\"SELECT * FROM native_transfers_v1 LIMIT 10\").pl().head()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/src/op_analytics/datapipeline/models/code/token_transfers.py 
b/src/op_analytics/datapipeline/models/code/token_transfers.py
new file mode 100644
index 00000000000..85ee998429f
--- /dev/null
+++ b/src/op_analytics/datapipeline/models/code/token_transfers.py
@@ -0,0 +1,21 @@
+import duckdb
+
+from op_analytics.datapipeline.models.compute.querybuilder import TemplatedSQLQuery
+from op_analytics.datapipeline.models.compute.registry import register_model
+from op_analytics.datapipeline.models.compute.types import NamedRelations
+
+# We can add ERC-20/721/1155/etc outputs here as they are implemented.
+
+
+@register_model(
+    input_datasets=["ingestion/traces_v1", "ingestion/transactions_v1"],
+    expected_outputs=["native_transfers_v1"],
+    auxiliary_views=[
+        TemplatedSQLQuery(template_name="native_transfers", context={}),
+    ],
+)
+def token_transfers(duckdb_client: duckdb.DuckDBPyConnection) -> NamedRelations:
+    """Map each expected output to the auxiliary view that produces it."""
+    return {
+        "native_transfers_v1": duckdb_client.view("native_transfers"),
+    }
diff --git a/src/op_analytics/datapipeline/models/templates/native_transfers.sql.j2 b/src/op_analytics/datapipeline/models/templates/native_transfers.sql.j2
new file mode 100644
index 00000000000..69d54c35c92
--- /dev/null
+++ b/src/op_analytics/datapipeline/models/templates/native_transfers.sql.j2
@@ -0,0 +1,35 @@
+SELECT
+    tr.network
+    , tr.chain_id
+    , tr.chain
+    , tr.dt
+    , tr.block_timestamp
+    , tr.block_number
+    , tr.transaction_hash
+    , tr.transaction_index
+    , tr.from_address AS transfer_from_address
+    , tr.to_address AS transfer_to_address
+    , hexstr_method_id(tr.input) AS trace_method_id
+    , tr.value_64
+    , tr.value_lossless
+    , tr.trace_type
+    , tr.call_type
+    , tr.gas AS trace_gas_limit
+    , tr.gas_used AS trace_gas_used
+    , tr.subtraces
+    , tr.trace_address
+    , t.from_address AS tx_from_address
+    , t.to_address AS tx_to_address
+    , hexstr_method_id(t.input) AS tx_method_id
+
+FROM ingestion_traces_v1 AS tr
+INNER JOIN ingestion_transactions_v1 AS t
+    ON
+    tr.transaction_hash = t.hash
+    AND tr.block_number = t.block_number
+    AND tr.chain_id = t.chain_id
+WHERE
+    tr.call_type NOT IN ('delegatecall', 'callcode', 'staticcall')
+    AND t.receipt_status = 1
+    AND tr.status = 1
+    AND tr.value_lossless != '0'