33 commits
d5e72e9  implement csv loader (dioptx, Jul 22, 2025)
6ffae5c  trim base loader (dioptx, Jul 22, 2025)
1f45e62  Implement defillama metadata loader with existing functionality (dioptx, Jul 22, 2025)
f4d892c  implement the bq chain metadata loader (dioptx, Jul 22, 2025)
1269385  Implement the l2beat metadata loader (dioptx, Jul 22, 2025)
909ee1e  implement the dune metadata loader (dioptx, Jul 22, 2025)
a236bef  Add a goldsky chain usage loader (dioptx, Jul 22, 2025)
fa25c13  add test files (dioptx, Jul 22, 2025)
d254489  Add a script to test all loaders with real data (dioptx, Jul 22, 2025)
32b941b  mypy (dioptx, Jul 22, 2025)
578240e  fix pytests (dioptx, Jul 22, 2025)
4022a6d  fix pytests (dioptx, Jul 22, 2025)
80875df  Update loaders to reflect harmonization (dioptx, Jul 24, 2025)
23881fc  add missing files (dioptx, Jul 24, 2025)
9662d92  fix mypy (dioptx, Jul 24, 2025)
f2a6b56  fix mypy (dioptx, Jul 24, 2025)
adef449  reintroduce manual chain mappings (dioptx, Jul 24, 2025)
60aca4a  refactor and simplify (dioptx, Jul 28, 2025)
9dbe108  removed redundant mappings (dioptx, Jul 28, 2025)
5a078ab  transition to a functional setup (dioptx, Jul 29, 2025)
099a801  small cleanup (dioptx, Jul 29, 2025)
0d6741c  more refactors (dioptx, Jul 29, 2025)
31ca48c  clean ups (dioptx, Jul 29, 2025)
b17b0a9  nit (dioptx, Jul 29, 2025)
29cd6c8  nit (dioptx, Jul 29, 2025)
00e7cc9  migrate to individual dagster assets (dioptx, Jul 30, 2025)
a228410  remove redundant comments (dioptx, Jul 30, 2025)
9e4cd52  remove redundant inits (dioptx, Jul 30, 2025)
cf5ba1c  fix deduplication logic (dioptx, Jul 30, 2025)
3782bb6  read from gcs and cleanup (dioptx, Jul 31, 2025)
df72c12  reformat and fix datespec (dioptx, Jul 31, 2025)
f0a9e03  upload integration tests (dioptx, Aug 1, 2025)
f98f40e  ignore integration tests (dioptx, Aug 1, 2025)
1 change: 1 addition & 0 deletions pyproject.toml
@@ -130,6 +130,7 @@ filterwarnings = [
    'ignore:Type google._upb._message:',
]
addopts = "--cov=src/ --cov-report html"
norecursedirs = "tests/integration"

[build-system]
requires = ["hatchling"]
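Review note: norecursedirs only hides tests/integration from pytest's automatic collection; assuming standard pytest behavior, a path passed explicitly on the command line is still collected. A minimal sketch of running the ignored suite on demand (the helper script is hypothetical, not part of this PR):

# run_integration_tests.py (hypothetical helper, not in this PR)
# norecursedirs stops pytest from recursing into tests/integration on its own,
# but an explicitly passed path is still collected and run.
import sys

import pytest

if __name__ == "__main__":
    # Equivalent to running: pytest tests/integration -v
    sys.exit(pytest.main(["tests/integration", "-v"]))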
27 changes: 27 additions & 0 deletions resources/manual_chain_mappings.csv
@@ -0,0 +1,27 @@
mapping_type,identifier_type,identifier_value,source_filter,target_field,new_value,conditions,start_date,end_date,description,enabled
chain_id_override,chain,molten,,chain_id,360,,,,Handle molten chain ID collision by setting to 360,true
field_override,chain,celol2,,chain_id,{chain_id}-l2,,,,Add -l2 suffix for Celo L2 transition,true
field_override,chain,celo,,chain_id,{chain_id}-l2,layer=L2,,,Add -l2 suffix for Celo chains when layer is L2,true
field_override,chain,kardia,,chain_id,24,,,,Special case: Kardia chain ID override,true
display_name_preference,display_name,arbitrum,,display_name,Arbitrum One,,,,Prefer 'Arbitrum One' over 'Arbitrum',true
display_name_preference,display_name,optimism,,display_name,OP Mainnet,,,,Prefer 'OP Mainnet' over 'Optimism',true
display_name_preference,display_name,polygon,,display_name,Polygon PoS,,,,Prefer 'Polygon PoS' over 'Polygon',true
field_override,display_name,Polygon PoS,,provider,Polygon,,,,Override provider for Polygon PoS,true
field_override,display_name,Polygon PoS,,provider_entity,Polygon: CDK,,,,Override provider entity for Polygon PoS,true
display_name_preference,display_name,Base,,display_name,Base Mainnet,,,,Prefer 'Base Mainnet' over 'Base',true
display_name_preference,display_name,Zora,,display_name,Zora Network,,,,Prefer 'Zora Network' over 'Zora',true
display_name_preference,display_name,Mode,,display_name,Mode Network,,,,Prefer 'Mode Network' over 'Mode',true
field_override,chain,ethereum,,layer,L1,,,,Set Ethereum as L1,true
field_override,chain,bitcoin,,layer,L1,,,,Set Bitcoin as L1,true
field_override,provider,OP Stack,,provider_entity,Optimism: OP Stack,,,,Set provider_entity for chains with OP Stack provider,true
field_override,chain,*mainnet*,,is_mainnet,true,,,,Set mainnet flag for chains containing 'mainnet',false
field_override,chain,*testnet*,,is_testnet,true,,,,Set testnet flag for chains containing 'testnet',false
field_override,chain_id,*,,chain_id_str,{chain_id},,,,Convert chain_id to string type,true
field_override,provider,ZK Stack,,provider_entity,zkSync: ZK Stack,,,,Set provider_entity for chains with ZK Stack provider,true
field_override,provider,Polygon,,provider_entity,Polygon: CDK,,,,Set provider_entity for chains with Polygon provider,true
field_override,provider,Arbitrum,,provider_entity,Arbitrum: Orbit,,,,Set provider_entity for chains with Arbitrum provider,true
field_override,provider,Starkware,,provider_entity,Starkware: SN Stack,,,,Set provider_entity for chains with Starkware provider,true
field_override,provider,Starknet,,provider_entity,Starkware: SN Stack,,,,Set provider_entity for chains with Starknet provider,true
field_override,provider,StarkEx,,provider_entity,Starkware: SN Stack,,,,Set provider_entity for chains with StarkEx provider,true
field_override,provider,SN Stack,,provider_entity,Starkware: SN Stack,,,,Set provider_entity for chains with SN Stack provider,true
field_override,provider,OVM,,provider_entity,Optimism: OP Stack,,,,Set provider_entity for chains with OVM provider,true
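Review note: a minimal sketch of how a chain_id_override row from this file could be applied with polars. The helper apply_chain_id_overrides and the toy input frame are illustrative assumptions, not the aggregator's actual API:

# Illustrative sketch only; the PR's aggregator may apply mappings differently.
import polars as pl


def apply_chain_id_overrides(chains: pl.DataFrame, mappings: pl.DataFrame) -> pl.DataFrame:
    # Keep only enabled chain_id_override rows, keyed by chain identifier.
    overrides = mappings.filter(
        (pl.col("mapping_type") == "chain_id_override")
        & (pl.col("identifier_type") == "chain")
        & (pl.col("enabled") == "true")
    ).select(
        pl.col("identifier_value").alias("chain"),
        pl.col("new_value").alias("chain_id_override"),
    )
    # Left-join and prefer the override value when one exists.
    return (
        chains.join(overrides, on="chain", how="left")
        .with_columns(
            pl.coalesce(pl.col("chain_id_override"), pl.col("chain_id")).alias("chain_id")
        )
        .drop("chain_id_override")
    )


# infer_schema_length=0 keeps every column as a string, so "enabled" stays "true"/"false".
mappings = pl.read_csv("resources/manual_chain_mappings.csv", infer_schema_length=0)
chains = pl.DataFrame({"chain": ["molten", "base"], "chain_id": ["13371337", "8453"]})
print(apply_chain_id_overrides(chains, mappings))  # molten's chain_id becomes "360"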
46 changes: 46 additions & 0 deletions scripts/test_real_loaders.py
@@ -0,0 +1,46 @@
import os
import polars as pl
from op_analytics.datapipeline.chains.aggregator import build_all_chains_metadata

# WARNING:
# This script accesses production data sources (BigQuery, Goldsky, DefiLlama, L2Beat, Dune).
# It requires valid credentials and may incur costs.
# To run this test, you must set the OPLABS_ENV=PROD environment variable.
# Example: OPLABS_ENV=PROD python scripts/test_real_loaders.py


def test_real_aggregator():
    """
    Tests the chain metadata aggregator with real, live data sources.
    """
    print("=== Testing Chain Metadata Aggregator with real data ===")

    # Check if the environment variable is set
    if os.getenv("OPLABS_ENV") != "PROD":
        print("Skipping test: OPLABS_ENV is not set to PROD.")
        print("Please set OPLABS_ENV=PROD to run this test.")
        return

    try:
        # Run the full aggregation pipeline
        aggregated_df = build_all_chains_metadata()

        # Basic validation
        assert aggregated_df is not None, "Aggregated DataFrame should not be None"
        assert isinstance(aggregated_df, pl.DataFrame), "Output should be a Polars DataFrame"
        assert not aggregated_df.is_empty(), "Aggregated DataFrame should not be empty"
        assert "chain_key" in aggregated_df.columns, "chain_key should be in the columns"

        print("Chain metadata aggregator test passed.")
        print("Aggregated DataFrame head:")
        print(aggregated_df.head(5))
        print(f"Total chains aggregated: {len(aggregated_df)}")

    except Exception as e:
        print(f"Chain Metadata Aggregator Test Error: {e}")
        # Re-raise the exception to make it clear that the test failed
        raise


if __name__ == "__main__":
    test_real_aggregator()
87 changes: 55 additions & 32 deletions src/op_analytics/cli/subcommands/chains/app.py
@@ -1,12 +1,12 @@
import json
import os
import json
from datetime import datetime, timedelta
from pathlib import Path

import typer
from rich import print
from typing_extensions import Annotated

import op_analytics.datapipeline.rpcs as rpcs
from op_analytics.coreutils.logger import structlog
from op_analytics.coreutils.partitioned.location import DataLocation
from op_analytics.coreutils.rangeutils.blockrange import BlockRange
@@ -29,6 +29,7 @@
)
from op_analytics.datapipeline.schemas import ONCHAIN_CURRENT_VERSION
from op_analytics.datapipeline.orchestrate import normalize_chains, normalize_blockbatch_models
from op_analytics.datapipeline import rpcs

log = structlog.get_logger()

@@ -39,6 +40,8 @@
pretty_exceptions_show_locals=False,
)

chains_app = typer.Typer(help="Chain metadata pipeline commands")


@app.command()
def health():
@@ -66,47 +69,67 @@ def get_receipts(chain: str, tx_hashes: list[str]):
    print(json.dumps(txs, indent=2))


@app.command(name="build-metadata")
def build_metadata_command(
    output_bq_table: Annotated[
        str,
        typer.Option(
            "--output-bq-table", help="Target BigQuery table name for aggregated metadata output"
        ),
    ],
    manual_mappings_file: Annotated[
        str,
        typer.Option("--manual-mappings-file", help="Path to manual mappings configuration file"),
    ],
    bq_project_id: Annotated[
        str, typer.Option("--bq-project-id", help="BigQuery project ID for data operations")
    ],
    bq_dataset_id: Annotated[
        str, typer.Option("--bq-dataset-id", help="BigQuery dataset ID for table operations")
    ],
@chains_app.command()
def build_metadata(
    output_bq_table: str = typer.Option(
        "analytics.chain_metadata",
        help="Target BigQuery table for aggregated metadata (format: dataset.table)",
    ),
    bq_project_id: str = typer.Option(
        "oplabs-tools-data", help="BigQuery project ID for data operations"
    ),
    bq_dataset_id: str = typer.Option("raw_data", help="BigQuery dataset ID for table operations"),
):
    """
    Build aggregated metadata for all chains.
    Build aggregated chain metadata from multiple sources.

    This command orchestrates the complete chain metadata aggregation pipeline,
    including data loading, preprocessing, entity resolution, deduplication,
    enrichment, and output to BigQuery.
    including data loading, preprocessing, combination, entity resolution,
    deduplication, enrichment, validation, and output to BigQuery.
    """
    print("Building chain metadata with the following parameters:")
    print(f" Output BigQuery Table: {output_bq_table}")
    print(f" Manual Mappings File: {manual_mappings_file}")
    print(f" BigQuery Project ID: {bq_project_id}")
    print(f" BigQuery Dataset ID: {bq_dataset_id}")
    print()

    # Call the aggregator function
    build_all_chains_metadata(
    log.info("Starting chain metadata aggregation pipeline")

    manual_mappings_file = "resources/manual_chain_mappings.csv"
    log.info(f"Using manual mappings from: {manual_mappings_file}")

    result_df = build_all_chains_metadata(
        output_bq_table=output_bq_table,
        manual_mappings_filepath=manual_mappings_file,
        bq_project_id=bq_project_id,
        bq_dataset_id=bq_dataset_id,
    )

    log.info(f"Chain metadata aggregation completed successfully: {result_df.height} records")


def build_metadata_command(
    output_bq_table: str,
    manual_mappings_file: str,
    bq_project_id: str,
    bq_dataset_id: str,
):
    """
    Legacy function for backward compatibility.

    Builds aggregated chain metadata from multiple sources.
    """
    log.info("Starting chain metadata aggregation pipeline (legacy)")

    # Use provided manual mappings file or fallback to hardcoded path
    mappings_file = (
        manual_mappings_file
        if Path(manual_mappings_file).exists()
        else "resources/manual_chain_mappings.csv"
    )
    log.info(f"Using manual mappings from: {mappings_file}")

    build_all_chains_metadata(
        output_bq_table=output_bq_table,
        manual_mappings_filepath=mappings_file,
        bq_project_id=bq_project_id,
        bq_dataset_id=bq_dataset_id,
    )


@app.command()
def goldsky_sql(
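Review note: the new subcommand can be smoke-tested with typer's test runner. This snippet is a hypothetical illustration, not part of the PR; it assumes chains_app is importable as defined above:

# Hypothetical smoke test for the new build-metadata subcommand.
from typer.testing import CliRunner

from op_analytics.cli.subcommands.chains.app import chains_app

runner = CliRunner()
# --help exercises argument parsing without touching BigQuery; swap in real
# options to execute the full pipeline (requires BQ credentials).
result = runner.invoke(chains_app, ["--help"])
print(result.exit_code)
print(result.output)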