Skip to content
Draft
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d5e72e9
implement csv loader
dioptx Jul 22, 2025
6ffae5c
trim base loader
dioptx Jul 22, 2025
1f45e62
Implement defillama metadata loader with existing functionality
dioptx Jul 22, 2025
f4d892c
implement the bq chain metadata loader
dioptx Jul 22, 2025
1269385
Implement the l2beat metadata loader
dioptx Jul 22, 2025
909ee1e
implement the dune metadata loader
dioptx Jul 22, 2025
a236bef
Add a goldsky chain usage loader
dioptx Jul 22, 2025
fa25c13
add test files
dioptx Jul 22, 2025
d254489
Add a script to test all loaders with real data
dioptx Jul 22, 2025
32b941b
mypy
dioptx Jul 22, 2025
578240e
fix pytests
dioptx Jul 22, 2025
4022a6d
fix pytests
dioptx Jul 22, 2025
80875df
Update loaders to reflect harmonization
dioptx Jul 24, 2025
23881fc
add missing files
dioptx Jul 24, 2025
9662d92
fix mypy
dioptx Jul 24, 2025
f2a6b56
fix mypy
dioptx Jul 24, 2025
adef449
reintroduce manual chain mappings
dioptx Jul 24, 2025
60aca4a
refactor and simplify
dioptx Jul 28, 2025
9dbe108
removed redundant mappings
dioptx Jul 28, 2025
5a078ab
transition to a functional setup
dioptx Jul 29, 2025
099a801
small cleanup
dioptx Jul 29, 2025
0d6741c
more refactors
dioptx Jul 29, 2025
31ca48c
clean ups
dioptx Jul 29, 2025
b17b0a9
nit
dioptx Jul 29, 2025
29cd6c8
nit
dioptx Jul 29, 2025
00e7cc9
migrate to individual dagster assets
dioptx Jul 30, 2025
a228410
remove redundant comments
dioptx Jul 30, 2025
9e4cd52
remove redundant inits
dioptx Jul 30, 2025
cf5ba1c
fix deduplication logic
dioptx Jul 30, 2025
3782bb6
read from gcs and cleanup
dioptx Jul 31, 2025
df72c12
reformat and fix datespec
dioptx Jul 31, 2025
f0a9e03
upload integration tests
dioptx Aug 1, 2025
f98f40e
ignore integration tests
dioptx Aug 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions scripts/test_real_loaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
import polars as pl
from op_analytics.datapipeline.chains.aggregator import build_all_chains_metadata

# WARNING:
# This script accesses production data sources (BigQuery, Goldsky, DefiLlama, L2Beat, Dune).
# It requires valid credentials and may incur costs.
# To run this test, you must set the OPLABS_ENV=PROD environment variable.
# Example: OPLABS_ENV=PROD python scripts/test_real_loaders.py


def test_real_aggregator():
    """Exercise the chain metadata aggregator against live, production data sources.

    Skips (returns early) unless OPLABS_ENV=PROD is set, since running the
    pipeline hits real services and may incur costs.
    """
    print("=== Testing Chain Metadata Aggregator with real data ===")

    # Guard clause: never touch production sources unless explicitly requested.
    env = os.getenv("OPLABS_ENV")
    if env != "PROD":
        print("Skipping test: OPLABS_ENV is not set to PROD.")
        print("Please set OPLABS_ENV=PROD to run this test.")
        return

    try:
        # Execute the full aggregation pipeline against live sources.
        df = build_all_chains_metadata()

        # Sanity checks on the aggregated output.
        assert df is not None, "Aggregated DataFrame should not be None"
        assert isinstance(df, pl.DataFrame), "Output should be a Polars DataFrame"
        assert not df.is_empty(), "Aggregated DataFrame should not be empty"
        assert "chain_key" in df.columns, "chain_key should be in the columns"

        print("Chain metadata aggregator test passed.")
        print("Aggregated DataFrame head:")
        print(df.head(5))
        print(f"Total chains aggregated: {len(df)}")
    except Exception as err:
        print(f"Chain Metadata Aggregator Test Error: {err}")
        # Surface the failure to the caller so the script exits non-zero.
        raise


if __name__ == "__main__":
    test_real_aggregator()
1 change: 1 addition & 0 deletions src/op_analytics/cli/subcommands/chains/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def build_metadata_command(
manual_mappings_filepath=manual_mappings_file,
bq_project_id=bq_project_id,
bq_dataset_id=bq_dataset_id,
csv_path="src/op_analytics/datapipeline/chains/resources/chain_metadata.csv",
Comment thread
dioptx marked this conversation as resolved.
Outdated
)


Expand Down
30 changes: 14 additions & 16 deletions src/op_analytics/dagster/assets/chain_metadata.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from dagster import AssetExecutionContext, Config, asset
from pydantic import Field

from op_analytics.coreutils.logger import structlog
from op_analytics.datapipeline.chains.aggregator import build_all_chains_metadata
import polars as pl

log = structlog.get_logger()
from op_analytics.datapipeline.chains.aggregator import build_all_chains_metadata


class ChainMetadataConfig(Config):
Expand All @@ -14,25 +13,24 @@ class ChainMetadataConfig(Config):
manual_mappings_filepath: str = Field(description="Path to manual mappings configuration file")
bq_project_id: str = Field(description="BigQuery project ID for data operations")
bq_dataset_id: str = Field(description="BigQuery dataset ID for table operations")
csv_path: str = Field(
default="src/op_analytics/datapipeline/chains/resources/manual_chain_mappings.csv",
description="Path to the chain metadata CSV file (base chain data)",
)


@asset
def all_chains_metadata_asset(context: AssetExecutionContext, config: ChainMetadataConfig):
log.info("Chain metadata aggregation asset started")

log.info(
"Asset configuration",
output_bq_table=config.output_bq_table,
manual_mappings_filepath=config.manual_mappings_filepath,
bq_project_id=config.bq_project_id,
bq_dataset_id=config.bq_dataset_id,
)

build_all_chains_metadata(
def all_chains_metadata_asset(
context: AssetExecutionContext, config: ChainMetadataConfig
) -> pl.DataFrame:
"""Asset that aggregates chain metadata from multiple sources."""
result_df: pl.DataFrame = build_all_chains_metadata(
output_bq_table=config.output_bq_table,
manual_mappings_filepath=config.manual_mappings_filepath,
bq_project_id=config.bq_project_id,
bq_dataset_id=config.bq_dataset_id,
csv_path=config.csv_path,
)

log.info("Chain metadata aggregation asset completed successfully")
context.log.info(f"Chain metadata aggregation completed: {result_df.height} records")
return result_df
219 changes: 56 additions & 163 deletions src/op_analytics/datapipeline/chains/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,182 +1,75 @@
"""
ChainMetadataAggregator module for op_analytics.datapipeline.chains.
ChainMetadataAggregator for op_analytics.datapipeline.chains

This module provides functionality to aggregate and process metadata from multiple chains,
performing entity resolution, deduplication, and enrichment before outputting to BigQuery.
This module orchestrates the aggregation of chain metadata from various sources.
"""

from typing import Any, Callable

import polars as pl

from op_analytics.coreutils.bigquery.write import overwrite_unpartitioned_table
from op_analytics.coreutils.logger import structlog
from op_analytics.datapipeline.chains.mapping_utils import load_manual_mappings
from op_analytics.datapipeline.chains import ingestors
from op_analytics.datapipeline.chains.mapping_utils import (
apply_mapping_rules,
load_manual_mappings,
)

log = structlog.get_logger()


def _run_ingestors(bq_project_id: str, bq_dataset_id: str, csv_path: str) -> list[pl.DataFrame]:
    """Invoke every configured metadata ingestor and collect the resulting frames.

    Sources are called in a fixed order (CSV, L2Beat, DefiLlama, Dune,
    BQ OP Stack, BQ Goldsky). Any ingestor failure propagates to the caller;
    there is no per-source error tolerance here.
    """
    sources: list[tuple[str, Callable[[], pl.DataFrame]]] = [
        ("CSV", lambda: ingestors.ingest_from_csv(csv_path)),
        ("L2Beat", ingestors.ingest_from_l2beat),
        ("DefiLlama", ingestors.ingest_from_defillama),
        ("Dune", ingestors.ingest_from_dune),
        ("BQ OP Stack", lambda: ingestors.ingest_from_bq_op_stack(bq_project_id, bq_dataset_id)),
        ("BQ Goldsky", lambda: ingestors.ingest_from_bq_goldsky(bq_project_id, bq_dataset_id)),
    ]

    def _fetch(source_name: str, loader: Callable[[], pl.DataFrame]) -> pl.DataFrame:
        # Run one ingestor and log how many records it produced.
        frame = loader()
        log.info(f"Ingested {frame.height} records from {source_name}")
        return frame

    return [_fetch(source_name, loader) for source_name, loader in sources]


def build_all_chains_metadata(
output_bq_table: str,
manual_mappings_filepath: str,
bq_project_id: str,
bq_dataset_id: str,
) -> None:
csv_path: str,
) -> pl.DataFrame:
"""
Build aggregated metadata for all chains with comprehensive processing pipeline.
Orchestrates the complete chain metadata aggregation pipeline.

This function orchestrates the complete chain metadata aggregation pipeline,
including data loading, preprocessing, combination, entity resolution, deduplication,
enrichment, validation, and output to BigQuery.

Args:
output_bq_table (str): Target BigQuery table name for aggregated metadata
manual_mappings_filepath (str): Path to manual mappings configuration file
bq_project_id (str): BigQuery project ID for data operations
bq_dataset_id (str): BigQuery dataset ID for table operations

Returns:
None
Pipeline: ingest → concat → dedupe → map → write
"""
log.info("Pipeline execution started")

log.info(
"Pipeline configuration parameters",
output_bq_table=output_bq_table,
manual_mappings_filepath=manual_mappings_filepath,
bq_project_id=bq_project_id,
bq_dataset_id=bq_dataset_id,
)
# Ingest from all sources
dataframes: list[pl.DataFrame] = _run_ingestors(bq_project_id, bq_dataset_id, csv_path)

# Load manual mappings configuration early in the pipeline
try:
manual_mappings = load_manual_mappings(manual_mappings_filepath)
log.info(
"Manual mappings loaded successfully",
mapping_count=len(manual_mappings),
filepath=manual_mappings_filepath,
)
except Exception as e:
log.error(
"Failed to load manual mappings",
error=str(e),
filepath=manual_mappings_filepath,
)
raise RuntimeError(
f"Failed to load manual mappings from {manual_mappings_filepath}: {e}"
) from e

# TODO: Implement the following pipeline steps:

# Step 1: Load Data Sources
# Intent: Load chain metadata from multiple BigQuery data sources using standardized loaders:
# - op_stack_chain_metadata from api_table_uploads (OP Labs source, source_rank=1)
# - Goldsky chain usage data from daily_aggegate_l2_chain_usage_goldsky (OP Labs source, source_rank=1)
# - L2Beat activity data from daily_l2beat_l2_activity (L2Beat source, source_rank=2)
# - L2Beat metadata extended from l2beat_metadata_extended (L2Beat source, source_rank=1.9)
# - GrowThePie activity data from daily_growthepie_l2_activity (GrowThePie source, source_rank=3)
# - Dune transaction data from dune_all_txs (Dune source, source_rank=4.5)
# - DefiLlama chain TVL data from daily_defillama_chain_tvl (DefiLlama source, source_rank=5)
# - Manual mappings file for known corrections/overrides and special case handling
# Each source loader should return standardized DataFrames with consistent core columns
# Design allows easy addition of new data sources by implementing the standardized interface
# Note: Some sources will have unique metadata fields (e.g., L2Beat stage, DA layer) that others lack
log.info("Step 1: Load Data Sources - Not yet implemented")

# Step 2: Preprocess Individual Sources
# Intent: Clean and standardize each data source individually before combining:
# - Generate chain_key column (hash of chain_name for grouping similar names)
# - Standardize chain_id column (ensure consistent format, handle known collisions)
# - Apply source-specific transformations and data type conversions
# - Normalize chain names and symbols for consistency across sources
# - Apply "best display name" selection logic (e.g., prefer "Arbitrum One" over "Arbitrum")
# - Handle special cases through repeatable functions (e.g., Celo L1->L2 transition)
# This preprocessing ensures each source is standardized before entity resolution
log.info("Step 2: Preprocess Individual Sources - Not yet implemented")

# Step 3: Combine Preprocessed Sources
# Intent: Concatenate all preprocessed source DataFrames into unified dataset:
# - Union all preprocessed source_dfs into all_sources_df using pd.concat()
# - Maintain source attribution and ranking for later prioritization
# - Log total records combined from all sources
# - Result is a comprehensive dataset ready for entity resolution
log.info("Step 3: Combine Preprocessed Sources - Not yet implemented")

# Step 4: Entity Resolution (Most Complex Step)
# Intent: Generate unified_key and apply manual mappings to resolve chain entities:
# - Create unified_key column combining chain_id, chain_key, and display_name logic
# - Apply manual mappings to correct data inconsistencies and handle edge cases
# - Use metadata file approach for special cases (e.g., Celo -l2 suffix) rather than hardcoded logic
# - Handle chain_id collisions through sophisticated matching algorithms
# - Manual mappings applied BEFORE grouping to ensure correct entity resolution
# Success of this step depends heavily on quality of preprocessing in Steps 1-2
log.info("Step 4: Entity Resolution - Starting implementation with manual mappings")

# Example of how manual mappings would be integrated:
# When Step 4 is implemented, it would include:
# combined_df = ... # Result from Step 3
#
# # Apply manual mappings to resolve special cases and data inconsistencies
# resolved_df = apply_mapping_rules(combined_df, manual_mappings)
#
# # Continue with unified_key generation and entity resolution
# resolved_df = resolved_df.with_columns([
# # Generate unified_key for entity resolution
# pl.coalesce([
# pl.col("chain_id"),
# pl.col("chain_key"),
# pl.col("display_name").str.to_lowercase().str.replace_all(r"[^\w]+", "")
# ]).alias("unified_key")
# ])

log.info("Step 4: Entity Resolution - Manual mapping integration ready")

# Step 5: Deduplication and Attribute Merging
# Intent: Group by unified_key and merge attributes using source prioritization:
# - Group records by unified_key (represents same chain entity)
# - Within each group, sort by source_rank (lower rank = higher priority)
# - Select primary record from highest priority source
# - Aggregate temporal fields: min_dt_day (min), max_dt_day (max)
# - Concatenate tracking fields: data_sources, all_chain_keys
# - Apply secondary deduplication using is_dupe logic for remaining duplicates
# - Results in one canonical record per unique chain entity
log.info("Step 5: Deduplication and Attribute Merging - Not yet implemented")

# Step 6: Field Enrichment
# Intent: Enhance deduplicated data with additional metadata fields:
# - Load op_stack_chain_metadata specifically for enrichment (not as competing source)
# - Left join with deduplicated data on chain_id using flexible matching logic
# - Coalesce/add fields: gas_token, da_layer, public_mainnet_launch_date, etc.
# - Calculate derived fields: provider_entity_w_superchain, eth_eco_l2l3, etc.
# - Alternative approach: Join back to original datasets using chain aliases for field extraction
# - Integration point for registry ingestion and manual mapping outputs
# - Apply business logic overrides using metadata file configurations
log.info("Step 6: Field Enrichment - Not yet implemented")

# Step 7: Data Quality Validation
# Intent: Perform comprehensive error checking and data quality validation:
# - Detect potential duplicates (exact matches or high similarity across columns)
# - Validate data consistency and completeness
# - Check for unexpected chain_id collisions or mapping conflicts
# - Generate data quality reports and warnings
# - Flag records requiring manual review
# - Ensure final dataset meets quality standards before output
log.info("Step 7: Data Quality Validation - Not yet implemented")

# Step 8: Output to BigQuery
# Intent: Write the final validated metadata to BigQuery with comprehensive logging:
# - Format final DataFrame according to target BigQuery schema requirements
# - Perform final data validation before writing
# - Write to specified BigQuery table using write_full_df_to_bq with if_exists='replace'
# - Log detailed success/failure status and record counts
# - Update processing metadata and create comprehensive audit logs
# - Generate summary statistics and data lineage information
log.info("Step 8: Output to BigQuery - Not yet implemented")

log.info("Pipeline execution finished")


if __name__ == "__main__":
# Example usage with enhanced manual mappings support
# In production, these would come from command line arguments or configuration
build_all_chains_metadata(
output_bq_table="aggregated_chains_metadata",
manual_mappings_filepath="src/op_analytics/datapipeline/chains/resources/manual_chain_mappings.csv",
bq_project_id="op-analytics-dev",
bq_dataset_id="chains_metadata",
# Combine and deduplicate
all_chains_df: pl.DataFrame = pl.concat(dataframes, how="vertical")
unique_df: pl.DataFrame = all_chains_df.unique(
subset=["chain_key", "source_name"], keep="first"
)
log.info(f"Aggregated {unique_df.height} unique records")

# Apply manual mappings
mapping_rules: list[dict[str, Any]] = load_manual_mappings(manual_mappings_filepath)
final_df: pl.DataFrame = apply_mapping_rules(unique_df, mapping_rules)

# Write to BigQuery
dataset: str
table_name: str
dataset, table_name = output_bq_table.split(".", 1)
overwrite_unpartitioned_table(final_df, dataset, table_name)
log.info(f"Wrote {final_df.height} records to {output_bq_table}")

return final_df
Loading