-
Notifications
You must be signed in to change notification settings - Fork 85
Chain metadata aggregator PR3 - Loaders #1706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
dioptx
wants to merge
33
commits into
main
Choose a base branch
from
chain-metadata-aggregator-pr3
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 19 commits
Commits
Show all changes
33 commits
Select commit
Hold shift + click to select a range
d5e72e9
implement csv loader
dioptx 6ffae5c
trim base loader
dioptx 1f45e62
Implement defillama metadata loader with existing functionality
dioptx f4d892c
implement the bq chain metadata loader
dioptx 1269385
Implement the l2beat metadata loader
dioptx 909ee1e
implement the dune metadata loader
dioptx a236bef
Add a goldsky chain usage loader
dioptx fa25c13
add test files
dioptx d254489
Add a script to test all loaders with real data
dioptx 32b941b
mypy
dioptx 578240e
fix pytests
dioptx 4022a6d
fix pytests
dioptx 80875df
Update loaders to reflect harmonization
dioptx 23881fc
add missing files
dioptx 9662d92
fix mypy
dioptx f2a6b56
fix mypy
dioptx adef449
reintroduce manual chain mappings
dioptx 60aca4a
refactor and simplify
dioptx 9dbe108
removed redundant mappings
dioptx 5a078ab
transition to a functional setup
dioptx 099a801
small cleanup
dioptx 0d6741c
more refactors
dioptx 31ca48c
clean ups
dioptx b17b0a9
nit
dioptx 29cd6c8
nit
dioptx 00e7cc9
migrate to individual dagster assets
dioptx a228410
remove redundant comments
dioptx 9e4cd52
remove redundant inits
dioptx cf5ba1c
fix deduplication logic
dioptx 3782bb6
read from gcs and cleanup
dioptx df72c12
reformat and fix datespec
dioptx f0a9e03
upload integration tests
dioptx f98f40e
ignore integration tests
dioptx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| import os | ||
| import polars as pl | ||
| from op_analytics.datapipeline.chains.aggregator import build_all_chains_metadata | ||
|
|
||
| # WARNING: | ||
| # This script accesses production data sources (BigQuery, Goldsky, DefiLlama, L2Beat, Dune). | ||
| # It requires valid credentials and may incur costs. | ||
| # To run this test, you must set the OPLABS_ENV=PROD environment variable. | ||
| # Example: OPLABS_ENV=PROD python scripts/test_real_loaders.py | ||
|
|
||
|
|
||
| def test_real_aggregator(): | ||
| """ | ||
| Tests the chain metadata aggregator with real, live data sources. | ||
| """ | ||
| print("=== Testing Chain Metadata Aggregator with real data ===") | ||
|
|
||
| # Check if the environment variable is set | ||
| if os.getenv("OPLABS_ENV") != "PROD": | ||
| print("Skipping test: OPLABS_ENV is not set to PROD.") | ||
| print("Please set OPLABS_ENV=PROD to run this test.") | ||
| return | ||
|
|
||
| try: | ||
| # Run the full aggregation pipeline | ||
| aggregated_df = build_all_chains_metadata() | ||
|
|
||
| # Basic validation | ||
| assert aggregated_df is not None, "Aggregated DataFrame should not be None" | ||
| assert isinstance(aggregated_df, pl.DataFrame), "Output should be a Polars DataFrame" | ||
| assert not aggregated_df.is_empty(), "Aggregated DataFrame should not be empty" | ||
| assert "chain_key" in aggregated_df.columns, "chain_key should be in the columns" | ||
|
|
||
| print("Chain metadata aggregator test passed.") | ||
| print("Aggregated DataFrame head:") | ||
| print(aggregated_df.head(5)) | ||
| print(f"Total chains aggregated: {len(aggregated_df)}") | ||
|
|
||
| except Exception as e: | ||
| print(f"Chain Metadata Aggregator Test Error: {e}") | ||
| # Re-raise the exception to make it clear that the test failed | ||
| raise | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| test_real_aggregator() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| import polars as pl | ||
| from op_analytics.coreutils.logger import structlog | ||
| from op_analytics.datapipeline.chains.schemas import ( | ||
| harmonize_to_canonical_schema, | ||
| generate_chain_key, | ||
| ) | ||
|
|
||
| log = structlog.get_logger() | ||
|
|
||
|
|
||
| class BaseChainMetadataLoader: | ||
| """Base class for chain metadata loaders.""" | ||
|
|
||
| def __init__(self, **kwargs): | ||
| for key, value in kwargs.items(): | ||
| setattr(self, key, value) | ||
|
|
||
| def run(self) -> pl.DataFrame: | ||
| """Load and process chain metadata.""" | ||
| df = self.load_data() | ||
|
|
||
| # Ensure chain_key exists | ||
| if df.height > 0 and "chain_key" not in df.columns: | ||
|
dioptx marked this conversation as resolved.
Outdated
|
||
| raise ValueError(f"Loader {self.__class__.__name__} must add 'chain_key' column") | ||
|
|
||
| return harmonize_to_canonical_schema(df) | ||
|
|
||
| def add_metadata_columns( | ||
| self, df: pl.DataFrame, chain_key_col: str, source: str, source_rank: int | ||
| ) -> pl.DataFrame: | ||
| """Helper method to safely add metadata columns, handling empty DataFrames.""" | ||
| if df.height == 0: | ||
| return df | ||
| return df.with_columns( | ||
| [ | ||
| generate_chain_key(chain_key_col), | ||
| pl.lit(source).alias("source"), | ||
| pl.lit(source_rank).alias("source_rank"), | ||
| ] | ||
| ) | ||
|
|
||
| def load_data(self) -> pl.DataFrame: | ||
| """Override this method to load data from your source.""" | ||
| raise NotImplementedError | ||
|
|
||
|
|
||
| # Simple registry | ||
| _LOADERS = {} | ||
|
|
||
|
|
||
| def register_loader(name: str, loader_class): | ||
| """Register a loader class.""" | ||
| _LOADERS[name] = loader_class | ||
|
|
||
|
|
||
| def get_loader(name: str): | ||
| """Get a loader class by name.""" | ||
| return _LOADERS.get(name) | ||
|
|
||
|
|
||
| def list_loaders(): | ||
| """List all registered loader names.""" | ||
| return list(_LOADERS.keys()) | ||
|
|
||
|
|
||
| # For backward compatibility | ||
| class LoaderRegistry: | ||
| @classmethod | ||
| def register(cls, name: str, loader_cls): | ||
| register_loader(name, loader_cls) | ||
|
|
||
| @classmethod | ||
| def get_loader(cls, name: str): | ||
| return get_loader(name) | ||
|
|
||
| @classmethod | ||
| def list_loaders(cls): | ||
| return list_loaders() | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.