diff --git a/notebooks/adhoc/clickhouse_transforms/base_fee_flows_sankey.html b/notebooks/adhoc/clickhouse_transforms/base_fee_flows_sankey.html
new file mode 100644
index 00000000000..982f0f70be2
--- /dev/null
+++ b/notebooks/adhoc/clickhouse_transforms/base_fee_flows_sankey.html
@@ -0,0 +1,3885 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/notebooks/adhoc/clickhouse_transforms/fees_sankey_prototype.ipynb b/notebooks/adhoc/clickhouse_transforms/fees_sankey_prototype.ipynb
new file mode 100644
index 00000000000..d9c71cce6ff
--- /dev/null
+++ b/notebooks/adhoc/clickhouse_transforms/fees_sankey_prototype.ipynb
@@ -0,0 +1,455 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "# Fees Sankey Diagram Prototype\n",
+ "\n",
+ "This notebook demonstrates how to generate and visualize Superchain fee flows using a Sankey diagram.\n",
+ "\n",
+ "We'll focus on **Base** with **28 days** of data as our test case.\n",
+ "\n",
+ "## Overview\n",
+ "\n",
+ "The Sankey diagram shows the hierarchical breakdown of fees:\n",
+ "- **Level 1 (Main categories):** Chain, MEV, Stablecoin, App fees (must sum to 100%)\n",
+ "- **Level 2 (Sub-breakdowns):** Revenue shares and components within each category\n",
+ "\n",
+ "## Revenue Flow Structure\n",
+ "\n",
+ "**Chain fees:** Revenue share to Optimism + Revenue to Chain + Gas costs + Remaining \n",
+ "**MEV fees:** Revenue share to App + Remaining \n",
+ "**Stablecoin fees:** Revenue share to App + Remaining \n",
+ "**App fees:** Revenue to App + Remaining \n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "ā
Imports successful!\n"
+ ]
+ }
+ ],
+ "source": [
+ "import pandas as pd\n",
+ "import plotly.graph_objects as go\n",
+ "from plotly.offline import iplot\n",
+ "import plotly.io as pio\n",
+ "\n",
+ "# Set up plotly for notebook display\n",
+ "pio.renderers.default = \"notebook\"\n",
+ "\n",
+ "# Import our fees sankey function\n",
+ "from op_analytics.transforms.fees_sankey.generate_sankey_fees_dataset import execute_pull\n",
+ "\n",
+ "print(\"ā
Imports successful!\")\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "## Generate Sankey Data\n",
+ "\n",
+ "Let's generate the fee flow data using our transform function with dry-run mode to avoid writing to databases.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\u001b[2m2025-07-15 16:43:23\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mStarting Sankey fees dataset generation\u001b[0m \u001b[36mdays\u001b[0m=\u001b[35m28\u001b[0m \u001b[36mdry_run\u001b[0m=\u001b[35mTrue\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m411\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:25\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mQuerying source data \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m417\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m \u001b[36mtable\u001b[0m=\u001b[35moplabs-tools-data.materialized_tables.daily_superchain_health_mv\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mRetrieved chains with fee data\u001b[0m \u001b[36mchains_count\u001b[0m=\u001b[35m66\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m429\u001b[0m \u001b[36mmax_rss\u001b[0m=\u001b[35m192.1\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mFee data summary \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m435\u001b[0m \u001b[36mmax_fees_usd\u001b[0m=\u001b[35mnp.float64(676014.7496697985)\u001b[0m \u001b[36mmin_fees_usd\u001b[0m=\u001b[35mnp.float64(0.003217326523783194)\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mProcessing fee flows \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m446\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mGenerated fee flow edges \u001b[0m \u001b[36medges_count\u001b[0m=\u001b[35m316\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m448\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mValidating output dataset \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m315\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mSchema validation passed \u001b[0m \u001b[36mcolumns\u001b[0m=\u001b[35m['chain_set', 'source', 'destination', 'value', 'pct_of_total_fees_usd']\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m324\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mLevel 1 percentage validation \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m331\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mAll Level 1 percentages sum to 100%\u001b[0m \u001b[36mchains_count\u001b[0m=\u001b[35m66\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m338\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mEdge type breakdown \u001b[0m \u001b[36medge_types\u001b[0m=\u001b[35m{'total_fees_usd': 115, 'total_chain_fees_usd': 114, 'total_app_fees_usd_excl_stable_mev': 82, 'total_mev_fees_usd': 4, 'total_stablecoin_fees_usd': 1}\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m342\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m \u001b[36mtotal_edges\u001b[0m=\u001b[35m316\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mSample Level 2 edges \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m350\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m \u001b[36msample_data\u001b[0m=\u001b[35m[{'chain_set': 'Bitcoin', 'source': 'total_chain_fees_usd', 'destination': 'remaining_chain_fees_usd', 'value': 676014.7496697985, 'pct_of_total_fees_usd': 96.31777761377788}, {'chain_set': 'Bitcoin', 'source': 'total_app_fees_usd_excl_stable_mev', 'destination': 'total_app_revenue_usd_excl_stable_mev', 'value': 22969.0, 'pct_of_total_fees_usd': 3.2725958051824757}, {'chain_set': 'Bitcoin', 'source': 'total_app_fees_usd_excl_stable_mev', 'destination': 'remaining_app_fees_usd_excl_stable_mev', 'value': 2875.0, 'pct_of_total_fees_usd': 0.4096265810396455}, {'chain_set': 'Base', 'source': 'total_chain_fees_usd', 'destination': 'revshare_estimated_usd', 'value': 16285.827373132564, 'pct_of_total_fees_usd': 0.6507665790775451}, {'chain_set': 'Base', 'source': 'total_chain_fees_usd', 'destination': 'chain_governor_profit_estimated_usd', 'value': 92286.35511441786, 'pct_of_total_fees_usd': 3.6876772814394227}]\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mSample output preview \u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m458\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mchain_set source destination value pct_of_total_fees_usd\n",
+ " Bitcoin total_fees_usd total_chain_fees_usd 6.760147e+05 96.317778\n",
+ " Bitcoin total_fees_usd total_app_fees_usd_excl_stable_mev 2.584400e+04 3.682222\n",
+ " Bitcoin total_chain_fees_usd remaining_chain_fees_usd 6.760147e+05 96.317778\n",
+ " Bitcoin total_app_fees_usd_excl_stable_mev total_app_revenue_usd_excl_stable_mev 2.296900e+04 3.272596\n",
+ " Bitcoin total_app_fees_usd_excl_stable_mev remaining_app_fees_usd_excl_stable_mev 2.875000e+03 0.409627\n",
+ " Base total_fees_usd total_chain_fees_usd 1.109525e+05 4.433559\n",
+ " Base total_fees_usd total_app_fees_usd_excl_stable_mev 2.391608e+06 95.566441\n",
+ " Base total_chain_fees_usd revshare_estimated_usd 1.628583e+04 0.650767\n",
+ " Base total_chain_fees_usd chain_governor_profit_estimated_usd 9.228636e+04 3.687677\n",
+ " Base total_chain_fees_usd usd_gas_costs_per_day 2.380320e+03 0.095115\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m459\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mDry run complete - no data written\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m466\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "\u001b[2m2025-07-15 16:43:26\u001b[0m [\u001b[32m\u001b[1minfo \u001b[0m] \u001b[1mSankey fees dataset generation complete\u001b[0m \u001b[36mfilename\u001b[0m=\u001b[35mgenerate_sankey_fees_dataset.py\u001b[0m \u001b[36mlineno\u001b[0m=\u001b[35m468\u001b[0m \u001b[36mmax_rss\u001b[0m=\u001b[35m192.1\u001b[0m \u001b[36mpipeline_step\u001b[0m=\u001b[35mfees_sankey\u001b[0m \u001b[36mprocess\u001b[0m=\u001b[35m46820\u001b[0m\n",
+ "Execution Summary:\n",
+ " chains_processed: 66\n",
+ " edges_generated: 316\n",
+ " level1_edges: 115\n",
+ " level2_edges: 201\n",
+ " dry_run: True\n"
+ ]
+ },
+ {
+ "ename": "KeyError",
+ "evalue": "'dataframe'",
+ "output_type": "error",
+ "traceback": [
+ "\u001b[31m---------------------------------------------------------------------------\u001b[39m",
+ "\u001b[31mKeyError\u001b[39m Traceback (most recent call last)",
+ "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 9\u001b[39m\n\u001b[32m 6\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m\"\u001b[39m\u001b[33m \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mkey\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mvalue\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m\"\u001b[39m)\n\u001b[32m 8\u001b[39m \u001b[38;5;66;03m# Get the generated DataFrame\u001b[39;00m\n\u001b[32m----> \u001b[39m\u001b[32m9\u001b[39m df = \u001b[43mresult\u001b[49m\u001b[43m[\u001b[49m\u001b[33;43m'\u001b[39;49m\u001b[33;43mdataframe\u001b[39;49m\u001b[33;43m'\u001b[39;49m\u001b[43m]\u001b[49m\n\u001b[32m 10\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m\"\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[33mš Generated \u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[38;5;28mlen\u001b[39m(df)\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m fee flow edges\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 11\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m\"\u001b[39m\u001b[33mš Total fee value: $\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mdf[\u001b[33m'\u001b[39m\u001b[33mvalue\u001b[39m\u001b[33m'\u001b[39m].sum()\u001b[38;5;132;01m:\u001b[39;00m\u001b[33m,.2f\u001b[39m\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m\"\u001b[39m)\n",
+ "\u001b[31mKeyError\u001b[39m: 'dataframe'"
+ ]
+ }
+ ],
+ "source": [
+ "# Generate the fee flow data (28 days lookback, dry-run mode)\n",
+ "result = execute_pull(days=28, dry_run=True)\n",
+ "\n",
+ "print(\"Execution Summary:\")\n",
+ "for key, value in result.items():\n",
+ " print(f\" {key}: {value}\")\n",
+ "\n",
+ "# Get the generated DataFrame\n",
+ "df = result['dataframe']\n",
+ "print(f\"\\nš Generated {len(df)} fee flow edges\")\n",
+ "print(f\"š Total fee value: ${df['value'].sum():,.2f}\")\n",
+ "\n",
+ "# Show sample of the data\n",
+ "print(\"\\nš Sample data:\")\n",
+ "df.head(10)\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "## Filter for Base\n",
+ "\n",
+ "Let's focus on Base data for our Sankey visualization test case.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Filter for Base\n",
+ "base_df = df[df['chain_set'] == 'Base'].copy()\n",
+ "\n",
+ "print(f\"š Base fee flow edges: {len(base_df)}\")\n",
+ "print(f\"š° Base total fees: ${base_df['value'].sum():,.2f}\")\n",
+ "\n",
+ "# Show breakdown by level\n",
+ "level1_df = base_df[base_df['source'] == 'Total Fees']\n",
+ "level2_df = base_df[base_df['source'] != 'Total Fees']\n",
+ "\n",
+ "print(f\"\\nš Level 1 edges (main categories): {len(level1_df)}\")\n",
+ "print(f\"š Level 2 edges (sub-breakdowns): {len(level2_df)}\")\n",
+ "\n",
+ "print(f\"\\nā
Level 1 percentage check: {level1_df['pct_of_total_fees_usd'].sum():.1f}% (should be 100%)\")\n",
+ "\n",
+ "print(\"\\nš Base data:\")\n",
+ "base_df.sort_values('value', ascending=False)\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "## Create Sankey Diagram\n",
+ "\n",
+ "Now let's create a beautiful Sankey diagram using Plotly. We'll need to:\n",
+ "1. Build node lists (all unique sources and destinations)\n",
+ "2. Create links with proper indices\n",
+ "3. Apply colors and formatting\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def create_sankey_diagram(df, title=\"Superchain Fee Flows\"):\n",
+ " \"\"\"Create a Plotly Sankey diagram from fee flow data.\"\"\"\n",
+ " \n",
+ " # Filter out zero-value flows for cleaner visualization\n",
+ " df_filtered = df[df['value'] > 0].copy()\n",
+ " \n",
+ " # Get all unique nodes (sources and destinations)\n",
+ " all_sources = df_filtered['source'].unique()\n",
+ " all_destinations = df_filtered['destination'].unique()\n",
+ " all_nodes = list(set(list(all_sources) + list(all_destinations)))\n",
+ " \n",
+ " # Create node index mapping\n",
+ " node_indices = {node: i for i, node in enumerate(all_nodes)}\n",
+ " \n",
+ " # Create source and target indices for links\n",
+ " source_indices = [node_indices[source] for source in df_filtered['source']]\n",
+ " target_indices = [node_indices[dest] for dest in df_filtered['destination']]\n",
+ " values = df_filtered['value'].tolist()\n",
+ " \n",
+ " # Define colors for different node types\n",
+ " node_colors = []\n",
+ " for node in all_nodes:\n",
+ " if node == 'Total Fees':\n",
+ " node_colors.append('#1f77b4') # Blue for total\n",
+ " elif 'fees' in node.lower():\n",
+ " node_colors.append('#ff7f0e') # Orange for fee categories\n",
+ " elif 'revenue' in node.lower() or 'optimism' in node.lower():\n",
+ " node_colors.append('#2ca02c') # Green for revenue\n",
+ " elif 'remaining' in node.lower():\n",
+ " node_colors.append('#d62728') # Red for remaining\n",
+ " elif 'gas' in node.lower():\n",
+ " node_colors.append('#9467bd') # Purple for gas costs\n",
+ " else:\n",
+ " node_colors.append('#8c564b') # Brown for others\n",
+ " \n",
+ " # Create labels with values for better readability\n",
+ " node_labels = []\n",
+ " for node in all_nodes:\n",
+ " # Calculate total inflow for this node\n",
+ " inflow = df_filtered[df_filtered['destination'] == node]['value'].sum()\n",
+ " if inflow > 0:\n",
+ " node_labels.append(f\"{node}
${inflow:,.0f}\")\n",
+ " else:\n",
+ " node_labels.append(node)\n",
+ " \n",
+ " # Create the Sankey diagram\n",
+ " fig = go.Figure(data=[go.Sankey(\n",
+ " node=dict(\n",
+ " pad=15,\n",
+ " thickness=20,\n",
+ " line=dict(color=\"black\", width=0.5),\n",
+ " label=node_labels,\n",
+ " color=node_colors\n",
+ " ),\n",
+ " link=dict(\n",
+ " source=source_indices,\n",
+ " target=target_indices,\n",
+ " value=values,\n",
+ " color=['rgba(255,127,14,0.4)' for _ in values] # Semi-transparent orange\n",
+ " )\n",
+ " )])\\n \n",
+ " fig.update_layout(\n",
+ " title_text=f\"{title}
28-day lookback ⢠Values in USD\",\n",
+ " font_size=10,\n",
+ " width=1000,\n",
+ " height=600\n",
+ " )\\n \n",
+ " return fig\n",
+ "\n",
+ "# Create and display the Sankey diagram\n",
+ "sankey_fig = create_sankey_diagram(base_df, \"Base Fee Flows\")\n",
+ "sankey_fig.show()\n"
+ ]
+ },
+ {
+ "cell_type": "raw",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "## Analysis Summary\n",
+ "\n",
+ "Let's examine the key insights from our Base fee flow analysis.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Detailed analysis of Base fee flows\n",
+ "print(\"š BASE FEE FLOW ANALYSIS\")\n",
+ "print(\"=\" * 50)\n",
+ "\n",
+ "total_fees = base_df['value'].sum()\n",
+ "print(f\"š° Total Fee Volume: ${total_fees:,.2f}\")\n",
+ "\n",
+ "print(f\"\\nš LEVEL 1 BREAKDOWN (Main Categories):\")\n",
+ "for _, row in level1_df.sort_values('value', ascending=False).iterrows():\n",
+ " print(f\" ⢠{row['destination']:20} ${row['value']:>10,.0f} ({row['pct_of_total_fees_usd']:>5.1f}%)\")\n",
+ "\n",
+ "print(f\"\\nš§ LEVEL 2 BREAKDOWN (Sub-components):\")\n",
+ "for _, row in level2_df.sort_values('value', ascending=False).iterrows():\n",
+ " print(f\" ⢠{row['source']} ā {row['destination']}\")\n",
+ " print(f\" ${row['value']:,.0f}\")\n",
+ "\n",
+ "print(f\"\\nā
VALIDATION:\")\n",
+ "print(f\" ⢠Level 1 percentages sum to: {level1_df['pct_of_total_fees_usd'].sum():.1f}%\")\n",
+ "print(f\" ⢠Total edges generated: {len(base_df)}\")\n",
+ "print(f\" ⢠Zero-value edges: {len(base_df[base_df['value'] == 0])}\")\n",
+ "\n",
+ "print(f\"\\nšÆ READY FOR VISUALIZATION!\")\n",
+ "print(f\" ⢠Data structure validated ā
\")\n",
+ "print(f\" ⢠Percentages correctly calculated ā
\")\n",
+ "print(f\" ⢠Sankey diagram rendered above ā
\")\n"
+ ]
+ },
+ {
+ "cell_type": "raw",
+ "metadata": {},
+ "source": []
+ },
+ {
+ "cell_type": "raw",
+ "metadata": {},
+ "source": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "# Fees Sankey Transform Prototype\n",
+ "\n",
+ "This notebook is for prototyping and testing the fees sankey transform that generates datasets for Sankey diagram visualization of Superchain fee flows.\n",
+ "\n",
+ "## Purpose\n",
+ "- Test fee flow logic\n",
+ "- Validate output structure\n",
+ "- Prototype new features\n",
+ "- Backfill historical data if needed\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import sys\n",
+ "import os\n",
+ "sys.path.append('../../../src')\n",
+ "\n",
+ "import pandas as pd\n",
+ "import polars as pl\n",
+ "from op_analytics.coreutils.logger import structlog\n",
+ "from op_analytics.transforms.fees_sankey.generate_sankey_fees_dataset import (\n",
+ " get_source_data, \n",
+ " process_fee_flows, \n",
+ " validate_output\n",
+ ")\n",
+ "\n",
+ "log = structlog.get_logger()\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "## Test Configuration\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Configuration\n",
+ "DAYS = 30 # Look back 30 days for testing\n",
+ "DRY_RUN = True # Don't write to databases during prototyping\n",
+ "\n",
+ "print(f\"Configuration: {DAYS} days lookback, dry_run={DRY_RUN}\")\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "vscode": {
+ "languageId": "raw"
+ }
+ },
+ "source": [
+ "## Run Transform\n",
+ "\n",
+ "Execute the fees sankey transform with dry run for testing:\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Run the transform execute_pull function for testing\n",
+ "from op_analytics.transforms.fees_sankey.generate_sankey_fees_dataset import execute_pull\n",
+ "\n",
+ "# This will run the full pipeline in dry-run mode\n",
+ "try:\n",
+ " result = execute_pull(days=DAYS, dry_run=DRY_RUN)\n",
+ " print(\"ā
Transform completed successfully!\")\n",
+ " print(f\"Summary: {result}\")\n",
+ "except Exception as e:\n",
+ " print(f\"ā Transform failed: {e}\")\n",
+ " import traceback\n",
+ " traceback.print_exc()\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": ".venv",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.12.4"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/src/op_analytics/cli/subcommands/transforms/__init__.py b/src/op_analytics/cli/subcommands/transforms/__init__.py
new file mode 100644
index 00000000000..34f275ed3c9
--- /dev/null
+++ b/src/op_analytics/cli/subcommands/transforms/__init__.py
@@ -0,0 +1,3 @@
+from .app import app
+
+__all__ = ["app"]
diff --git a/src/op_analytics/cli/subcommands/transforms/app.py b/src/op_analytics/cli/subcommands/transforms/app.py
new file mode 100644
index 00000000000..c06c727c804
--- /dev/null
+++ b/src/op_analytics/cli/subcommands/transforms/app.py
@@ -0,0 +1,32 @@
+import typer
+from typing_extensions import Annotated
+
+from op_analytics.coreutils.logger import structlog
+
+log = structlog.get_logger()
+
+app = typer.Typer(
+ help="Transform utilities for data processing.",
+ add_completion=False,
+ pretty_exceptions_show_locals=False,
+)
+
+
+@app.command(name="fees-sankey")
+def fees_sankey_command(
+ days: Annotated[int, typer.Option("--days", help="Number of days to look back")] = 90,
+ dry_run: Annotated[
+ bool, typer.Option("--dry-run", help="Don't write to databases, just validate")
+ ] = False,
+):
+ """Generate Sankey diagram dataset for Superchain fee flows."""
+ from op_analytics.transforms.fees_sankey.generate_sankey_fees_dataset import execute_pull
+
+ log.info("Starting fees Sankey transform", days=days, dry_run=dry_run)
+
+ try:
+ result = execute_pull(days=days, dry_run=dry_run)
+ log.info("Transform completed successfully", **result)
+ except Exception as e:
+ log.error("Transform failed", error=str(e))
+ raise typer.Exit(1)
diff --git a/src/op_analytics/transforms/fees_sankey/README.md b/src/op_analytics/transforms/fees_sankey/README.md
new file mode 100644
index 00000000000..46c75c17b8e
--- /dev/null
+++ b/src/op_analytics/transforms/fees_sankey/README.md
@@ -0,0 +1,81 @@
+# Fees Sankey Transform
+
+## Overview
+
+This transform generates datasets for Sankey diagram visualization of Superchain fee flow breakdowns.
+
+## Purpose
+
+The Sankey diagram shows how fees flow through the Superchain ecosystem with hierarchical breakdowns:
+
+**Level 1 (Primary Categories - sum to 100%):**
+- Chain fees (L2 execution fees)
+- MEV operator fees
+- Stablecoin issuer revenue
+- App-specific fees
+
+**Level 2 (Sub-component breakdowns):**
+- Chain fees → Revenue share to Optimism, Revenue to Chain governor, Gas costs, Remaining
+- MEV fees → Revenue share to Optimism, Remaining
+- Stablecoin fees → Revenue share to Optimism, Remaining
+- App fees → Revenue to App, Revenue share to Optimism, Remaining
+
+**Revenue Field Definitions:**
+- `revshare_*` fields = Revenue to Optimism (the Collective)
+- `*_revenue_*` fields = Revenue to the App/Protocol
+
+## Source Data
+
+- **Input**: `oplabs-tools-data.materialized_tables.daily_superchain_health_mv`
+- **Output**: BigQuery test table and ClickHouse analytics table
+
+## Schema
+
+| Column | Type | Description |
+|--------|------|-------------|
+| chain_set | String | Chain display name |
+| source | String | Source node in flow |
+| destination | String | Destination node in flow |
+| value | Float | Fee amount in USD |
+| pct_of_total_fees_usd | Float | Percentage of total fees (Level 1 only) |
+
+## Usage
+
+### Command Line
+
+```bash
+# Run with latest data (90 days lookback by default)
+uv run opdata transforms fees-sankey
+
+# Dry run to validate without writing
+uv run opdata transforms fees-sankey --dry-run
+
+# Custom lookback period
+uv run opdata transforms fees-sankey --days 30
+```
+
+### Programmatic Usage
+
+```python
+from op_analytics.transforms.fees_sankey.generate_sankey_fees_dataset import execute_pull
+
+# Execute with options
+result = execute_pull(days=90, dry_run=False)
+print(f"Processed {result['chains_processed']} chains, generated {result['edges_generated']} edges")
+```
+
+## Implementation Notes
+
+- Uses existing op-analytics utilities for BigQuery and ClickHouse writes
+- Manual execution script (not integrated with Dagster)
+- Writes to test datasets for safety
+- Comprehensive validation ensures Level 1 percentages sum to 100%
+- Returns execution summary following op-analytics patterns
+
+## Prototyping
+
+Use the notebook at `notebooks/adhoc/clickhouse_transforms/fees_sankey_prototype.ipynb` for:
+- Testing changes to fee flow logic
+- Validating with different date ranges
+- Prototyping new fee categories
+- Exporting data for Sankey visualization testing
\ No newline at end of file
diff --git a/src/op_analytics/transforms/fees_sankey/__init__.py b/src/op_analytics/transforms/fees_sankey/__init__.py
new file mode 100644
index 00000000000..9a9e7d5906f
--- /dev/null
+++ b/src/op_analytics/transforms/fees_sankey/__init__.py
@@ -0,0 +1,5 @@
+"""
+Fees Sankey Transform
+
+Generates datasets for Sankey diagram visualization of Superchain fee flows.
+"""
diff --git a/src/op_analytics/transforms/fees_sankey/generate_sankey_fees_dataset.py b/src/op_analytics/transforms/fees_sankey/generate_sankey_fees_dataset.py
new file mode 100644
index 00000000000..a078faf85d6
--- /dev/null
+++ b/src/op_analytics/transforms/fees_sankey/generate_sankey_fees_dataset.py
@@ -0,0 +1,513 @@
+#!/usr/bin/env python3
+"""
+Generate Sankey Diagram Dataset for Superchain Fee Flow Breakdowns
+
+This script reads data from daily_superchain_health_mv table, processes fee flows
+according to the specified logic, and outputs to both BigQuery and ClickHouse.
+
+Final schema:
+- chain_set (from display_name in BQ)
+- source
+- destination
+- value
+- pct_of_total_fees_usd
+
+Example usage:
+ python generate_sankey_fees_dataset.py --days 90
+"""
+
+import os
+import sys
+
+import pandas as pd
+import polars as pl
+from google.cloud import bigquery
+
+# Add src to path for imports
+sys.path.append(os.path.join(os.path.dirname(__file__), "../../../../"))
+from op_analytics.coreutils.bigquery.write import overwrite_unpartitioned_table
+from op_analytics.coreutils.clickhouse.oplabs import run_statememt_oplabs, insert_oplabs
+from op_analytics.coreutils.logger import bound_contextvars, memory_usage, structlog
+
+log = structlog.get_logger()
+
+# Constants
+BQ_SOURCE_TABLE = "oplabs-tools-data.materialized_tables.daily_superchain_health_mv"
+BQ_OUTPUT_DATASET = "test_outputs" # Use test dataset for safety
+BQ_OUTPUT_TABLE = "superchain_fees_sankey_v1"
+CH_OUTPUT_DATABASE = "analytics"
+CH_OUTPUT_TABLE = "superchain_fees_sankey_v1"
+
+
def get_source_data(days: int) -> str:
    """Build the BigQuery SQL that aggregates daily_superchain_health_mv per chain.

    The query anchors a lookback window on the latest `dt` in the source table
    (via the `date_range` CTE), sums each fee/revenue component over that window,
    and keeps only chains that accumulated positive chain fees.

    Args:
        days: Size of the lookback window, counted back from the latest date
            present in the source table.

    Returns:
        A SQL query string ready to be passed to a BigQuery client.

    Note:
        `days` is interpolated directly into the SQL; callers pass an int from
        argparse, so this is not exposed to untrusted input.
    """
    # NOTE: the string below is the query contract — keep it byte-stable.
    query = f"""
    WITH date_range AS (
        SELECT
            DATE_SUB(MAX(dt), INTERVAL {days} DAY) as start_date,
            MAX(dt) as end_date
        FROM `{BQ_SOURCE_TABLE}`
    )
    SELECT
        display_name as chain_set,

        -- Aggregate daily data over the specified period
        SUM(COALESCE(total_chain_fees_usd, 0)) as total_chain_fees_usd,
        SUM(COALESCE(revshare_estimated_usd, 0)) as revshare_estimated_usd,
        SUM(COALESCE(chain_governor_profit_estimated_usd, 0)) as chain_governor_profit_estimated_usd,
        SUM(COALESCE(usd_gas_costs_per_day, 0)) as usd_gas_costs_per_day,

        -- MEV fee components
        SUM(COALESCE(total_mev_revenue_usd, 0)) as total_mev_revenue_usd,
        SUM(COALESCE(total_mev_fees_usd, 0)) as total_mev_fees_usd,

        -- App fee components
        SUM(COALESCE(total_app_revenue_usd_excl_stable_mev, 0)) as total_app_revenue_usd_excl_stable_mev,
        SUM(COALESCE(total_app_fees_usd_excl_stable_mev, 0)) as total_app_fees_usd_excl_stable_mev,

        -- Stablecoin fee components
        SUM(COALESCE(total_stablecoin_revenue_usd, 0)) as total_stablecoin_revenue_usd,
        SUM(COALESCE(total_stablecoin_fees_usd, 0)) as total_stablecoin_fees_usd,

        -- Validation metadata
        COUNT(*) as days_of_data,
        MIN(mv.dt) as start_date,
        MAX(mv.dt) as end_date

    FROM `{BQ_SOURCE_TABLE}` mv
    CROSS JOIN date_range dr
    WHERE mv.dt >= dr.start_date
      AND mv.dt <= dr.end_date
      AND display_name IS NOT NULL
    GROUP BY display_name
    HAVING total_chain_fees_usd > 0 -- Only include chains with fees
    """
    return query
+
+
def process_fee_flows(df: pd.DataFrame) -> pd.DataFrame:
    """Turn per-chain aggregated fee data into Sankey edges.

    Fee identities implemented here:
    1. total_fees_usd = total_chain_fees_usd + total_mev_fees_usd
       + total_stablecoin_fees_usd + total_app_fees_usd_excl_stable_mev
    2. total_chain_fees_usd = revshare_estimated_usd
       + chain_governor_profit_estimated_usd + usd_gas_costs_per_day + remaining
    3. total_app_fees_usd_excl_stable_mev = total_app_revenue_usd_excl_stable_mev + remaining
    4. total_stablecoin_fees_usd = total_stablecoin_revenue_usd + remaining
    5. total_mev_fees_usd = total_mev_revenue_usd + remaining

    Edge levels:
    - Level 1 edges have source "total_fees_usd" and their percentages sum
      to 100% per chain.
    - Level 2 edges break a Level 1 category into sub-flows; their
      pct_of_total_fees_usd is still expressed against the chain's total fees
      (NOT against the parent category), so they do not sum to 100%.

    Args:
        df: One row per chain with the aggregated fee columns produced by
            ``get_source_data`` (chain_set, total_chain_fees_usd, ...).

    Returns:
        DataFrame with columns: chain_set, source, destination, value,
        pct_of_total_fees_usd. Chains with non-positive total fees are skipped.
    """
    sankey_rows: list[dict] = []

    for _, row in df.iterrows():
        chain_set = row["chain_set"]

        total_mev_fees_usd = row["total_mev_fees_usd"]
        total_stablecoin_fees_usd = row["total_stablecoin_fees_usd"]
        total_app_fees_usd = row["total_app_fees_usd_excl_stable_mev"]

        # "Remaining" buckets are clamped at zero so inconsistent source data
        # (component > total) cannot produce negative flows.
        known_chain_fees = (
            row["revshare_estimated_usd"]
            + row["chain_governor_profit_estimated_usd"]
            + row["usd_gas_costs_per_day"]
        )
        remaining_chain_fees_usd = max(0, row["total_chain_fees_usd"] - known_chain_fees)
        remaining_mev_fees_usd = max(0, total_mev_fees_usd - row["total_mev_revenue_usd"])
        remaining_stablecoin_fees_usd = max(
            0, total_stablecoin_fees_usd - row["total_stablecoin_revenue_usd"]
        )
        remaining_app_fees_usd = max(
            0, total_app_fees_usd - row["total_app_revenue_usd_excl_stable_mev"]
        )

        total_fees_usd = (
            row["total_chain_fees_usd"]
            + total_mev_fees_usd
            + total_stablecoin_fees_usd
            + total_app_fees_usd
        )
        if total_fees_usd <= 0:
            continue  # Skip chains with no fees

        # Level 1 parents, in canonical order: chain, MEV, stablecoin, app.
        # The dict doubles as the parent-value lookup for Level 2 guards.
        parent_values = {
            "total_chain_fees_usd": row["total_chain_fees_usd"],
            "total_mev_fees_usd": total_mev_fees_usd,
            "total_stablecoin_fees_usd": total_stablecoin_fees_usd,
            "total_app_fees_usd_excl_stable_mev": total_app_fees_usd,
        }

        # Level 1: total -> category (counts toward the 100% per chain).
        for destination, value in parent_values.items():
            if value > 0:
                sankey_rows.append(
                    {
                        "chain_set": chain_set,
                        "source": "total_fees_usd",
                        "destination": destination,
                        "value": value,
                        "pct_of_total_fees_usd": (value / total_fees_usd) * 100,
                    }
                )

        # Level 2: category -> sub-component, in canonical order
        # (chain breakdown, MEV, app, stablecoin).
        level2_candidates = [
            ("total_chain_fees_usd", "revshare_estimated_usd", row["revshare_estimated_usd"]),
            (
                "total_chain_fees_usd",
                "chain_governor_profit_estimated_usd",
                row["chain_governor_profit_estimated_usd"],
            ),
            ("total_chain_fees_usd", "usd_gas_costs_per_day", row["usd_gas_costs_per_day"]),
            ("total_chain_fees_usd", "remaining_chain_fees_usd", remaining_chain_fees_usd),
            ("total_mev_fees_usd", "total_mev_revenue_usd", row["total_mev_revenue_usd"]),
            ("total_mev_fees_usd", "remaining_mev_fees_usd", remaining_mev_fees_usd),
            (
                "total_app_fees_usd_excl_stable_mev",
                "total_app_revenue_usd_excl_stable_mev",
                row["total_app_revenue_usd_excl_stable_mev"],
            ),
            (
                "total_app_fees_usd_excl_stable_mev",
                "remaining_app_fees_usd_excl_stable_mev",
                remaining_app_fees_usd,
            ),
            (
                "total_stablecoin_fees_usd",
                "total_stablecoin_revenue_usd",
                row["total_stablecoin_revenue_usd"],
            ),
            (
                "total_stablecoin_fees_usd",
                "remaining_stablecoin_fees_usd",
                remaining_stablecoin_fees_usd,
            ),
        ]

        for source, destination, value in level2_candidates:
            # A sub-edge is emitted only when both the edge value and its
            # parent category are positive (a zero parent with a positive
            # component indicates inconsistent source data; drop the edge).
            if value > 0 and parent_values[source] > 0:
                sankey_rows.append(
                    {
                        "chain_set": chain_set,
                        "source": source,
                        "destination": destination,
                        "value": value,
                        # Sub-components are expressed as % of TOTAL fees,
                        # not of their parent category.
                        "pct_of_total_fees_usd": (value / total_fees_usd) * 100,
                    }
                )

    return pd.DataFrame(sankey_rows)
+
+
def validate_output(df: pd.DataFrame) -> None:
    """Check the Sankey edge dataset for schema and percentage consistency.

    Raises:
        ValueError: If any required output column is absent.

    Note:
        A Level-1 percentage mismatch is logged as an error but NOT raised —
        callers proceed regardless.
    """
    log.info("Validating output dataset")

    # Schema check: every output column must be present.
    expected = ["chain_set", "source", "destination", "value", "pct_of_total_fees_usd"]
    missing_columns = [name for name in expected if name not in df.columns]
    if missing_columns:
        log.error("Missing required columns", missing_columns=missing_columns)
        raise ValueError(f"Missing required columns: {missing_columns}")

    log.info("Schema validation passed", columns=list(df.columns))

    # Per chain, the Level 1 edges (source == total_fees_usd) should cover 100%.
    main_edges = df[df["source"] == "total_fees_usd"]
    if len(main_edges) > 0:
        pct_by_chain = main_edges.groupby("chain_set")["pct_of_total_fees_usd"].sum().round(1)

        log.info("Level 1 percentage validation")
        off_chains = pct_by_chain[pct_by_chain != 100.0]
        if len(off_chains) > 0:
            log.error(
                "Level 1 percentages don't sum to 100%", invalid_chains=off_chains.to_dict()
            )
        else:
            log.info("All Level 1 percentages sum to 100%", chains_count=len(pct_by_chain))

    # Summarize how many edges originate from each source node.
    edge_types = df["source"].value_counts().to_dict()
    log.info("Edge type breakdown", total_edges=len(df), edge_types=edge_types)

    # Surface a few sub-flow edges for eyeballing.
    sub_edges = df[df["source"] != "total_fees_usd"]
    if len(sub_edges) > 0:
        sample_edges = sub_edges.head(5)[
            ["chain_set", "source", "destination", "value", "pct_of_total_fees_usd"]
        ]
        log.info("Sample Level 2 edges", sample_data=sample_edges.to_dict("records"))
+
+
def write_to_bigquery(df: pd.DataFrame) -> None:
    """Overwrite the `<table>_latest` BigQuery table with the Sankey edges.

    Failures are logged and swallowed (best-effort write); the caller is not
    interrupted. Uses the shared op-analytics overwrite utility, which takes
    a Polars frame.
    """
    log.info(
        "Writing to BigQuery",
        dataset=BQ_OUTPUT_DATASET,
        table=f"{BQ_OUTPUT_TABLE}_latest",
        rows=len(df),
    )

    # The write utility expects Polars, not pandas.
    frame = pl.from_pandas(df)

    try:
        overwrite_unpartitioned_table(frame, BQ_OUTPUT_DATASET, f"{BQ_OUTPUT_TABLE}_latest")
    except Exception as e:
        log.error("Error writing to BigQuery", error=str(e))
    else:
        log.info("Successfully wrote to BigQuery")
+
+
def write_to_clickhouse(df: pd.DataFrame) -> None:
    """Insert the Sankey edges into ClickHouse, creating the table if needed.

    The table uses ReplacingMergeTree keyed on (chain_set, source, destination)
    so re-runs replace prior rows by created_at. Failures are logged and
    swallowed (best-effort write).
    """
    log.info(
        "Writing to ClickHouse", database=CH_OUTPUT_DATABASE, table=CH_OUTPUT_TABLE, rows=len(df)
    )

    # DDL is idempotent; safe to run on every invocation.
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {CH_OUTPUT_DATABASE}.{CH_OUTPUT_TABLE} (
        chain_set String,
        source String,
        destination String,
        value Float64,
        pct_of_total_fees_usd Float64,
        created_at DateTime DEFAULT now()
    ) ENGINE = ReplacingMergeTree(created_at)
    ORDER BY (chain_set, source, destination)
    """

    try:
        run_statememt_oplabs(create_table_sql)

        # insert_oplabs consumes Arrow; import lazily so a missing pyarrow
        # surfaces as a logged write error rather than an import-time crash.
        import pyarrow as pa

        insert_oplabs(CH_OUTPUT_DATABASE, CH_OUTPUT_TABLE, pa.Table.from_pandas(df))
    except Exception as e:
        log.error("Error writing to ClickHouse", error=str(e))
    else:
        log.info("Successfully wrote to ClickHouse")
+
+
def execute_pull(days: int = 90, dry_run: bool = False):
    """Generate the Sankey fee-flow dataset and (optionally) persist it.

    Pipeline: query aggregated per-chain fees from BigQuery, build Sankey
    edges, validate them, then write to BigQuery + ClickHouse unless dry_run.

    Args:
        days: Lookback window passed to the source query.
        dry_run: When True, validate and summarize but write nothing.

    Returns:
        Summary dict with counts per edge level plus the edges DataFrame
        (included for notebook prototyping).

    Raises:
        ValueError: If processing yields zero edges.
        Exception: Re-raised from the BigQuery source query on failure.
    """
    with bound_contextvars(pipeline_step="fees_sankey"):
        log.info("Starting Sankey fees dataset generation", days=days, dry_run=dry_run)

        bq_client = bigquery.Client()

        log.info("Querying source data", table=BQ_SOURCE_TABLE)
        sql = get_source_data(days)

        try:
            # Materialize rows manually to avoid the db-dtypes dependency
            # of query_job.to_dataframe().
            chains_df = pd.DataFrame([dict(r) for r in bq_client.query(sql).result()])

            log.info(
                "Retrieved chains with fee data",
                chains_count=len(chains_df),
                max_rss=memory_usage(),
            )
            if len(chains_df) > 0:
                log.info(
                    "Date range aggregation",
                    start_date=chains_df["start_date"].min(),
                    end_date=chains_df["end_date"].max(),
                    avg_days_per_chain=chains_df["days_of_data"].mean(),
                    requested_days=days,
                )
                log.info(
                    "Fee data summary",
                    min_fees_usd=chains_df["total_chain_fees_usd"].min(),
                    max_fees_usd=chains_df["total_chain_fees_usd"].max(),
                    total_aggregated_fees=chains_df["total_chain_fees_usd"].sum(),
                )
        except Exception as e:
            log.error("Error querying source data", error=str(e))
            raise

        log.info("Processing fee flows")
        edges_df = process_fee_flows(chains_df)
        log.info("Generated fee flow edges", edges_count=len(edges_df))

        if len(edges_df) == 0:
            log.error("No fee flow edges generated")
            raise ValueError("No fee flow edges generated")

        validate_output(edges_df)

        log.info("Sample output preview")
        log.info(edges_df.head(10).to_string(index=False))

        if dry_run:
            log.info("Dry run complete - no data written")
        else:
            write_to_bigquery(edges_df)
            write_to_clickhouse(edges_df)

        log.info("Sankey fees dataset generation complete", max_rss=memory_usage())

        # Summary dict follows the datasources execution pattern.
        return {
            "chains_processed": len(chains_df),
            "edges_generated": len(edges_df),
            "level1_edges": len(edges_df[edges_df["source"] == "total_fees_usd"]),
            "level2_edges": len(edges_df[edges_df["source"] != "total_fees_usd"]),
            "dry_run": dry_run,
            "days_requested": days,
            "dataframe": edges_df,  # Include DataFrame for prototyping/analysis
        }
+
+
if __name__ == "__main__":
    import argparse

    # CLI wrapper: parse options and run the pull once.
    cli = argparse.ArgumentParser(
        description="Generate Sankey diagram dataset for Superchain fee flows"
    )
    cli.add_argument("--days", type=int, default=90, help="Number of days to look back")
    cli.add_argument(
        "--dry-run", action="store_true", help="Don't write to databases, just validate"
    )

    opts = cli.parse_args()
    result = execute_pull(days=opts.days, dry_run=opts.dry_run)
    log.info("Execution summary", **result)