diff --git a/dbt/models/eia861/core_eia861__yearly_demand_response/schema.yml b/dbt/models/eia861/core_eia861__yearly_demand_response/schema.yml index a03cdcdcea..9f4418a329 100644 --- a/dbt/models/eia861/core_eia861__yearly_demand_response/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_demand_response/schema.yml @@ -4,7 +4,10 @@ sources: tables: - name: core_eia861__yearly_demand_response data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + short_form: EXTRACT(year FROM report_date) = 2019 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_demand_response diff --git a/dbt/models/eia861/core_eia861__yearly_distribution_systems/schema.yml b/dbt/models/eia861/core_eia861__yearly_distribution_systems/schema.yml index c2d146c6fa..95a539c241 100644 --- a/dbt/models/eia861/core_eia861__yearly_distribution_systems/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_distribution_systems/schema.yml @@ -4,7 +4,10 @@ sources: tables: - name: core_eia861__yearly_distribution_systems data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + short_form: EXTRACT(year FROM report_date) = 2019 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_distribution_systems diff --git a/dbt/models/eia861/core_eia861__yearly_energy_efficiency/schema.yml b/dbt/models/eia861/core_eia861__yearly_energy_efficiency/schema.yml index b8d006f9a5..d103c0faf6 100644 --- a/dbt/models/eia861/core_eia861__yearly_energy_efficiency/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_energy_efficiency/schema.yml @@ -4,7 +4,10 @@ sources: tables: - name: core_eia861__yearly_energy_efficiency data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + short_form: EXTRACT(year FROM report_date) = 2019 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_energy_efficiency diff --git a/dbt/models/eia861/core_eia861__yearly_mergers/schema.yml b/dbt/models/eia861/core_eia861__yearly_mergers/schema.yml index b94a6e4519..92c0cecd81 100644 --- a/dbt/models/eia861/core_eia861__yearly_mergers/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_mergers/schema.yml @@ -4,7 +4,12 @@ sources: tables: - name: core_eia861__yearly_mergers data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + entity_type: EXTRACT(year FROM report_date) <= 2011 + state: EXTRACT(year FROM report_date) <= 2012 + zip_code_4: EXTRACT(year FROM report_date) <= 2010 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_mergers diff --git a/dbt/models/eia861/core_eia861__yearly_net_metering_customer_fuel_class/schema.yml b/dbt/models/eia861/core_eia861__yearly_net_metering_customer_fuel_class/schema.yml index feb35afa16..5e4666c911 100644 --- a/dbt/models/eia861/core_eia861__yearly_net_metering_customer_fuel_class/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_net_metering_customer_fuel_class/schema.yml @@ -4,7 +4,13 @@ sources: tables: - name: core_eia861__yearly_net_metering_customer_fuel_class data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + capacity_mw: EXTRACT(year FROM report_date) >= 2010 + energy_capacity_mwh: EXTRACT(year FROM report_date) >= 2023 + short_form: EXTRACT(year FROM report_date) = 2019 + sold_to_utility_mwh: EXTRACT(year FROM report_date) >= 2007 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_net_metering_customer_fuel_class diff --git a/dbt/models/eia861/core_eia861__yearly_operational_data_misc/schema.yml b/dbt/models/eia861/core_eia861__yearly_operational_data_misc/schema.yml index c03e008f81..20e4070bd1 100644 --- a/dbt/models/eia861/core_eia861__yearly_operational_data_misc/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_operational_data_misc/schema.yml @@ -4,7 +4,11 @@ sources: tables: - name: core_eia861__yearly_operational_data_misc data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + consumed_by_facility_mwh: EXTRACT(year FROM report_date) <= 2003 + short_form: EXTRACT(year FROM report_date) = 2019 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_operational_data_misc diff --git a/dbt/models/eia861/core_eia861__yearly_reliability/schema.yml b/dbt/models/eia861/core_eia861__yearly_reliability/schema.yml index d0f0633f40..cfbd21d056 100644 --- a/dbt/models/eia861/core_eia861__yearly_reliability/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_reliability/schema.yml @@ -4,7 +4,10 @@ sources: tables: - name: core_eia861__yearly_reliability data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + short_form: EXTRACT(year FROM report_date) = 2019 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_reliability diff --git a/dbt/models/eia861/core_eia861__yearly_sales/schema.yml b/dbt/models/eia861/core_eia861__yearly_sales/schema.yml index 7e101e7bde..2ade61074f 100644 --- a/dbt/models/eia861/core_eia861__yearly_sales/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_sales/schema.yml @@ -4,7 +4,10 @@ sources: tables: - name: core_eia861__yearly_sales data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + short_form: EXTRACT(year FROM report_date) = 2019 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_sales diff --git a/dbt/models/eia861/core_eia861__yearly_short_form/schema.yml b/dbt/models/eia861/core_eia861__yearly_short_form/schema.yml index dfba072b6a..d47fe22408 100644 --- a/dbt/models/eia861/core_eia861__yearly_short_form/schema.yml +++ b/dbt/models/eia861/core_eia861__yearly_short_form/schema.yml @@ -4,7 +4,11 @@ sources: tables: - name: core_eia861__yearly_short_form data_tests: - - expect_columns_not_all_null + - expect_columns_not_all_null: + arguments: + row_conditions: + entity_type: EXTRACT(year FROM report_date) NOT IN (2012, 2013, 2014) + has_green_pricing: EXTRACT(year FROM report_date) = 2012 - check_row_counts_per_partition: arguments: table_name: core_eia861__yearly_short_form diff --git a/src/pudl/extract/eia860.py b/src/pudl/extract/eia860.py index 33d47f1720..cd3dae8a90 100644 --- a/src/pudl/extract/eia860.py +++ b/src/pudl/extract/eia860.py @@ -30,7 +30,7 @@ def __init__(self, *args, **kwargs): self.cols_added = [] super().__init__(*args, **kwargs) - def process_raw(self, df, page, **partition): + def process_raw(self, df: pd.DataFrame, page: str, **partition): """Apply necessary pre-processing to the dataframe. * Rename columns based on our compiled spreadsheet metadata @@ -50,7 +50,7 @@ def process_raw(self, df, page, **partition): return df @staticmethod - def get_dtypes(page, **partition): + def get_dtypes(page: str, **partition): """Returns dtypes for plant id columns.""" return { "Plant ID": pd.Int64Dtype(), diff --git a/src/pudl/extract/eia861.py b/src/pudl/extract/eia861.py index c553637d82..72529af196 100644 --- a/src/pudl/extract/eia861.py +++ b/src/pudl/extract/eia861.py @@ -36,7 +36,7 @@ def __init__(self, *args, **kwargs): stacklevel=1, ) - def process_raw(self, df, page, **partition): + def process_raw(self, df: pd.DataFrame, page: str, **partition): """Rename columns with location.""" # for 2024 we began mapping the columns using the string names instead of # the numeric location. @@ -62,14 +62,14 @@ def process_raw(self, df, page, **partition): return df @staticmethod - def process_renamed(df, page, **partition): + def process_renamed(df: pd.DataFrame, page: str, **partition): """Adds report_year column if missing.""" if "report_year" not in df.columns: df["report_year"] = list(partition.values())[0] return df @staticmethod - def get_dtypes(page, **partition): + def get_dtypes(page: str, **partition): """Returns dtypes for plant id columns.""" return { "Plant ID": pd.Int64Dtype(), diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 807084e016..4b9e32f390 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -8,6 +8,7 @@ import pudl from pudl.extract.extractor import GenericExtractor, GenericMetadata, PartitionSelection +from pudl.workspace.datastore import Datastore logger = pudl.logging_helpers.get_logger(__name__) @@ -126,11 +127,11 @@ class ExcelExtractor(GenericExtractor): METADATA: ExcelMetadata = None - def __init__(self, ds): + def __init__(self, ds: Datastore): """Create new extractor object and load metadata. Args: - ds (datastore.Datastore): An initialized datastore, or subclass + ds: An initialized PUDL datastore, or subclass thereof. """ super().__init__(ds) self._metadata = self.METADATA diff --git a/src/pudl/output/ferc714.py b/src/pudl/output/ferc714.py index 0857dcf0bb..8ebad9f21b 100644 --- a/src/pudl/output/ferc714.py +++ b/src/pudl/output/ferc714.py @@ -47,17 +47,17 @@ The changes are applied locally to EIA 861 tables. -* `id` (int): EIA balancing authority identifier (`balancing_authority_id_eia`). -* `from` (int): Reference year, to use as a template for target years. -* `to` (List[int]): Target years, in the closed interval format [minimum, maximum]. - Rows in `core_eia861__yearly_balancing_authority` are added (if missing) for every target year +* ``id`` (int): EIA balancing authority identifier (``balancing_authority_id_eia``). +* ``from`` (int): Reference year, to use as a template for target years. +* ``to`` (List[int]): Target years, in the closed interval format [minimum, maximum]. + Rows in ``core_eia861__yearly_balancing_authority`` are added (if missing) for every target year with the attributes from the reference year. - Rows in `core_eia861__assn_balancing_authority` are added (or replaced, if existing) + Rows in ``core_eia861__assn_balancing_authority`` are added (or replaced, if existing) for every target year with the utility associations from the reference year. - Rows in `core_eia861__yearly_service_territory` are added (if missing) for every target year + Rows in ``core_eia861__yearly_service_territory`` are added (if missing) for every target year with the nearest year's associated utilities' counties. -* `exclude` (Optional[List[str]]): Utilities to exclude, by state (two-letter code). - Rows are excluded from `core_eia861__assn_balancing_authority` with target year and state. +* ``exclude`` (Optional[List[str]]): Utilities to exclude, by state (two-letter code). + Rows are excluded from ``core_eia861__assn_balancing_authority`` with target year and state. """ UTILITIES: list[dict[str, Any]] = [ @@ -76,14 +76,14 @@ The changes are applied locally to EIA 861 tables. -* `id` (int): EIA balancing authority (BA) identifier (`balancing_authority_id_eia`). - Rows for `id` are removed from `core_eia861__yearly_balancing_authority`. -* `reassign` (Optional[bool]): Whether to reassign utilities to parent BAs. - Rows for `id` as BA in `core_eia861__assn_balancing_authority` are removed. - Utilities assigned to `id` for a given year are reassigned - to the BAs for which `id` is an associated utility. -* `replace` (Optional[bool]): Whether to remove rows where `id` is a utility in - `core_eia861__assn_balancing_authority`. Applies only if `reassign=True`. +* ``id`` (int): EIA balancing authority (BA) identifier (``balancing_authority_id_eia``). + Rows for ``id`` are removed from ``core_eia861__yearly_balancing_authority``. +* ``reassign`` (Optional[bool]): Whether to reassign utilities to parent BAs. + Rows for ``id`` as BA in ``core_eia861__assn_balancing_authority`` are removed. + Utilities assigned to ``id`` for a given year are reassigned + to the BAs for which ``id`` is an associated utility. +* ``replace`` (Optional[bool]): Whether to remove rows where ``id`` is a utility in + ``core_eia861__assn_balancing_authority``. Applies only if ``reassign=True``. """ ################################################################################ @@ -186,28 +186,35 @@ def filled_core_eia861__yearly_balancing_authority( """Modified core_eia861__yearly_balancing_authority table. This function adds rows for each balancing authority-year pair missing from the - cleaned core_eia861__yearly_balancing_authority table, using a dictionary of manual fixes. It - uses the reference year as a template. The function also removes balancing - authorities that are manually categorized as utilities. + cleaned :ref:`core_eia861__yearly_balancing_authority` table, using a dictionary + of manual fixes. It uses the reference year as a template. The function also removes + balancing authorities that are manually categorized as utilities. """ df = core_eia861__yearly_balancing_authority index = ["balancing_authority_id_eia", "report_date"] dfi = df.set_index(index) # Prepare reference rows - keys = [(fix["id"], pd.Timestamp(fix["from"], 1, 1)) for fix in ASSOCIATIONS] + eia861_years = df["report_date"].dt.year.unique() + keys = [ + (fix["id"], pd.Timestamp(fix["from"], 1, 1)) + for fix in ASSOCIATIONS + if fix["from"] in eia861_years + ] refs = dfi.loc[keys].reset_index().to_dict("records") # Build table of new rows # Insert row for each target balancing authority-year pair # missing from the original table, using the reference year as a template. rows: list[dict[str, Any]] = [] - for ref, fix in zip(refs, ASSOCIATIONS, strict=True): + for ref, fix in zip( + refs, [fx for fx in ASSOCIATIONS if fx["from"] in eia861_years], strict=True + ): for year in range(fix["to"][0], fix["to"][1] + 1): key = (fix["id"], pd.Timestamp(year, 1, 1)) if key not in dfi.index: rows.append({**ref, "report_date": key[1]}) - df = pd.concat( - [df, apply_pudl_dtypes(pd.DataFrame(rows), group="eia")], axis="index" - ) + new_rows = apply_pudl_dtypes(pd.DataFrame(rows), group="eia") + new_rows = new_rows[new_rows["report_date"].dt.year.isin(eia861_years)] + df = pd.concat([df, new_rows], axis="index") # Remove balancing authorities treated as utilities mask = df["balancing_authority_id_eia"].isin([util["id"] for util in UTILITIES]) return apply_pudl_dtypes(df[~mask], group="eia") @@ -219,10 +226,10 @@ def filled_core_eia861__assn_balancing_authority( """Modified core_eia861__assn_balancing_authority table. This function adds rows for each balancing authority-year pair missing from the - cleaned core_eia861__assn_balancing_authority table, using a dictionary of manual fixes. - It uses the reference year as a template. The function also reassigns balancing - authorities that are manually categorized as utilities to their parent balancing - authorities. + cleaned :ref:`core_eia861__assn_balancing_authority` table, using a dictionary of + manual fixes. It uses the reference year as a template. The function also reassigns + balancing authorities that are manually categorized as utilities to their parent + balancing authorities. """ df = core_eia861__assn_balancing_authority # Prepare reference rows @@ -249,7 +256,10 @@ def filled_core_eia861__assn_balancing_authority( tables.append(ref.assign(report_date=key[1])) replaced |= mask # Append to original table with matching rows removed - df = pd.concat([df[~replaced], apply_pudl_dtypes(pd.concat(tables), group="eia")]) + new_rows = apply_pudl_dtypes(pd.concat(tables), group="eia") + eia861_years = df["report_date"].dt.year.unique() + new_rows = new_rows[new_rows["report_date"].dt.year.isin(eia861_years)] + df = pd.concat([df[~replaced], new_rows], axis="index") # Remove balancing authorities treated as utilities mask = np.zeros(df.shape[0], dtype=bool) tables = [] @@ -300,20 +310,22 @@ def filled_service_territory_eia861( """Modified core_eia861__yearly_service_territory table. This function adds rows for each balancing authority-year pair missing from the - cleaned core_eia861__yearly_service_territory table, using a dictionary of manual fixes. It also - drops utility-state combinations which are missing counties across all years of - data, fills records missing counties with the nearest year of county data for the - same utility and state. + cleaned :ref:`core_eia861__yearly_service_territory` table, using a dictionary of + manual fixes. It also drops utility-state combinations which are missing counties + across all years of data, fills records missing counties with the nearest year of + county data for the same utility and state. + """ index = ["utility_id_eia", "state", "report_date"] # Select relevant balancing authority-utility associations assn = filled_core_eia861__assn_balancing_authority( core_eia861__assn_balancing_authority ) + eia861_years = core_eia861__yearly_service_territory["report_date"].dt.year.unique() selected = np.zeros(assn.shape[0], dtype=bool) for fix in ASSOCIATIONS: years = [fix["from"], *range(fix["to"][0], fix["to"][1] + 1)] - dates = [pd.Timestamp(year, 1, 1) for year in years] + dates = [pd.Timestamp(year, 1, 1) for year in years if year in eia861_years] mask = assn["balancing_authority_id_eia"].eq(fix["id"]).to_numpy(bool) mask[mask] = assn["report_date"][mask].isin(dates) selected |= mask diff --git a/src/pudl/package_data/settings/etl_fast.yml b/src/pudl/package_data/settings/etl_fast.yml index 46ef7bf7c4..ac7a3f00aa 100644 --- a/src/pudl/package_data/settings/etl_fast.yml +++ b/src/pudl/package_data/settings/etl_fast.yml @@ -53,35 +53,9 @@ datasets: eia860m: year_months: ["2025-10", "2025-11"] eia861: - # eia861 runs fast. Discontinued tables break single-year ETL. - # This is a temporary hack to make the tests pass! - years: - [ - 2001, - 2002, - 2003, - 2004, - 2005, - 2006, - 2007, - 2008, - 2009, - 2010, - 2011, - 2012, - 2013, - 2014, - 2015, - 2016, - 2017, - 2018, - 2019, - 2020, - 2021, - 2022, - 2023, - 2024, - ] + # These years must overlap with FERC-714 to allow assets that depend on both to + # be materialized. + years: [2020, 2024] eia930: half_years: ["2025half2"] eiaaeo: diff --git a/src/pudl/transform/eia861.py b/src/pudl/transform/eia861.py index 5162dc9afe..39b06636e9 100644 --- a/src/pudl/transform/eia861.py +++ b/src/pudl/transform/eia861.py @@ -543,16 +543,18 @@ def _pre_process(df: pd.DataFrame, idx_cols: list[str]) -> pd.DataFrame: data is an early release, and we extract this information from the filenames, as it's uniform across the whole dataset. * Convert report_year column to report_date. - * Aggregate values for rows with utility id 88888 (anonymized) - see _combine_88888_values - for details. + * If we've gotten an empty dataframe, make sure it has a data_maturity column. """ - prep_df = ( + # If we're only processing some years of data, we may have entirely empty dataframes + # in the extraction phase, in which case the data_maturity field doesn't get added. + if df.empty: + df["data_maturity"] = pd.NA + return ( standardize_na_values(df) .drop(columns=["early_release"], errors="ignore") .pipe(convert_to_date) .pipe(_combine_88888_values, idx_cols) ) - return prep_df def _post_process(df: pd.DataFrame) -> pd.DataFrame: @@ -736,10 +738,10 @@ def _tidy_class_dfs( # of tables. data_dupe_mask = data_cols.duplicated(subset=idx_cols + [class_type], keep=False) data_dupes = data_cols[data_dupe_mask] - fraction_data_dupes = len(data_dupes) / len(data_cols) + fraction_data_dupes = len(data_dupes) / len(data_cols) if len(data_cols) else 0 denorm_dupe_mask = denorm_cols.duplicated(subset=idx_cols, keep=False) denorm_dupes = denorm_cols[denorm_dupe_mask] - fraction_denorm_dupes = len(denorm_dupes) / len(data_cols) + fraction_denorm_dupes = len(denorm_dupes) / len(data_cols) if len(data_cols) else 0 err_msg = ( f"{df_name} table: Found {len(data_dupes)}/{len(data_cols)} " f"({fraction_data_dupes:0.2%}) records with duplicated PKs. " @@ -778,7 +780,7 @@ def _drop_dupes(df, df_name, subset): logger.info( f"Dropped {tidy_nrows - deduped_nrows} duplicate records from EIA 861 " f"{df_name} table, out of a total of {tidy_nrows} records " - f"({(tidy_nrows - deduped_nrows) / tidy_nrows:.4%} of all records). " + f"({(tidy_nrows - deduped_nrows) / tidy_nrows if tidy_nrows else 0:.4%} of all records). " ) return deduped_df @@ -1009,7 +1011,11 @@ def _harvest_associations(dfs: list[pd.DataFrame], cols: list[str]) -> pd.DataFr if set(df.columns).issuperset(set(cols)): assn = pd.concat([assn, df[cols]]) assn = assn.dropna().drop_duplicates() - if assn.empty: + # If we found no associations AND any of our dfs were non-empty, we raise an error. + # We need to check for non-empty dataframes because in some cases we separately + # harvest associations for early vs. late reporting periods, and in the fast ETL + # we don't have any of the early years. + if assn.empty and any(not df.empty for df in dfs): raise ValueError( f"These dataframes contain no associations for the columns: {cols}" )