diff --git a/changelog.d/702.fixed b/changelog.d/702.fixed new file mode 100644 index 000000000..6e2bafc59 --- /dev/null +++ b/changelog.d/702.fixed @@ -0,0 +1 @@ +Aligned clone-half enhanced CPS generation with the modeled SPM pipeline by rebuilding cloned SPM thresholds deterministically and keeping zero-weight synthetic households near zero in sparse reweighting priors. diff --git a/changelog.d/703.fixed b/changelog.d/703.fixed new file mode 100644 index 000000000..b3dd0c7d0 --- /dev/null +++ b/changelog.d/703.fixed @@ -0,0 +1 @@ +Added explicit clone-origin flags to extended/enhanced CPS datasets and saved ECPS clone diagnostics for clone weight share, modeled-only-poor share, and extreme childcare/tax checks. diff --git a/changelog.d/705.fixed b/changelog.d/705.fixed new file mode 100644 index 000000000..4c60c9bff --- /dev/null +++ b/changelog.d/705.fixed @@ -0,0 +1 @@ +Use Census work-and-childcare capping inputs for clone-half SPM childcare expenses instead of donor capping shares. diff --git a/changelog.d/713.fixed b/changelog.d/713.fixed new file mode 100644 index 000000000..5fea41f40 --- /dev/null +++ b/changelog.d/713.fixed @@ -0,0 +1 @@ +Modeled Medicare Part B premiums from enrollment and premium schedules, netted a cycle-free MSP standard-premium offset, and documented the national Part B calibration target as an approximate beneficiary-paid out-of-pocket benchmark rather than gross CMS premium income. diff --git a/policyengine_us_data/calibration/calibration_utils.py b/policyengine_us_data/calibration/calibration_utils.py index 8af1bab7a..a4c489bab 100644 --- a/policyengine_us_data/calibration/calibration_utils.py +++ b/policyengine_us_data/calibration/calibration_utils.py @@ -7,10 +7,12 @@ import numpy as np import pandas as pd -from spm_calculator import SPMCalculator, spm_equivalence_scale -from spm_calculator.geoadj import calculate_geoadj_from_rent - -from policyengine_us_data.utils.spm import TENURE_CODE_MAP +from policyengine_us_data.utils.spm import ( + TENURE_CODE_MAP, + calculate_geoadj_from_rent, + get_spm_reference_thresholds, + spm_equivalence_scale, +) from policyengine_us.variables.household.demographic.geographic.state_name import ( StateName, ) @@ -491,6 +493,7 @@ def get_cd_index_mapping(db_uri: str = None): tuple: (cd_to_index dict, index_to_cd dict, cds_ordered list) """ from sqlalchemy import create_engine, text + from pathlib import Path from policyengine_us_data.storage import STORAGE_FOLDER if db_uri is None: @@ -531,7 +534,7 @@ def load_geo_labels(path) -> List[str]: def load_cd_geoadj_values( cds_to_calibrate: List[str], -) -> Dict[str, float]: +) -> Dict[str, Dict[str, float]]: """ Load geographic adjustment factors from rent data CSV. Uses median 2BR rent by CD vs national median to compute geoadj. @@ -550,11 +553,18 @@ def load_cd_geoadj_values( # Build lookup from rent data rent_lookup = {} for _, row in rent_df.iterrows(): - geoadj = calculate_geoadj_from_rent( - local_rent=row["median_2br_rent"], - national_rent=row["national_median_2br_rent"], - ) - rent_lookup[row["cd_geoid"]] = geoadj + rent_lookup[row["cd_geoid"]] = { + tenure: calculate_geoadj_from_rent( + local_rent=row["median_2br_rent"], + national_rent=row["national_median_2br_rent"], + tenure=tenure, + ) + for tenure in ( + "owner_with_mortgage", + "owner_without_mortgage", + "renter", + ) + } geoadj_dict = {} for cd in cds_to_calibrate: @@ -562,7 +572,11 @@ def load_cd_geoadj_values( geoadj_dict[cd] = rent_lookup[cd] else: print(f"Warning: No rent data for CD {cd}, using geoadj=1.0") - geoadj_dict[cd] = 1.0 + geoadj_dict[cd] = { + "owner_with_mortgage": 1.0, + "owner_without_mortgage": 1.0, + "renter": 1.0, + } return geoadj_dict @@ -593,6 +607,11 @@ def calculate_spm_thresholds_vectorized( Returns: Float32 array of SPM thresholds, one per SPM unit. """ + person_ages = np.asarray(person_ages) + person_spm_unit_ids = np.asarray(person_spm_unit_ids) + spm_unit_tenure_types = np.asarray(spm_unit_tenure_types) + spm_unit_geoadj = np.asarray(spm_unit_geoadj, dtype=np.float64) + n_units = len(spm_unit_tenure_types) # Count adults and children per SPM unit @@ -614,8 +633,7 @@ def calculate_spm_thresholds_vectorized( tenure_codes[mask] = code # Look up base thresholds - calc = SPMCalculator(year=year) - base_thresholds = calc.get_base_thresholds() + base_thresholds = get_spm_reference_thresholds(year) thresholds = np.zeros(n_units, dtype=np.float32) for i in range(n_units): diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index 68ce32cee..0429c933f 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -8,11 +8,6 @@ python publish_local_area.py [--skip-download] [--states-only] [--upload] """ -import hashlib -import json -import shutil - - import numpy as np from pathlib import Path from typing import List @@ -31,6 +26,9 @@ from policyengine_us_data.calibration.block_assignment import ( derive_geography_from_blocks, ) +from policyengine_us_data.calibration.county_assignment import ( + get_county_filter_probability, +) from policyengine_us_data.calibration.clone_and_assign import ( GeographyAssignment, assign_random_geography, @@ -45,76 +43,28 @@ CHECKPOINT_FILE_CITIES = Path("completed_cities.txt") WORK_DIR = Path("local_area_build") -NYC_COUNTY_FIPS = {"36005", "36047", "36061", "36081", "36085"} - - -META_FILE = WORK_DIR / "checkpoint_meta.json" - - -def compute_input_fingerprint( - weights_path: Path, dataset_path: Path, n_clones: int, seed: int -) -> str: - h = hashlib.sha256() - for p in [weights_path, dataset_path]: - with open(p, "rb") as f: - while chunk := f.read(8192): - h.update(chunk) - h.update(f"{n_clones}:{seed}".encode()) - return h.hexdigest()[:16] - - -def validate_or_clear_checkpoints(fingerprint: str): - if META_FILE.exists(): - stored = json.loads(META_FILE.read_text()) - if stored.get("fingerprint") == fingerprint: - print(f"Inputs unchanged ({fingerprint}), resuming...") - return - print( - f"Inputs changed " - f"({stored.get('fingerprint')} -> {fingerprint}), " - f"clearing..." - ) - else: - print(f"No checkpoint metadata, starting fresh ({fingerprint})") - h5_count = sum( - 1 - for subdir in ["states", "districts", "cities"] - if (WORK_DIR / subdir).exists() - for _ in (WORK_DIR / subdir).rglob("*.h5") - ) - if h5_count > 0: - print( - f"WARNING: {h5_count} H5 files exist. " - f"Clearing only checkpoint files, preserving H5s." - ) - for cp in [ - CHECKPOINT_FILE, - CHECKPOINT_FILE_DISTRICTS, - CHECKPOINT_FILE_CITIES, - ]: - if cp.exists(): - cp.unlink() - else: - for cp in [ - CHECKPOINT_FILE, - CHECKPOINT_FILE_DISTRICTS, - CHECKPOINT_FILE_CITIES, - ]: - if cp.exists(): - cp.unlink() - for subdir in ["states", "districts", "cities"]: - d = WORK_DIR / subdir - if d.exists(): - shutil.rmtree(d) - META_FILE.parent.mkdir(parents=True, exist_ok=True) - META_FILE.write_text(json.dumps({"fingerprint": fingerprint})) - - -SUB_ENTITIES = [ - "tax_unit", - "spm_unit", - "family", - "marital_unit", +NYC_COUNTIES = { + "QUEENS_COUNTY_NY", + "BRONX_COUNTY_NY", + "RICHMOND_COUNTY_NY", + "NEW_YORK_COUNTY_NY", + "KINGS_COUNTY_NY", +} + +NYC_CDS = [ + "3603", + "3605", + "3606", + "3607", + "3608", + "3609", + "3610", + "3611", + "3612", + "3613", + "3614", + "3615", + "3616", ] @@ -163,7 +113,7 @@ def build_h5( dataset_path: Path, output_path: Path, cd_subset: List[str] = None, - county_fips_filter: set = None, + county_filter: set = None, takeup_filter: List[str] = None, ) -> Path: """Build an H5 file by cloning records for each nonzero weight. @@ -174,8 +124,8 @@ def build_h5( dataset_path: Path to base dataset H5 file. output_path: Where to write the output H5 file. cd_subset: If provided, only include clones for these CDs. - county_fips_filter: If provided, zero out weights for clones - whose county FIPS is not in this set. + county_filter: If provided, scale weights by P(target|CD) + for city datasets. takeup_filter: List of takeup vars to apply. Returns: @@ -216,11 +166,17 @@ def build_h5( cd_mask = np.vectorize(lambda cd: cd in cd_subset_set)(clone_cds_matrix) W[~cd_mask] = 0 - # County FIPS filtering: zero out clones not in target counties - if county_fips_filter is not None: - fips_array = np.asarray(geography.county_fips).reshape(n_clones_total, n_hh) - fips_mask = np.isin(fips_array, list(county_fips_filter)) - W[~fips_mask] = 0 + # County filtering: scale weights by P(target_counties | CD) + if county_filter is not None: + unique_cds = np.unique(clone_cds_matrix) + cd_prob = { + cd: get_county_filter_probability(cd, county_filter) for cd in unique_cds + } + p_matrix = np.vectorize( + cd_prob.__getitem__, + otypes=[float], + )(clone_cds_matrix) + W *= p_matrix label = ( f"CD subset {cd_subset}" @@ -237,7 +193,7 @@ def build_h5( if n_clones == 0: raise ValueError( f"No active clones after filtering. " - f"cd_subset={cd_subset}, county_fips_filter={county_fips_filter}" + f"cd_subset={cd_subset}, county_filter={county_filter}" ) clone_weights = W[active_geo, active_hh] active_blocks = blocks.reshape(n_clones_total, n_hh)[active_geo, active_hh] @@ -258,6 +214,12 @@ def build_h5( for p_idx, p_hh_id in enumerate(person_hh_ids): hh_to_persons[hh_id_to_idx[int(p_hh_id)]].append(p_idx) + SUB_ENTITIES = [ + "tax_unit", + "spm_unit", + "family", + "marital_unit", + ] hh_to_entity = {} entity_id_arrays = {} person_entity_id_arrays = {} @@ -351,6 +313,22 @@ def build_h5( unique_geo = derive_geography_from_blocks(unique_blocks) clone_geo = {k: v[block_inv] for k, v in unique_geo.items()} + # === Calculate weights for all entity levels === + person_weights = np.repeat(clone_weights, persons_per_clone) + per_person_wt = clone_weights / np.maximum(persons_per_clone, 1) + + entity_weights = {} + for ek in SUB_ENTITIES: + n_ents = len(entity_clone_idx[ek]) + ent_person_counts = np.zeros(n_ents, dtype=np.int32) + np.add.at( + ent_person_counts, + new_person_entity_ids[ek], + 1, + ) + clone_ids_e = np.repeat(np.arange(n_clones), entities_per_clone[ek]) + entity_weights[ek] = per_person_wt[clone_ids_e] * ent_person_counts + # === Determine variables to save === vars_to_save = set(sim.input_variables) vars_to_save.add("county") @@ -400,6 +378,7 @@ def build_h5( for period in periods: values = holder.get_array(period) + # Convert Arrow-backed arrays to numpy before indexing if hasattr(values, "_pa_array") or hasattr(values, "_ndarray"): values = np.asarray(values) @@ -436,12 +415,16 @@ def build_h5( } # === Override weights === - # Only write household_weight; sub-entity weights (tax_unit_weight, - # spm_unit_weight, person_weight, etc.) are formula variables in - # policyengine-us that derive from household_weight at runtime. data["household_weight"] = { time_period: clone_weights.astype(np.float32), } + data["person_weight"] = { + time_period: person_weights.astype(np.float32), + } + for ek in SUB_ENTITIES: + data[f"{ek}_weight"] = { + time_period: entity_weights[ek].astype(np.float32), + } # === Override geography === data["state_fips"] = { @@ -487,19 +470,11 @@ def build_h5( print("Recalculating SPM thresholds...") unique_cds_list = sorted(set(active_clone_cds)) cd_geoadj_values = load_cd_geoadj_values(unique_cds_list) - # Build per-SPM-unit geoadj from clone's CD - spm_clone_ids = np.repeat( - np.arange(n_clones, dtype=np.int64), - entities_per_clone["spm_unit"], - ) - spm_unit_geoadj = np.array( - [cd_geoadj_values[active_clone_cds[c]] for c in spm_clone_ids], - dtype=np.float64, - ) - # Get cloned person ages and SPM tenure types + # Get cloned person ages and SPM unit IDs person_ages = sim.calculate("age", map_to="person").values[person_clone_idx] + # Get cloned tenure types spm_tenure_holder = sim.get_holder("spm_unit_tenure_type") spm_tenure_periods = spm_tenure_holder.get_known_periods() if spm_tenure_periods: @@ -516,6 +491,21 @@ def build_h5( dtype="S30", ) + # Build per-SPM-unit geoadj from the clone's CD and tenure. + spm_clone_ids = np.repeat( + np.arange(n_clones, dtype=np.int64), + entities_per_clone["spm_unit"], + ) + spm_unit_geoadj = np.array( + [ + cd_geoadj_values[active_clone_cds[clone_id]][ + (spm_tenure_cloned[spm_unit_index].decode().lower()) + ] + for spm_unit_index, clone_id in enumerate(spm_clone_ids) + ], + dtype=np.float64, + ) + new_spm_thresholds = calculate_spm_thresholds_vectorized( person_ages=person_ages, person_spm_unit_ids=new_person_entity_ids["spm_unit"], @@ -754,6 +744,8 @@ def build_cities( """Build city H5 files with checkpointing, optionally uploading.""" w = np.load(weights_path) + all_cds = sorted(set(geography.cd_geoid.astype(str))) + cities_dir = output_dir / "cities" cities_dir.mkdir(parents=True, exist_ok=True) @@ -763,29 +755,34 @@ def build_cities( if "NYC" in completed_cities: print("Skipping NYC (already completed)") else: - output_path = cities_dir / "NYC.h5" - - try: - build_h5( - weights=w, - geography=geography, - dataset_path=dataset_path, - output_path=output_path, - county_fips_filter=NYC_COUNTY_FIPS, - takeup_filter=takeup_filter, - ) - - if upload: - print("Uploading NYC.h5 to GCP...") - upload_local_area_file(str(output_path), "cities", skip_hf=True) - hf_queue.append((str(output_path), "cities")) - - record_completed_city("NYC") - print("Completed NYC") - - except Exception as e: - print(f"ERROR building NYC: {e}") - raise + cd_subset = [cd for cd in all_cds if cd in NYC_CDS] + if not cd_subset: + print("No NYC-related CDs found, skipping") + else: + output_path = cities_dir / "NYC.h5" + + try: + build_h5( + weights=w, + geography=geography, + dataset_path=dataset_path, + output_path=output_path, + cd_subset=cd_subset, + county_filter=NYC_COUNTIES, + takeup_filter=takeup_filter, + ) + + if upload: + print("Uploading NYC.h5 to GCP...") + upload_local_area_file(str(output_path), "cities", skip_hf=True) + hf_queue.append((str(output_path), "cities")) + + record_completed_city("NYC") + print("Completed NYC") + + except Exception as e: + print(f"ERROR building NYC: {e}") + raise if upload and hf_queue: print(f"\nUploading batch of {len(hf_queue)} city files to HuggingFace...") @@ -880,19 +877,9 @@ def main(): print(f"Using dataset: {inputs['dataset']}") - print("Computing input fingerprint...") - fingerprint = compute_input_fingerprint( - inputs["weights"], - inputs["dataset"], - args.n_clones, - args.seed, - ) - validate_or_clear_checkpoints(fingerprint) - - print("Loading base simulation to get household count...") - _sim = Microsimulation(dataset=str(inputs["dataset"])) - n_hh = len(_sim.calculate("household_id", map_to="household").values) - del _sim + sim = Microsimulation(dataset=str(inputs["dataset"])) + n_hh = sim.calculate("household_id", map_to="household").shape[0] + del sim print(f"\nBase dataset has {n_hh:,} households") geo_cache = WORK_DIR / f"geography_{n_hh}x{args.n_clones}_s{args.seed}.npz" diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index b87f846f8..a2159fa6e 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -25,6 +25,14 @@ logger = logging.getLogger(__name__) +CLONE_ORIGIN_FLAGS = { + "person": "person_is_puf_clone", + "tax_unit": "tax_unit_is_puf_clone", + "spm_unit": "spm_unit_is_puf_clone", + "family": "family_is_puf_clone", + "household": "household_is_puf_clone", +} + PUF_SUBSAMPLE_TARGET = 20_000 PUF_TOP_PERCENTILE = 99.5 @@ -531,6 +539,20 @@ def _map_to_entity(pred_values, variable_name): time_period: np.concatenate([state_fips, state_fips]).astype(np.int32) } + for entity_key, flag_name in CLONE_ORIGIN_FLAGS.items(): + id_variable = f"{entity_key}_id" + if id_variable not in data: + continue + n_entities = len(data[id_variable][time_period]) + new_data[flag_name] = { + time_period: np.concatenate( + [ + np.zeros(n_entities, dtype=np.int8), + np.ones(n_entities, dtype=np.int8), + ] + ) + } + if y_full: for var in IMPUTED_VARIABLES: if var not in data: diff --git a/policyengine_us_data/datasets/cps/census_cps.py b/policyengine_us_data/datasets/cps/census_cps.py index 042fefe56..6faed88fe 100644 --- a/policyengine_us_data/datasets/cps/census_cps.py +++ b/policyengine_us_data/datasets/cps/census_cps.py @@ -233,6 +233,7 @@ class CensusCPS_2018(CensusCPS): "A_FNLWGT", "A_LINENO", "A_SPOUSE", + "PERRP", "A_AGE", "A_SEX", "PEDISEYE", diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index bdb6aa4bd..2ffe05c6f 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -466,6 +466,7 @@ def children_per_parent(col: str) -> pd.DataFrame: cps["is_surviving_spouse"] = person.A_MARITL == 4 cps["is_separated"] = person.A_MARITL == 6 + cps["is_unmarried_partner_of_household_head"] = person.PERRP.isin([43, 44, 46, 47]) # High school or college/university enrollment status. cps["is_full_time_college_student"] = person.A_HSCOL == 2 @@ -501,6 +502,7 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): cps["weekly_hours_worked"] = person.HRSWK cps["hours_worked_last_week"] = person.A_HRS1 + cps["weeks_worked"] = np.clip(person.WKSWORK, 0, 52) cps["taxable_interest_income"] = person.INT_VAL * (p["taxable_interest_fraction"]) cps["tax_exempt_interest_income"] = person.INT_VAL * ( @@ -718,7 +720,8 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): cps["health_insurance_premiums_without_medicare_part_b"] = person.PHIP_VAL cps["over_the_counter_health_expenses"] = person.POTC_VAL cps["other_medical_expenses"] = person.PMED_VAL - cps["medicare_part_b_premiums"] = person.PEMCPREM + cps["medicare_part_b_premiums_reported"] = person.PEMCPREM + cps["medicare_enrolled"] = person.MCARE == 1 # Get QBI simulation parameters --- yamlfilename = ( diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 3d691ff4f..dd8185bd8 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -1,3 +1,7 @@ +import json +from pathlib import Path + +import h5py from policyengine_core.data import Dataset import pandas as pd from policyengine_us_data.utils import ( @@ -28,6 +32,256 @@ torch = None +def initialize_weight_priors( + original_weights: np.ndarray, + seed: int = 1456, + epsilon: float = 1e-6, +) -> np.ndarray: + """Build deterministic positive priors for sparse reweighting. + + Original CPS households should keep their survey weights unchanged. + Clone-half households start with zero weight on purpose, so they + should receive only a tiny positive epsilon to keep the log + optimization well-defined without giving them a meaningful head start. + """ + + weights = np.asarray(original_weights, dtype=np.float64) + if np.any(weights < 0): + raise ValueError("original_weights must be non-negative") + + rng = np.random.default_rng(seed) + priors = np.empty_like(weights, dtype=np.float64) + + positive_mask = weights > 0 + if positive_mask.any(): + priors[positive_mask] = weights[positive_mask] + + zero_mask = ~positive_mask + if zero_mask.any(): + priors[zero_mask] = epsilon * rng.uniform(1.0, 2.0, size=zero_mask.sum()) + + return priors + + +def _to_numpy(value) -> np.ndarray: + return np.asarray(getattr(value, "values", value)) + + +def _weighted_share(mask, weights) -> float: + weights = np.asarray(weights, dtype=np.float64) + total_weight = float(weights.sum()) + if total_weight <= 0: + return 0.0 + mask = np.asarray(mask, dtype=bool) + return 100 * float(weights[mask].sum()) / total_weight + + +def compute_clone_diagnostics_summary( + *, + household_is_puf_clone, + household_weight, + person_is_puf_clone, + person_weight, + person_in_poverty, + person_reported_in_poverty, + spm_unit_is_puf_clone, + spm_unit_weight, + spm_unit_capped_work_childcare_expenses, + spm_unit_pre_subsidy_childcare_expenses, + spm_unit_taxes, + spm_unit_market_income, +) -> dict[str, float]: + household_is_puf_clone = np.asarray(household_is_puf_clone, dtype=bool) + household_weight = np.asarray(household_weight, dtype=np.float64) + person_is_puf_clone = np.asarray(person_is_puf_clone, dtype=bool) + person_weight = np.asarray(person_weight, dtype=np.float64) + person_in_poverty = np.asarray(person_in_poverty, dtype=bool) + person_reported_in_poverty = np.asarray(person_reported_in_poverty, dtype=bool) + spm_unit_is_puf_clone = np.asarray(spm_unit_is_puf_clone, dtype=bool) + spm_unit_weight = np.asarray(spm_unit_weight, dtype=np.float64) + capped_childcare = np.asarray( + spm_unit_capped_work_childcare_expenses, dtype=np.float64 + ) + pre_subsidy_childcare = np.asarray( + spm_unit_pre_subsidy_childcare_expenses, dtype=np.float64 + ) + spm_unit_taxes = np.asarray(spm_unit_taxes, dtype=np.float64) + spm_unit_market_income = np.asarray(spm_unit_market_income, dtype=np.float64) + + poor_modeled_only = person_in_poverty & ~person_reported_in_poverty + clone_spm_weight = spm_unit_weight[spm_unit_is_puf_clone].sum() + + return { + "clone_household_weight_share_pct": _weighted_share( + household_is_puf_clone, household_weight + ), + "clone_person_weight_share_pct": _weighted_share( + person_is_puf_clone, person_weight + ), + "clone_poor_modeled_only_person_weight_share_pct": _weighted_share( + person_is_puf_clone & poor_modeled_only, + person_weight, + ), + "poor_modeled_only_within_clone_person_weight_share_pct": ( + 0.0 + if person_weight[person_is_puf_clone].sum() <= 0 + else _weighted_share( + poor_modeled_only[person_is_puf_clone], + person_weight[person_is_puf_clone], + ) + ), + "clone_childcare_exceeds_pre_subsidy_share_pct": ( + 0.0 + if clone_spm_weight <= 0 + else _weighted_share( + capped_childcare[spm_unit_is_puf_clone] + > pre_subsidy_childcare[spm_unit_is_puf_clone] + 1, + spm_unit_weight[spm_unit_is_puf_clone], + ) + ), + "clone_childcare_above_5000_share_pct": ( + 0.0 + if clone_spm_weight <= 0 + else _weighted_share( + capped_childcare[spm_unit_is_puf_clone] > 5_000, + spm_unit_weight[spm_unit_is_puf_clone], + ) + ), + "clone_taxes_exceed_market_income_share_pct": ( + 0.0 + if clone_spm_weight <= 0 + else _weighted_share( + spm_unit_taxes[spm_unit_is_puf_clone] + > spm_unit_market_income[spm_unit_is_puf_clone] + 1, + spm_unit_weight[spm_unit_is_puf_clone], + ) + ), + } + + +def _load_saved_period_array( + file_path: str | Path, + variable_name: str, + period: int, +) -> np.ndarray: + with h5py.File(file_path, "r") as h5_file: + obj = h5_file[variable_name] + if isinstance(obj, h5py.Dataset): + return np.asarray(obj[...]) + period_key = str(period) + if period_key in obj: + return np.asarray(obj[period_key][...]) + if period in obj: + return np.asarray(obj[period][...]) + raise KeyError(f"{variable_name} missing period {period}") + + +def clone_diagnostics_path(file_path: str | Path) -> Path: + return Path(file_path).with_suffix(".clone_diagnostics.json") + + +def build_clone_diagnostics_payload( + period_to_diagnostics: dict[int, dict[str, float]], +) -> dict: + if not period_to_diagnostics: + raise ValueError("Expected at least one period of clone diagnostics") + + ordered_periods = sorted(period_to_diagnostics) + if len(ordered_periods) == 1: + period = ordered_periods[0] + diagnostics = dict(period_to_diagnostics[period]) + diagnostics["period"] = int(period) + return diagnostics + + return { + "periods": { + str(period): period_to_diagnostics[period] for period in ordered_periods + } + } + + +def write_clone_diagnostics_report(file_path: str | Path, diagnostics: dict) -> Path: + output_path = clone_diagnostics_path(file_path) + output_path.write_text(json.dumps(diagnostics, indent=2, sort_keys=True) + "\n") + return output_path + + +def refresh_clone_diagnostics_report( + file_path: str | Path, + diagnostics_builder, +) -> Path: + output_path = clone_diagnostics_path(file_path) + if output_path.exists(): + output_path.unlink() + diagnostics = diagnostics_builder() + return write_clone_diagnostics_report(file_path, diagnostics) + + +def save_clone_diagnostics_report( + dataset_cls: Type[Dataset], + *, + start_year: int, + end_year: int, +) -> tuple[Path, dict]: + periods = list(range(start_year, end_year + 1)) + output_path = refresh_clone_diagnostics_report( + dataset_cls.file_path, + lambda: build_clone_diagnostics_payload( + { + period: build_clone_diagnostics_for_saved_dataset( + dataset_cls, + period, + ) + for period in periods + } + ), + ) + diagnostics_payload = json.loads(output_path.read_text()) + return output_path, diagnostics_payload + + +def build_clone_diagnostics_for_saved_dataset( + dataset_cls: Type[Dataset], period: int +) -> dict[str, float]: + from policyengine_us import Microsimulation + + sim = Microsimulation(dataset=dataset_cls) + dataset_path = Path(dataset_cls.file_path) + + person_reported_in_poverty = _to_numpy( + sim.calculate("spm_unit_net_income_reported", period=period, map_to="person") + ) < _to_numpy( + sim.calculate("spm_unit_spm_threshold", period=period, map_to="person") + ) + + return compute_clone_diagnostics_summary( + household_is_puf_clone=_load_saved_period_array( + dataset_path, "household_is_puf_clone", period + ), + household_weight=_to_numpy(sim.calculate("household_weight", period=period)), + person_is_puf_clone=_load_saved_period_array( + dataset_path, "person_is_puf_clone", period + ), + person_weight=_to_numpy(sim.calculate("person_weight", period=period)), + person_in_poverty=_to_numpy(sim.calculate("person_in_poverty", period=period)), + person_reported_in_poverty=person_reported_in_poverty, + spm_unit_is_puf_clone=_load_saved_period_array( + dataset_path, "spm_unit_is_puf_clone", period + ), + spm_unit_weight=_to_numpy(sim.calculate("spm_unit_weight", period=period)), + spm_unit_capped_work_childcare_expenses=_to_numpy( + sim.calculate("spm_unit_capped_work_childcare_expenses", period=period) + ), + spm_unit_pre_subsidy_childcare_expenses=_to_numpy( + sim.calculate("spm_unit_pre_subsidy_childcare_expenses", period=period) + ), + spm_unit_taxes=_to_numpy(sim.calculate("spm_unit_taxes", period=period)), + spm_unit_market_income=_to_numpy( + sim.calculate("spm_unit_market_income", period=period) + ), + ) + + def _get_period_array(period_values: dict, period: int) -> np.ndarray: """Get a period array from a TIME_PERIOD_ARRAYS variable dict.""" value = period_values.get(period) @@ -221,9 +475,9 @@ def generate(self): data = sim.dataset.load_dataset() base_year = int(sim.default_calculation_period) data["household_weight"] = {} - original_weights = sim.calculate("household_weight") - original_weights = original_weights.values + np.random.normal( - 1, 0.1, len(original_weights) + original_weights = initialize_weight_priors( + sim.calculate("household_weight").values, + seed=1456, ) bad_targets = [ @@ -342,6 +596,23 @@ def generate(self): logging.info("Post-generation weight validation passed") self.save_dataset(data) + try: + output_path, diagnostics_payload = save_clone_diagnostics_report( + type(self), + start_year=self.start_year, + end_year=self.end_year, + ) + logging.info("Saved clone diagnostics to %s", output_path) + logging.info( + "Clone diagnostics summary: %s", + diagnostics_payload, + ) + except Exception: + logging.warning( + "Unable to compute clone diagnostics for %s", + self.file_path, + exc_info=True, + ) class ReweightedCPS_2024(Dataset): @@ -357,9 +628,9 @@ def generate(self): sim = Microsimulation(dataset=self.input_dataset) data = sim.dataset.load_dataset() - original_weights = sim.calculate("household_weight") - original_weights = original_weights.values + np.random.normal( - 1, 0.1, len(original_weights) + original_weights = initialize_weight_priors( + sim.calculate("household_weight").values, + seed=1456, ) for year in [2024]: loss_matrix, targets_array = build_loss_matrix(self.input_dataset, year) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 8e01f5054..de583e5d9 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -18,6 +18,10 @@ convert_mortgage_interest_to_structural_inputs, impute_tax_unit_mortgage_balance_hints, ) +from policyengine_us_data.utils.spm import ( + get_spm_reference_thresholds, + spm_equivalence_scale, +) from policyengine_us_data.utils.policyengine import has_policyengine_us_variables from policyengine_us_data.utils.retirement_limits import ( get_retirement_limits, @@ -143,14 +147,12 @@ def _supports_structural_mortgage_inputs() -> bool: "spm_unit_payroll_tax_reported", "spm_unit_federal_tax_reported", "spm_unit_state_tax_reported", - "spm_unit_spm_threshold", "spm_unit_net_income_reported", "spm_unit_pre_subsidy_childcare_expenses", # Medical expenses "health_insurance_premiums_without_medicare_part_b", "over_the_counter_health_expenses", "other_medical_expenses", - "medicare_part_b_premiums", "child_support_expense", # Hours/employment "weekly_hours_worked", @@ -578,124 +580,24 @@ def reconcile_ss_subcomponents(predictions, total_ss): "social_security_survivors", } +_SPM_TENURE_TO_REFERENCE_KEY = { + b"OWNER_WITH_MORTGAGE": "owner_with_mortgage", + b"OWNER_WITHOUT_MORTGAGE": "owner_without_mortgage", + b"RENTER": "renter", + "OWNER_WITH_MORTGAGE": "owner_with_mortgage", + "OWNER_WITHOUT_MORTGAGE": "owner_without_mortgage", + "RENTER": "renter", +} -def derive_clone_capped_childcare_expenses( - donor_pre_subsidy: np.ndarray, - donor_capped: np.ndarray, - clone_pre_subsidy: np.ndarray, - clone_person_data: pd.DataFrame, - clone_spm_unit_ids: np.ndarray, -) -> np.ndarray: - """Derive clone-half capped childcare from clone inputs. - - The CPS provides both pre-subsidy childcare and the SPM-specific - capped childcare deduction. For the clone half, we impute only the - pre-subsidy amount, then deterministically rebuild the capped amount - instead of letting a second QRF predict it independently. - - We preserve the donor's observed capping share while also respecting - the clone's own earnings cap. This keeps the clone-half value - consistent with pre-subsidy childcare and avoids impossible outputs - such as capped childcare exceeding pre-subsidy childcare. - """ - - donor_pre_subsidy = np.asarray(donor_pre_subsidy, dtype=float) - donor_capped = np.asarray(donor_capped, dtype=float) - clone_pre_subsidy = np.asarray(clone_pre_subsidy, dtype=float) - clone_spm_unit_ids = np.asarray(clone_spm_unit_ids) - - donor_cap_share = np.divide( - donor_capped, - donor_pre_subsidy, - out=np.zeros_like(donor_capped, dtype=float), - where=donor_pre_subsidy > 0, - ) - donor_cap_share = np.clip(donor_cap_share, 0.0, 1.0) - capped_from_share = np.maximum(clone_pre_subsidy, 0.0) * donor_cap_share - - if clone_person_data.empty: - earnings_cap = np.zeros(len(clone_spm_unit_ids), dtype=float) - else: - eligible = clone_person_data["is_parent_proxy"].astype(bool) - parent_rows = clone_person_data.loc[ - eligible, ["spm_unit_id", "age", "earnings"] - ].copy() - if parent_rows.empty: - earnings_cap = np.zeros(len(clone_spm_unit_ids), dtype=float) - else: - parent_rows["earnings"] = parent_rows["earnings"].clip(lower=0.0) - parent_rows["age_rank"] = parent_rows.groupby("spm_unit_id")["age"].rank( - method="first", ascending=False - ) - top_two = parent_rows[parent_rows["age_rank"] <= 2].sort_values( - ["spm_unit_id", "age_rank"] - ) - earnings_cap_by_unit = top_two.groupby("spm_unit_id")["earnings"].agg( - lambda values: ( - float(values.iloc[0]) - if len(values) == 1 - else float(np.minimum(values.iloc[0], values.iloc[1])) - ) - ) - earnings_cap = earnings_cap_by_unit.reindex( - clone_spm_unit_ids, fill_value=0.0 - ).to_numpy(dtype=float) - - return np.minimum(capped_from_share, earnings_cap) - - -def _rebuild_clone_capped_childcare_expenses( - data: dict, - time_period: int, - cps_sim, -) -> np.ndarray: - """Rebuild clone-half capped childcare expenses after stage-2 imputation.""" - - n_persons_half = len(data["person_id"][time_period]) // 2 - n_spm_units_half = len(data["spm_unit_id"][time_period]) // 2 - person_roles = cps_sim.calculate_dataframe( - ["age", "is_tax_unit_head", "is_tax_unit_spouse"] - ) - if len(person_roles) != n_persons_half: +def _reference_threshold_key_for_tenure(tenure_type) -> str: + reference_key = _SPM_TENURE_TO_REFERENCE_KEY.get(tenure_type) + if reference_key is None: raise ValueError( - "Unexpected person role frame length while rebuilding clone childcare " - f"expenses: got {len(person_roles)}, expected {n_persons_half}" + "Unsupported spm_unit_tenure_type for cloned threshold rebuild: " + f"{tenure_type!r}" ) - - clone_person_data = pd.DataFrame( - { - "spm_unit_id": data["person_spm_unit_id"][time_period][n_persons_half:], - "age": person_roles["age"].values, - "is_parent_proxy": ( - person_roles["is_tax_unit_head"].values - | person_roles["is_tax_unit_spouse"].values - ), - "earnings": ( - data["employment_income"][time_period][n_persons_half:] - + data["self_employment_income"][time_period][n_persons_half:] - ), - } - ) - - donor_pre_subsidy = data["spm_unit_pre_subsidy_childcare_expenses"][time_period][ - :n_spm_units_half - ] - donor_capped = data["spm_unit_capped_work_childcare_expenses"][time_period][ - :n_spm_units_half - ] - clone_pre_subsidy = data["spm_unit_pre_subsidy_childcare_expenses"][time_period][ - n_spm_units_half: - ] - clone_spm_unit_ids = data["spm_unit_id"][time_period][n_spm_units_half:] - - return derive_clone_capped_childcare_expenses( - donor_pre_subsidy=donor_pre_subsidy, - donor_capped=donor_capped, - clone_pre_subsidy=clone_pre_subsidy, - clone_person_data=clone_person_data, - clone_spm_unit_ids=clone_spm_unit_ids, - ) + return reference_key def _apply_post_processing(predictions, X_test, time_period, data): @@ -741,6 +643,75 @@ def _apply_post_processing(predictions, X_test, time_period, data): return predictions +def _index_into_spm_units( + person_spm_unit_ids: np.ndarray, + spm_unit_ids: np.ndarray, +) -> np.ndarray: + if np.all(spm_unit_ids[:-1] <= spm_unit_ids[1:]): + indices = np.searchsorted(spm_unit_ids, person_spm_unit_ids) + valid = (indices >= 0) & (indices < len(spm_unit_ids)) + if valid.all() and np.array_equal(spm_unit_ids[indices], person_spm_unit_ids): + return indices + + indexer = pd.Index(spm_unit_ids).get_indexer(person_spm_unit_ids) + if (indexer < 0).any(): + raise ValueError("person_spm_unit_id contains values missing from spm_unit_id") + return indexer + + +def rebuild_cloned_spm_thresholds(data: dict, time_period: int) -> np.ndarray: + """Rebuild cloned SPM thresholds from donor-half geography. + + The clone half should keep the donor household's geography but use the + current threshold formula rather than a learned QRF output. We infer the + donor-half geographic adjustment from the stored first-half thresholds and + apply that same geography to the corresponding clone-half SPM units. + """ + + person_ages = np.asarray(data["age"][time_period]) + person_spm_unit_ids = np.asarray(data["person_spm_unit_id"][time_period]) + spm_unit_ids = np.asarray(data["spm_unit_id"][time_period]) + tenure_types = np.asarray(data["spm_unit_tenure_type"][time_period]) + stored_thresholds = np.asarray( + data["spm_unit_spm_threshold"][time_period], dtype=float + ) + + n_units = len(spm_unit_ids) + if n_units % 2 != 0: + raise ValueError( + "Expected cloned SPM units to have an even-length spm_unit_id array" + ) + + unit_indices = _index_into_spm_units(person_spm_unit_ids, spm_unit_ids) + is_adult = person_ages >= 18 + adult_counts = np.zeros(n_units, dtype=np.int32) + child_counts = np.zeros(n_units, dtype=np.int32) + np.add.at(adult_counts, unit_indices, is_adult.astype(np.int32)) + np.add.at(child_counts, unit_indices, (~is_adult).astype(np.int32)) + + thresholds_by_tenure = get_spm_reference_thresholds(time_period) + reference_base = np.array( + [ + thresholds_by_tenure[_reference_threshold_key_for_tenure(tenure)] + for tenure in tenure_types + ], + dtype=float, + ) + equivalence_scale = spm_equivalence_scale(adult_counts, child_counts) + + n_half = n_units // 2 + donor_denominator = reference_base[:n_half] * equivalence_scale[:n_half] + donor_geoadj = np.divide( + stored_thresholds[:n_half], + donor_denominator, + out=np.ones(n_half, dtype=float), + where=donor_denominator > 0, + ) + geoadj = np.concatenate([donor_geoadj, donor_geoadj]) + rebuilt = reference_base * equivalence_scale * geoadj + return rebuilt.astype(np.float32) + + def _splice_cps_only_predictions( data: dict, predictions: pd.DataFrame, @@ -802,24 +773,6 @@ def _splice_cps_only_predictions( new_values = np.concatenate([cps_half, pred_values]) data[var] = {time_period: new_values} - if ( - "spm_unit_capped_work_childcare_expenses" in data - and "spm_unit_pre_subsidy_childcare_expenses" in data - ): - n_half = entity_half_lengths.get( - "spm_unit", - len(data["spm_unit_capped_work_childcare_expenses"][time_period]) // 2, - ) - cps_half = data["spm_unit_capped_work_childcare_expenses"][time_period][:n_half] - clone_half = _rebuild_clone_capped_childcare_expenses( - data=data, - time_period=time_period, - cps_sim=cps_sim, - ) - data["spm_unit_capped_work_childcare_expenses"] = { - time_period: np.concatenate([cps_half, clone_half]) - } - del cps_sim return data @@ -892,6 +845,14 @@ def generate(self): time_period=self.time_period, dataset_path=str(self.cps.file_path), ) + if "spm_unit_spm_threshold" in new_data: + logger.info("Rebuilding cloned SPM thresholds from donor-half geography") + new_data["spm_unit_spm_threshold"] = { + self.time_period: rebuild_cloned_spm_thresholds( + new_data, + self.time_period, + ) + } new_data = self._rename_imputed_to_inputs(new_data) if _supports_structural_mortgage_inputs(): diff --git a/policyengine_us_data/datasets/cps/small_enhanced_cps.py b/policyengine_us_data/datasets/cps/small_enhanced_cps.py index a15080321..6dd6a02b7 100644 --- a/policyengine_us_data/datasets/cps/small_enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/small_enhanced_cps.py @@ -1,14 +1,89 @@ +import h5py +import logging import os -import pandas as pd import numpy as np -import h5py - +import pandas as pd from policyengine_us import Microsimulation -from policyengine_us_data.datasets import EnhancedCPS_2024 -from policyengine_us_data.storage import STORAGE_FOLDER from policyengine_core.enums import Enum from policyengine_core.data.dataset import Dataset -import logging +from policyengine_us_data.calibration.puf_impute import CLONE_ORIGIN_FLAGS +from policyengine_us_data.datasets import EnhancedCPS_2024 +from policyengine_us_data.storage import STORAGE_FOLDER + + +def _load_saved_period_array( + h5_file: h5py.File, variable_name: str, period: int +) -> np.ndarray: + obj = h5_file[variable_name] + if isinstance(obj, h5py.Dataset): + return np.asarray(obj[...]) + + period_key = str(period) + if period_key in obj: + return np.asarray(obj[period_key][...]) + if period in obj: + return np.asarray(obj[period][...]) + + raise KeyError(f"{variable_name} missing period {period}") + + +def _normalize_annual_period_key(period_key) -> int: + if isinstance(period_key, (int, np.integer)): + return int(period_key) + if hasattr(period_key, "year"): + return int(period_key.year) + return int(str(period_key)) + + +def _attach_clone_origin_flags( + data: dict[str, dict[int, np.ndarray]], + source_dataset_path, +) -> None: + with h5py.File(source_dataset_path, "r") as source_h5: + for entity_key, flag_name in CLONE_ORIGIN_FLAGS.items(): + id_variable = f"{entity_key}_id" + if id_variable not in data: + raise KeyError( + f"Expected {id_variable} in sampled small ECPS data " + f"before attaching {flag_name}" + ) + + source_ids_by_period = {} + source_flags_by_period = {} + for period_key in data[id_variable]: + period = _normalize_annual_period_key(period_key) + source_ids = _load_saved_period_array(source_h5, id_variable, period) + source_flags = _load_saved_period_array(source_h5, flag_name, period) + if len(source_ids) != len(source_flags): + raise ValueError( + f"{flag_name} length {len(source_flags)} does not match " + f"{id_variable} length {len(source_ids)} for period {period}" + ) + source_ids_by_period[period_key] = source_ids + source_flags_by_period[period_key] = source_flags + + data[flag_name] = {} + for period_key, sampled_ids in data[id_variable].items(): + source_ids = source_ids_by_period[period_key] + source_flags = source_flags_by_period[period_key] + flag_lookup = { + int(entity_id): np.int8(flag_value) + for entity_id, flag_value in zip(source_ids, source_flags) + } + sampled_ids = np.asarray(sampled_ids) + missing_ids = [ + int(entity_id) + for entity_id in sampled_ids + if int(entity_id) not in flag_lookup + ] + if missing_ids: + raise KeyError( + f"Missing {flag_name} values for sampled IDs {missing_ids[:5]}" + ) + data[flag_name][period_key] = np.asarray( + [flag_lookup[int(entity_id)] for entity_id in sampled_ids], + dtype=np.int8, + ) def create_small_ecps(): @@ -51,6 +126,8 @@ def create_small_ecps(): if len(data[variable]) == 0: del data[variable] + _attach_clone_origin_flags(data, EnhancedCPS_2024.file_path) + with h5py.File(STORAGE_FOLDER / "small_enhanced_cps_2024.h5", "w") as f: for variable, periods in data.items(): grp = f.create_group(variable) @@ -65,7 +142,6 @@ def create_sparse_ecps(): ecps = EnhancedCPS_2024() h5 = ecps.load() sparse_weights = h5["household_weight"][str(time_period)][:] - hh_ids = h5["household_id"][str(time_period)][:] h5.close() template_sim = Microsimulation( @@ -78,7 +154,6 @@ def create_sparse_ecps(): household_weight_column = f"household_weight__{time_period}" df_household_id_column = f"household_id__{time_period}" - df_person_id_column = f"person_id__{time_period}" # Group by household ID and get the first entry for each group h_df = df.groupby(df_household_id_column).first() diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index bde0f33ff..4d2ce72fd 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -802,10 +802,11 @@ class PUF_2024(PUF): url = "release://policyengine/irs-soi-puf/1.8.0/puf_2024.h5" +# Leave Medicare Part B out of the generic PUF medical-expense split: +# the baseline model now derives Part B premiums separately. MEDICAL_EXPENSE_CATEGORY_BREAKDOWNS = { "health_insurance_premiums_without_medicare_part_b": 0.453, "other_medical_expenses": 0.325, - "medicare_part_b_premiums": 0.137, "over_the_counter_health_expenses": 0.085, } diff --git a/policyengine_us_data/db/etl_national_targets.py b/policyengine_us_data/db/etl_national_targets.py index b6aaa5c2c..7989e8fbc 100644 --- a/policyengine_us_data/db/etl_national_targets.py +++ b/policyengine_us_data/db/etl_national_targets.py @@ -12,6 +12,14 @@ from policyengine_us_data.storage.calibration_targets.soi_metadata import ( RETIREMENT_CONTRIBUTION_TARGETS, ) +from policyengine_us_data.utils.census_spm import ( + build_census_spm_capped_housing_subsidy_target, +) +from policyengine_us_data.utils.cms_medicare import ( + get_beneficiary_paid_medicare_part_b_premiums_notes, + get_beneficiary_paid_medicare_part_b_premiums_source, + get_beneficiary_paid_medicare_part_b_premiums_target, +) from policyengine_us_data.utils.db import ( DEFAULT_YEAR, etl_argparser, @@ -152,9 +160,15 @@ def extract_national_targets(year: int = DEFAULT_YEAR): }, { "variable": "medicare_part_b_premiums", - "value": 112e9, - "source": "CMS Medicare data", - "notes": "Medicare Part B premium payments", + "value": get_beneficiary_paid_medicare_part_b_premiums_target( + HARDCODED_YEAR + ), + "source": get_beneficiary_paid_medicare_part_b_premiums_source( + HARDCODED_YEAR + ), + "notes": get_beneficiary_paid_medicare_part_b_premiums_notes( + HARDCODED_YEAR + ), "year": HARDCODED_YEAR, }, { @@ -185,13 +199,7 @@ def extract_national_targets(year: int = DEFAULT_YEAR): "notes": "Work and childcare expenses for SPM", "year": HARDCODED_YEAR, }, - { - "variable": "spm_unit_capped_housing_subsidy", - "value": 35e9, - "source": "HUD/Census", - "notes": "Housing subsidies", - "year": HARDCODED_YEAR, - }, + build_census_spm_capped_housing_subsidy_target(HARDCODED_YEAR), { "variable": "tanf", "value": 9e9, diff --git a/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py b/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py index 3199a56a2..0503c7b59 100644 --- a/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py +++ b/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py @@ -1,52 +1,67 @@ import pandas as pd import numpy as np from policyengine_us_data.storage import CALIBRATION_FOLDER +from policyengine_us_data.utils.census_spm import ( + get_census_spm_capped_housing_subsidy_total, +) """ -Hardcoded targets for the year 2024 from CPS-derived statistics and other sources. Include medical expenses, sum of SPM thresholds, and child support expenses. +Hardcoded targets for the year 2024 from CPS-derived statistics and other +sources. Housing uses the Census SPM capped subsidy concept, not HUD spending. """ -HARD_CODED_TOTALS = { - "health_insurance_premiums_without_medicare_part_b": 385e9, - "other_medical_expenses": 278e9, - "medicare_part_b_premiums": 112e9, - "over_the_counter_health_expenses": 72e9, - "spm_unit_spm_threshold": 3_945e9, - "child_support_expense": 33e9, - "child_support_received": 33e9, - "spm_unit_capped_work_childcare_expenses": 348e9, - "spm_unit_capped_housing_subsidy": 35e9, - "tanf": 9e9, - # Alimony could be targeted via SOI - "alimony_income": 13e9, - "alimony_expense": 13e9, - # Rough estimate, not CPS derived - "real_estate_taxes": 500e9, # Rough estimate between 350bn and 600bn total property tax collections - "rent": 735e9, # ACS total uprated by CPI - # Table 5A from https://www.irs.gov/statistics/soi-tax-stats-individual-information-return-form-w2-statistics - # shows $38,316,190,000 in Box 7: Social security tips (2018) - # Wages and salaries grew 32% from 2018 to 2023: https://fred.stlouisfed.org/graph/?g=1J0CC - # Assume 40% through 2024 - "tip_income": 38e9 * 1.4, -} + +def get_hard_coded_totals( + year: int = 2024, + storage_folder=None, +) -> dict[str, float]: + return { + "health_insurance_premiums_without_medicare_part_b": 385e9, + "other_medical_expenses": 278e9, + "medicare_part_b_premiums": 112e9, + "over_the_counter_health_expenses": 72e9, + "spm_unit_spm_threshold": 3_945e9, + "child_support_expense": 33e9, + "child_support_received": 33e9, + "spm_unit_capped_work_childcare_expenses": 348e9, + "spm_unit_capped_housing_subsidy": get_census_spm_capped_housing_subsidy_total( + year, storage_folder=storage_folder + ), + "tanf": 9e9, + # Alimony could be targeted via SOI + "alimony_income": 13e9, + "alimony_expense": 13e9, + # Rough estimate, not CPS derived + "real_estate_taxes": 500e9, # Rough estimate between 350bn and 600bn total property tax collections + "rent": 735e9, # ACS total uprated by CPI + # Table 5A from https://www.irs.gov/statistics/soi-tax-stats-individual-information-return-form-w2-statistics + # shows $38,316,190,000 in Box 7: Social security tips (2018) + # Wages and salaries grew 32% from 2018 to 2023: https://fred.stlouisfed.org/graph/?g=1J0CC + # Assume 40% through 2024 + "tip_income": 38e9 * 1.4, + } -def pull_hardcoded_targets(): +def pull_hardcoded_targets(year: int = 2024, storage_folder=None): """ Returns a DataFrame with hardcoded targets for various CPS-derived statistics and other sources. """ + hard_coded_totals = get_hard_coded_totals( + year=year, + storage_folder=storage_folder, + ) data = { - "DATA_SOURCE": ["hardcoded"] * len(HARD_CODED_TOTALS), - "GEO_ID": ["0000000US"] * len(HARD_CODED_TOTALS), - "GEO_NAME": ["national"] * len(HARD_CODED_TOTALS), - "VARIABLE": list(HARD_CODED_TOTALS.keys()), - "VALUE": list(HARD_CODED_TOTALS.values()), + "DATA_SOURCE": ["hardcoded"] * len(hard_coded_totals), + "GEO_ID": ["0000000US"] * len(hard_coded_totals), + "GEO_NAME": ["national"] * len(hard_coded_totals), + "VARIABLE": list(hard_coded_totals.keys()), + "VALUE": list(hard_coded_totals.values()), "IS_COUNT": [0.0] - * len(HARD_CODED_TOTALS), # All values are monetary amounts, not counts + * len(hard_coded_totals), # All values are monetary amounts, not counts "BREAKDOWN_VARIABLE": [np.nan] - * len(HARD_CODED_TOTALS), # No breakdown variable for hardcoded targets - "LOWER_BOUND": [np.nan] * len(HARD_CODED_TOTALS), - "UPPER_BOUND": [np.nan] * len(HARD_CODED_TOTALS), + * len(hard_coded_totals), # No breakdown variable for hardcoded targets + "LOWER_BOUND": [np.nan] * len(hard_coded_totals), + "UPPER_BOUND": [np.nan] * len(hard_coded_totals), } df = pd.DataFrame(data) diff --git a/policyengine_us_data/storage/upload_completed_datasets.py b/policyengine_us_data/storage/upload_completed_datasets.py index a21a94b3c..e88504ab2 100644 --- a/policyengine_us_data/storage/upload_completed_datasets.py +++ b/policyengine_us_data/storage/upload_completed_datasets.py @@ -5,6 +5,7 @@ from policyengine_us_data.datasets import EnhancedCPS_2024 from policyengine_us_data.datasets.cps.cps import CPS_2024 +from policyengine_us_data.datasets.cps.enhanced_cps import clone_diagnostics_path from policyengine_us_data.storage import STORAGE_FOLDER from policyengine_us_data.utils.data_upload import upload_data_files from policyengine_us_data.utils.dataset_validation import ( @@ -187,6 +188,7 @@ def upload_datasets(require_enhanced_cps: bool = True): ] enhanced_files = [ EnhancedCPS_2024.file_path, + clone_diagnostics_path(EnhancedCPS_2024.file_path), STORAGE_FOLDER / "small_enhanced_cps_2024.h5", ] if require_enhanced_cps: diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_spm_thresholds.py b/policyengine_us_data/tests/test_local_area_calibration/test_spm_thresholds.py new file mode 100644 index 000000000..3e5b8bd34 --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/test_spm_thresholds.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +from policyengine_us import CountryTaxBenefitSystem +from policyengine_us_data.calibration.calibration_utils import ( + calculate_spm_thresholds_vectorized, + load_cd_geoadj_values, +) + +SYSTEM = CountryTaxBenefitSystem() +CPI_U = SYSTEM.parameters.gov.bls.cpi.cpi_u + + +def test_load_cd_geoadj_values_returns_tenure_specific_lookup(monkeypatch): + rent_df = pd.DataFrame( + { + "cd_id": ["0101"], + "median_2br_rent": [1_500.0], + "national_median_2br_rent": [1_000.0], + } + ) + monkeypatch.setattr( + "policyengine_us_data.calibration.calibration_utils.pd.read_csv", + lambda *args, **kwargs: rent_df, + ) + + geoadj_lookup = load_cd_geoadj_values(["101"]) + + assert geoadj_lookup["101"]["renter"] == pytest.approx(1.2215) + assert geoadj_lookup["101"]["owner_with_mortgage"] == pytest.approx(1.217) + assert geoadj_lookup["101"]["owner_without_mortgage"] == pytest.approx(1.1615) + + +def test_calculate_spm_thresholds_vectorized_matches_policyengine_us_future_path(): + thresholds = calculate_spm_thresholds_vectorized( + person_ages=np.array([40, 35, 10, 8]), + person_spm_unit_ids=np.array([0, 0, 0, 0]), + spm_unit_tenure_types=np.array([b"RENTER"]), + spm_unit_geoadj=np.array([1.1]), + year=2027, + ) + + cpi_ratio = float(CPI_U("2027-02-01") / CPI_U("2024-02-01")) + expected = 39_430.0 * cpi_ratio * 1.1 + + assert thresholds[0] == pytest.approx(expected) diff --git a/policyengine_us_data/utils/census_spm.py b/policyengine_us_data/utils/census_spm.py new file mode 100644 index 000000000..e7a9c32b4 --- /dev/null +++ b/policyengine_us_data/utils/census_spm.py @@ -0,0 +1,65 @@ +from pathlib import Path + +import pandas as pd + +from policyengine_us_data.storage import STORAGE_FOLDER + +# These totals are derived from raw public-use Census CPS ASEC files by +# summing SPM_CAPHOUSESUB * SPM_WEIGHT / 100 at the SPM-unit level. +# They represent the Census SPM capped housing subsidy concept, not HUD +# spending or outlays. +CENSUS_SPM_CAPPED_HOUSING_SUBSIDY_TOTALS = { + 2022: 29_549_204_420.92, + 2023: 31_844_144_470.85, + 2024: 33_649_114_150.37, +} + + +def get_census_spm_capped_housing_subsidy_total( + year: int, + storage_folder: Path | str | None = None, +) -> float: + """ + Return the Census CPS ASEC total for SPM_CAPHOUSESUB. + + If a storage folder is provided, recompute directly from the local + raw Census CPS HDF file. Otherwise, use the checked-in year-specific + totals above so callers do not depend on local data files at import + time. + """ + + if storage_folder is None: + if year not in CENSUS_SPM_CAPPED_HOUSING_SUBSIDY_TOTALS: + raise ValueError( + f"No published Census SPM capped housing subsidy total for {year}." + ) + return CENSUS_SPM_CAPPED_HOUSING_SUBSIDY_TOTALS[year] + + storage_path = Path(storage_folder) / f"census_cps_{year}.h5" + if not storage_path.exists(): + raise FileNotFoundError( + f"Missing raw Census CPS file for {year}: {storage_path}" + ) + + with pd.HDFStore(storage_path, mode="r") as store: + spm_unit = store["spm_unit"][["SPM_CAPHOUSESUB", "SPM_WEIGHT"]] + + return float((spm_unit.SPM_CAPHOUSESUB * spm_unit.SPM_WEIGHT).sum() / 100) + + +def build_census_spm_capped_housing_subsidy_target( + year: int, + storage_folder: Path | str | None = None, +) -> dict: + return { + "variable": "spm_unit_capped_housing_subsidy", + "value": get_census_spm_capped_housing_subsidy_total( + year, storage_folder=storage_folder + ), + "source": "Census CPS ASEC public-use SPM_CAPHOUSESUB", + "notes": ( + "Capped SPM housing subsidy total from raw Census CPS ASEC " + "SPM_CAPHOUSESUB; not HUD spending or outlays." + ), + "year": year, + } diff --git a/policyengine_us_data/utils/cms_medicare.py b/policyengine_us_data/utils/cms_medicare.py new file mode 100644 index 000000000..b710cbdea --- /dev/null +++ b/policyengine_us_data/utils/cms_medicare.py @@ -0,0 +1,41 @@ +MEDICARE_PART_B_GROSS_PREMIUM_INCOME = { + 2024: 139.837e9, +} + + +MEDICARE_STATE_BUY_IN_MINIMUM_BENEFICIARIES = { + 2024: 10_000_000, +} + + +BENEFICIARY_PAID_MEDICARE_PART_B_PREMIUM_TARGETS = { + 2024: 112e9, +} + + +def get_beneficiary_paid_medicare_part_b_premiums_target(year: int) -> float: + try: + return BENEFICIARY_PAID_MEDICARE_PART_B_PREMIUM_TARGETS[year] + except KeyError as exc: + raise ValueError( + f"No beneficiary-paid Medicare Part B premium target sourced for {year}." + ) from exc + + +def get_beneficiary_paid_medicare_part_b_premiums_source(year: int) -> str: + gross_income = MEDICARE_PART_B_GROSS_PREMIUM_INCOME[year] / 1e9 + minimum_buy_in = MEDICARE_STATE_BUY_IN_MINIMUM_BENEFICIARIES[year] + return ( + "CMS 2025 Medicare Trustees Report Table III.C3 actual 2024 Part B " + f"premium income (${gross_income:.3f}B), plus CMS State Buy-In FAQ " + f"noting states paid Part B premiums for over {minimum_buy_in:,} people" + ) + + +def get_beneficiary_paid_medicare_part_b_premiums_notes(year: int) -> str: + return ( + "Approximate beneficiary-paid Medicare Part B out-of-pocket premiums " + "for SPM/MOOP calibration. This intentionally does not target gross " + "trust-fund premium income because Medicaid and other MSP pathways pay " + "premiums on behalf of some enrollees." + ) diff --git a/policyengine_us_data/utils/dataset_validation.py b/policyengine_us_data/utils/dataset_validation.py index 932003860..58960b472 100644 --- a/policyengine_us_data/utils/dataset_validation.py +++ b/policyengine_us_data/utils/dataset_validation.py @@ -22,6 +22,14 @@ "household": "household_id", } +AUXILIARY_ENTITY_PREFIXES = { + "person_": "person", + "tax_unit_": "tax_unit", + "family_": "family", + "spm_unit_": "spm_unit", + "household_": "household", +} + class DatasetContractError(Exception): """Raised when a built dataset does not match the active country package.""" @@ -127,6 +135,24 @@ def _infer_auxiliary_entity( entity_counts: dict[str, int], file_name: str, ) -> str: + for prefix, entity_key in AUXILIARY_ENTITY_PREFIXES.items(): + if not variable_name.startswith(prefix): + continue + expected_length = entity_counts.get(entity_key) + if expected_length is None: + raise DatasetContractError( + f"{file_name} contains auxiliary variable {variable_name} with a " + f"{entity_key}-scoped prefix, but {ENTITY_ID_VARIABLES[entity_key]} " + "is missing from the dataset." + ) + if actual_length != expected_length: + raise DatasetContractError( + f"{file_name} contains auxiliary variable {variable_name} with " + f"{prefix!r} prefix and expected {entity_key} length " + f"{expected_length}, found {actual_length}." + ) + return entity_key + candidate_entities = [ entity_key for entity_key, entity_count in entity_counts.items() diff --git a/policyengine_us_data/utils/hud_housing.py b/policyengine_us_data/utils/hud_housing.py new file mode 100644 index 000000000..1b7620e5d --- /dev/null +++ b/policyengine_us_data/utils/hud_housing.py @@ -0,0 +1,49 @@ +HUD_USER_HOUSING_ASSISTANCE_BENCHMARKS = { + 2022: { + "reported_households": 4_537_614, + "average_monthly_spending_per_unit": 899, + }, + 2023: { + "reported_households": 4_569_973, + "average_monthly_spending_per_unit": 989, + }, + 2024: { + "reported_households": 4_584_691, + "average_monthly_spending_per_unit": 1_067, + }, + 2025: { + "reported_households": 4_519_561, + "average_monthly_spending_per_unit": 1_135, + }, +} + + +def get_hud_user_housing_benchmark(year: int) -> dict: + if year not in HUD_USER_HOUSING_ASSISTANCE_BENCHMARKS: + raise ValueError(f"No HUD USER housing benchmark for {year}.") + + benchmark = dict(HUD_USER_HOUSING_ASSISTANCE_BENCHMARKS[year]) + benchmark["annual_spending_total"] = ( + benchmark["reported_households"] + * benchmark["average_monthly_spending_per_unit"] + * 12 + ) + return benchmark + + +def build_hud_user_housing_assistance_benchmark(year: int) -> dict: + benchmark = get_hud_user_housing_benchmark(year) + return { + "variable": "housing_assistance", + "annual_spending_total": benchmark["annual_spending_total"], + "reported_households": benchmark["reported_households"], + "average_monthly_spending_per_unit": benchmark[ + "average_monthly_spending_per_unit" + ], + "source": "HUD USER Picture of Subsidized Households", + "notes": ( + "Annual federal spending and occupied assisted households from " + "HUD USER; not Census SPM capped subsidy." + ), + "year": year, + } diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index 9a81ea9d6..72c38e5ce 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -12,6 +12,12 @@ from policyengine_us_data.storage.calibration_targets.soi_metadata import ( RETIREMENT_CONTRIBUTION_TARGETS, ) +from policyengine_us_data.utils.census_spm import ( + get_census_spm_capped_housing_subsidy_total, +) +from policyengine_us_data.utils.cms_medicare import ( + get_beneficiary_paid_medicare_part_b_premiums_target, +) from policyengine_core.reforms import Reform from policyengine_us_data.utils.soi import pe_to_soi, get_soi @@ -25,13 +31,17 @@ HARD_CODED_TOTALS = { "health_insurance_premiums_without_medicare_part_b": 385e9, "other_medical_expenses": 278e9, - "medicare_part_b_premiums": 112e9, + "medicare_part_b_premiums": get_beneficiary_paid_medicare_part_b_premiums_target( + 2024 + ), "over_the_counter_health_expenses": 72e9, "spm_unit_spm_threshold": 3_945e9, "child_support_expense": 33e9, "child_support_received": 33e9, "spm_unit_capped_work_childcare_expenses": 348e9, - "spm_unit_capped_housing_subsidy": 35e9, + "spm_unit_capped_housing_subsidy": get_census_spm_capped_housing_subsidy_total( + 2024 + ), "tanf": 9e9, # Alimony could be targeted via SOI "alimony_income": 13e9, diff --git a/policyengine_us_data/utils/spm.py b/policyengine_us_data/utils/spm.py index ad3c9e9fb..a2220339b 100644 --- a/policyengine_us_data/utils/spm.py +++ b/policyengine_us_data/utils/spm.py @@ -1,7 +1,69 @@ -"""SPM threshold calculation utilities using the spm-calculator package.""" +"""SPM threshold calculation utilities aligned with PolicyEngine US.""" +from functools import lru_cache import numpy as np -from spm_calculator import SPMCalculator, spm_equivalence_scale +from policyengine_us import CountryTaxBenefitSystem + +PUBLISHED_SPM_REFERENCE_THRESHOLDS = { + 2015: { + "renter": 25_155.0, + "owner_with_mortgage": 24_859.0, + "owner_without_mortgage": 20_639.0, + }, + 2016: { + "renter": 25_558.0, + "owner_with_mortgage": 25_248.0, + "owner_without_mortgage": 20_943.0, + }, + 2017: { + "renter": 26_213.0, + "owner_with_mortgage": 25_897.0, + "owner_without_mortgage": 21_527.0, + }, + 2018: { + "renter": 26_905.0, + "owner_with_mortgage": 26_565.0, + "owner_without_mortgage": 22_095.0, + }, + 2019: { + "renter": 27_515.0, + "owner_with_mortgage": 27_172.0, + "owner_without_mortgage": 22_600.0, + }, + 2020: { + "renter": 28_881.0, + "owner_with_mortgage": 28_533.0, + "owner_without_mortgage": 23_948.0, + }, + 2021: { + "renter": 31_453.0, + "owner_with_mortgage": 31_089.0, + "owner_without_mortgage": 26_022.0, + }, + 2022: { + "renter": 33_402.0, + "owner_with_mortgage": 32_949.0, + "owner_without_mortgage": 27_679.0, + }, + 2023: { + "renter": 36_606.0, + "owner_with_mortgage": 36_192.0, + "owner_without_mortgage": 30_347.0, + }, + 2024: { + "renter": 39_430.0, + "owner_with_mortgage": 39_068.0, + "owner_without_mortgage": 32_586.0, + }, +} + +LATEST_PUBLISHED_SPM_THRESHOLD_YEAR = max(PUBLISHED_SPM_REFERENCE_THRESHOLDS) +REFERENCE_RAW_SCALE = 3**0.7 +TENURE_HOUSING_SHARES = { + "owner_with_mortgage": 0.434, + "owner_without_mortgage": 0.323, + "renter": 0.443, +} TENURE_CODE_MAP = { 1: "owner_with_mortgage", @@ -10,6 +72,74 @@ } +@lru_cache(maxsize=1) +def _get_cpi_u_parameter(): + system = CountryTaxBenefitSystem() + return system.parameters.gov.bls.cpi.cpi_u + + +def spm_equivalence_scale(num_adults, num_children): + adults, children = np.broadcast_arrays( + np.asarray(num_adults, dtype=float), + np.asarray(num_children, dtype=float), + ) + + raw = np.zeros_like(adults, dtype=float) + has_people = (adults + children) > 0 + with_children = has_people & (children > 0) + + single_adult_with_children = with_children & (adults <= 1) + raw[single_adult_with_children] = ( + 1.0 + 0.8 + 0.5 * np.maximum(children[single_adult_with_children] - 1, 0) + ) ** 0.7 + + multi_adult_with_children = with_children & ~single_adult_with_children + raw[multi_adult_with_children] = ( + adults[multi_adult_with_children] + 0.5 * children[multi_adult_with_children] + ) ** 0.7 + + no_children = has_people & ~with_children + one_adult = no_children & (adults <= 1) + two_adults = no_children & (adults == 2) + larger_adult_units = no_children & (adults > 2) + + raw[one_adult] = 1.0 + raw[two_adults] = 1.41 + raw[larger_adult_units] = adults[larger_adult_units] ** 0.7 + + return raw / REFERENCE_RAW_SCALE + + +def get_spm_reference_thresholds(year: int) -> dict[str, float]: + if year in PUBLISHED_SPM_REFERENCE_THRESHOLDS: + return PUBLISHED_SPM_REFERENCE_THRESHOLDS[year].copy() + + if year < min(PUBLISHED_SPM_REFERENCE_THRESHOLDS): + raise ValueError( + f"No published SPM reference thresholds for {year}. " + f"Earliest available year is {min(PUBLISHED_SPM_REFERENCE_THRESHOLDS)}." + ) + + cpi_u = _get_cpi_u_parameter() + factor = float( + cpi_u(f"{year}-02-01") / cpi_u(f"{LATEST_PUBLISHED_SPM_THRESHOLD_YEAR}-02-01") + ) + latest_thresholds = PUBLISHED_SPM_REFERENCE_THRESHOLDS[ + LATEST_PUBLISHED_SPM_THRESHOLD_YEAR + ] + return {tenure: value * factor for tenure, value in latest_thresholds.items()} + + +def calculate_geoadj_from_rent( + local_rent, + national_rent: float, + tenure: str = "renter", +): + share = TENURE_HOUSING_SHARES[tenure] + rent_ratio = np.asarray(local_rent, dtype=float) / float(national_rent) + return rent_ratio * share + (1.0 - share) + + def calculate_spm_thresholds_with_geoadj( num_adults: np.ndarray, num_children: np.ndarray, @@ -35,8 +165,7 @@ def calculate_spm_thresholds_with_geoadj( Returns: Array of SPM threshold values. """ - calc = SPMCalculator(year=year) - base_thresholds = calc.get_base_thresholds() + base_thresholds = get_spm_reference_thresholds(year) n = len(num_adults) thresholds = np.zeros(n) diff --git a/tests/unit/calibration/test_calibration_puf_impute.py b/tests/unit/calibration/test_calibration_puf_impute.py index d803486ee..13a2f5176 100644 --- a/tests/unit/calibration/test_calibration_puf_impute.py +++ b/tests/unit/calibration/test_calibration_puf_impute.py @@ -34,9 +34,11 @@ def _make_mock_data(n_persons=20, n_households=5, time_period=2024): "household_id": {time_period: np.arange(1, n_households + 1)}, "tax_unit_id": {time_period: np.arange(1, n_households + 1)}, "spm_unit_id": {time_period: np.arange(1, n_households + 1)}, + "family_id": {time_period: np.arange(1, n_households + 1)}, "person_household_id": {time_period: household_ids_person}, "person_tax_unit_id": {time_period: tax_unit_ids_person}, "person_spm_unit_id": {time_period: spm_unit_ids_person}, + "person_family_id": {time_period: household_ids_person}, "age": {time_period: ages.astype(np.float32)}, "is_male": {time_period: is_male.astype(np.float32)}, "household_weight": {time_period: np.ones(n_households) * 1000}, @@ -135,6 +137,31 @@ def test_overridden_subset_of_imputed(self): for var in OVERRIDDEN_IMPUTED_VARIABLES: assert var in IMPUTED_VARIABLES + def test_clone_origin_flags_are_added(self): + data = _make_mock_data(n_persons=20, n_households=5) + state_fips = np.array([1, 2, 36, 6, 48]) + + result = puf_clone_dataset( + data=data, + state_fips=state_fips, + time_period=2024, + skip_qrf=True, + ) + + expected_lengths = { + "person_is_puf_clone": 20, + "tax_unit_is_puf_clone": 5, + "spm_unit_is_puf_clone": 5, + "family_is_puf_clone": 5, + "household_is_puf_clone": 5, + } + + for variable_name, half_length in expected_lengths.items(): + values = result[variable_name][2024] + assert values.dtype == np.int8 + np.testing.assert_array_equal(values[:half_length], 0) + np.testing.assert_array_equal(values[half_length:], 1) + class TestStratifiedSubsample: def test_noop_when_small(self): diff --git a/tests/unit/test_census_spm_housing_targets.py b/tests/unit/test_census_spm_housing_targets.py new file mode 100644 index 000000000..e595848d1 --- /dev/null +++ b/tests/unit/test_census_spm_housing_targets.py @@ -0,0 +1,174 @@ +import importlib +import sys +import types + +import pandas as pd +import validation.benefit_validation as benefit_validation + +from policyengine_us_data.db import etl_national_targets +from policyengine_us_data.storage.calibration_targets.pull_hardcoded_targets import ( + pull_hardcoded_targets, +) +from policyengine_us_data.utils.census_spm import ( + build_census_spm_capped_housing_subsidy_target, + get_census_spm_capped_housing_subsidy_total, +) +from policyengine_us_data.utils.hud_housing import ( + build_hud_user_housing_assistance_benchmark, + get_hud_user_housing_benchmark, +) + + +def _write_census_cps(storage_dir, year=2024): + path = storage_dir / f"census_cps_{year}.h5" + with pd.HDFStore(path, mode="w") as store: + store["spm_unit"] = pd.DataFrame( + { + "SPM_CAPHOUSESUB": [1_000.0, 0.0, 2_000.0], + "SPM_WEIGHT": [150, 250, 100], + } + ) + return path + + +def test_get_census_spm_capped_housing_subsidy_total_uses_raw_cps_spm_values( + tmp_path, +): + _write_census_cps(tmp_path) + + total = get_census_spm_capped_housing_subsidy_total(2024, storage_folder=tmp_path) + + expected = (1_000 * 150 + 0 * 250 + 2_000 * 100) / 100 + assert total == expected + + +def test_pull_hardcoded_targets_uses_census_spm_housing_total(tmp_path): + _write_census_cps(tmp_path) + + targets = pull_hardcoded_targets(year=2024, storage_folder=tmp_path) + housing = targets.loc[ + targets.VARIABLE == "spm_unit_capped_housing_subsidy", "VALUE" + ].iat[0] + + assert housing == (1_000 * 150 + 0 * 250 + 2_000 * 100) / 100 + + +def test_extract_national_targets_uses_census_spm_housing_target(monkeypatch): + class DummyMicrosimulation: + def __init__(self, dataset): + self.default_calculation_period = 2024 + + policyengine_us = importlib.import_module("policyengine_us") + monkeypatch.setitem( + sys.modules, + "policyengine_us", + types.SimpleNamespace( + **{ + name: getattr(policyengine_us, name) + for name in dir(policyengine_us) + if not name.startswith("__") and name != "Microsimulation" + }, + Microsimulation=DummyMicrosimulation, + ), + ) + monkeypatch.setattr( + etl_national_targets, + "build_census_spm_capped_housing_subsidy_target", + lambda year, storage_folder=None: { + "variable": "spm_unit_capped_housing_subsidy", + "value": 123.0, + "source": "Census CPS ASEC public-use SPM_CAPHOUSESUB", + "notes": "Capped SPM housing subsidy total from raw Census CPS ASEC.", + "year": year, + }, + ) + + targets = etl_national_targets.extract_national_targets(2024) + housing = next( + target + for target in targets["direct_sum_targets"] + if target["variable"] == "spm_unit_capped_housing_subsidy" + ) + + assert housing["value"] == 123.0 + assert housing["source"] == "Census CPS ASEC public-use SPM_CAPHOUSESUB" + assert "Capped SPM housing subsidy" in housing["notes"] + + +def test_build_census_spm_capped_housing_subsidy_target_labels_concept(tmp_path): + _write_census_cps(tmp_path) + + target = build_census_spm_capped_housing_subsidy_target( + 2024, storage_folder=tmp_path + ) + + assert target["variable"] == "spm_unit_capped_housing_subsidy" + assert target["value"] == (1_000 * 150 + 0 * 250 + 2_000 * 100) / 100 + assert target["source"] == "Census CPS ASEC public-use SPM_CAPHOUSESUB" + assert "not HUD spending" in target["notes"] + + +def test_get_hud_user_housing_benchmark_uses_annual_spending_concept(): + benchmark = get_hud_user_housing_benchmark(2022) + + assert benchmark["reported_households"] == 4_537_614 + assert benchmark["average_monthly_spending_per_unit"] == 899 + assert benchmark["annual_spending_total"] == 48_951_779_832 + + +def test_build_hud_user_housing_assistance_benchmark_labels_admin_concept(): + benchmark = build_hud_user_housing_assistance_benchmark(2022) + + assert benchmark["variable"] == "housing_assistance" + assert benchmark["source"] == "HUD USER Picture of Subsidized Households" + assert benchmark["reported_households"] == 4_537_614 + assert benchmark["annual_spending_total"] == 48_951_779_832 + assert "not Census SPM capped subsidy" in benchmark["notes"] + + +def test_get_program_benchmarks_keeps_housing_concepts_separate(monkeypatch): + monkeypatch.setattr( + benefit_validation, + "build_census_spm_capped_housing_subsidy_target", + lambda year: { + "variable": "spm_unit_capped_housing_subsidy", + "value": 111e9, + "source": "Census benchmark", + "notes": "Census concept", + "year": year, + }, + ) + monkeypatch.setattr( + benefit_validation, + "build_hud_user_housing_assistance_benchmark", + lambda year: { + "variable": "housing_assistance", + "annual_spending_total": 222e9, + "reported_households": 3_000_000, + "average_monthly_spending_per_unit": 999, + "source": "HUD USER benchmark", + "notes": "HUD concept", + "year": year, + }, + ) + + benchmarks = benefit_validation.get_program_benchmarks(2022) + + assert ( + benchmarks["housing_spm_capped"]["variable"] + == "spm_unit_capped_housing_subsidy" + ) + assert benchmarks["housing_spm_capped"]["benchmark_total"] == 111 + assert benchmarks["housing_spm_capped"]["benchmark_source"] == "Census benchmark" + + assert benchmarks["housing_assistance_hud_user"]["variable"] == "housing_assistance" + assert benchmarks["housing_assistance_hud_user"]["benchmark_total"] == 222 + assert ( + benchmarks["housing_assistance_hud_user"]["benchmark_source"] + == "HUD USER benchmark" + ) + assert ( + benchmarks["housing_assistance_hud_user"]["benchmark_participants_millions"] + == 3 + ) + assert benchmarks["housing_assistance_hud_user"]["map_to"] == "household" diff --git a/tests/unit/test_cms_medicare_targets.py b/tests/unit/test_cms_medicare_targets.py new file mode 100644 index 000000000..d780bf8d4 --- /dev/null +++ b/tests/unit/test_cms_medicare_targets.py @@ -0,0 +1,25 @@ +import pytest + +from policyengine_us_data.utils.cms_medicare import ( + get_beneficiary_paid_medicare_part_b_premiums_notes, + get_beneficiary_paid_medicare_part_b_premiums_source, + get_beneficiary_paid_medicare_part_b_premiums_target, +) + + +def test_beneficiary_paid_medicare_part_b_target_2024_is_sourced(): + assert get_beneficiary_paid_medicare_part_b_premiums_target(2024) == pytest.approx( + 112e9 + ) + + +def test_beneficiary_paid_medicare_part_b_source_mentions_primary_sources(): + source = get_beneficiary_paid_medicare_part_b_premiums_source(2024) + assert "2025 Medicare Trustees Report" in source + assert "State Buy-In FAQ" in source + + +def test_beneficiary_paid_medicare_part_b_notes_describe_out_of_pocket_semantics(): + notes = get_beneficiary_paid_medicare_part_b_premiums_notes(2024) + assert "out-of-pocket" in notes + assert "gross trust-fund premium income" in notes diff --git a/tests/unit/test_dataset_validation.py b/tests/unit/test_dataset_validation.py index c1ee1ea84..af9941577 100644 --- a/tests/unit/test_dataset_validation.py +++ b/tests/unit/test_dataset_validation.py @@ -248,3 +248,59 @@ def test_validate_dataset_contract_rejects_entity_length_mismatch( microsimulation_cls=_FakeMicrosimulation, dataset_loader=lambda path: path, ) + + +def test_validate_dataset_contract_accepts_prefixed_auxiliary_entity( + tmp_path, monkeypatch +): + file_path = tmp_path / "prefixed_aux.h5" + _write_test_h5( + file_path, + { + "person_id": np.array([101], dtype=np.int32), + "household_id": np.array([201], dtype=np.int32), + "household_is_puf_clone": np.array([1], dtype=np.int8), + }, + ) + monkeypatch.setattr( + "policyengine_us_data.utils.dataset_validation.assert_locked_policyengine_us_version", + lambda: PolicyEngineUSBuildInfo(version="1.587.0"), + ) + + summary = validate_dataset_contract( + file_path, + tax_benefit_system=_fake_tax_benefit_system(), + microsimulation_cls=_FakeMicrosimulation, + dataset_loader=lambda path: path, + ) + + assert summary.variable_count == 3 + + +def test_validate_dataset_contract_rejects_prefixed_auxiliary_length_mismatch( + tmp_path, monkeypatch +): + file_path = tmp_path / "prefixed_aux_bad.h5" + _write_test_h5( + file_path, + { + "person_id": np.array([101], dtype=np.int32), + "household_id": np.array([201], dtype=np.int32), + "household_is_puf_clone": np.array([1, 0], dtype=np.int8), + }, + ) + monkeypatch.setattr( + "policyengine_us_data.utils.dataset_validation.assert_locked_policyengine_us_version", + lambda: PolicyEngineUSBuildInfo(version="1.587.0"), + ) + + with pytest.raises( + DatasetContractError, + match="expected household length 1, found 2", + ): + validate_dataset_contract( + file_path, + tax_benefit_system=_fake_tax_benefit_system(), + microsimulation_cls=_FakeMicrosimulation, + dataset_loader=lambda path: path, + ) diff --git a/tests/unit/test_enhanced_cps_clone_diagnostics.py b/tests/unit/test_enhanced_cps_clone_diagnostics.py new file mode 100644 index 000000000..a0261aac3 --- /dev/null +++ b/tests/unit/test_enhanced_cps_clone_diagnostics.py @@ -0,0 +1,138 @@ +from pathlib import Path + +import pytest + +from policyengine_us_data.datasets.cps.enhanced_cps import ( + build_clone_diagnostics_payload, + compute_clone_diagnostics_summary, + clone_diagnostics_path, + refresh_clone_diagnostics_report, + save_clone_diagnostics_report, +) + + +def test_compute_clone_diagnostics_summary(): + diagnostics = compute_clone_diagnostics_summary( + household_is_puf_clone=[False, True], + household_weight=[9.0, 1.0], + person_is_puf_clone=[False, True, True], + person_weight=[4.0, 3.0, 3.0], + person_in_poverty=[False, True, True], + person_reported_in_poverty=[False, False, True], + spm_unit_is_puf_clone=[False, True, True], + spm_unit_weight=[2.0, 3.0, 5.0], + spm_unit_capped_work_childcare_expenses=[0.0, 6000.0, 7000.0], + spm_unit_pre_subsidy_childcare_expenses=[0.0, 5000.0, 8000.0], + spm_unit_taxes=[100.0, 9000.0, 200.0], + spm_unit_market_income=[1000.0, 8000.0, 1000.0], + ) + + assert diagnostics["clone_household_weight_share_pct"] == pytest.approx(10.0) + assert diagnostics[ + "clone_poor_modeled_only_person_weight_share_pct" + ] == pytest.approx(30.0) + assert diagnostics[ + "poor_modeled_only_within_clone_person_weight_share_pct" + ] == pytest.approx(50.0) + assert diagnostics[ + "clone_childcare_exceeds_pre_subsidy_share_pct" + ] == pytest.approx(37.5) + assert diagnostics["clone_childcare_above_5000_share_pct"] == pytest.approx(100.0) + assert diagnostics["clone_taxes_exceed_market_income_share_pct"] == pytest.approx( + 37.5 + ) + + +def test_build_clone_diagnostics_payload_single_period(): + payload = build_clone_diagnostics_payload( + {2024: {"clone_person_weight_share_pct": 12.5}} + ) + + assert payload == { + "period": 2024, + "clone_person_weight_share_pct": 12.5, + } + + +def test_build_clone_diagnostics_payload_multiple_periods(): + payload = build_clone_diagnostics_payload( + { + 2026: {"clone_person_weight_share_pct": 20.0}, + 2024: {"clone_person_weight_share_pct": 10.0}, + } + ) + + assert payload == { + "periods": { + "2024": {"clone_person_weight_share_pct": 10.0}, + "2026": {"clone_person_weight_share_pct": 20.0}, + } + } + + +def test_refresh_clone_diagnostics_report_removes_stale_sidecar_on_failure(tmp_path): + file_path = tmp_path / "enhanced_cps_2024.h5" + file_path.write_text("placeholder") + stale_path = clone_diagnostics_path(file_path) + stale_path.write_text("stale") + + def _raise(): + raise RuntimeError("boom") + + with pytest.raises(RuntimeError, match="boom"): + refresh_clone_diagnostics_report(file_path, _raise) + + assert stale_path == Path(file_path).with_suffix(".clone_diagnostics.json") + assert not stale_path.exists() + + +def test_save_clone_diagnostics_report_removes_stale_sidecar_on_failure( + tmp_path, monkeypatch +): + class DummyDataset: + file_path = tmp_path / "enhanced_cps_2024.h5" + + DummyDataset.file_path.write_text("placeholder") + stale_path = clone_diagnostics_path(DummyDataset.file_path) + stale_path.write_text("stale") + + monkeypatch.setattr( + "policyengine_us_data.datasets.cps.enhanced_cps.build_clone_diagnostics_for_saved_dataset", + lambda dataset_cls, period: (_ for _ in ()).throw(RuntimeError("boom")), + ) + + with pytest.raises(RuntimeError, match="boom"): + save_clone_diagnostics_report( + DummyDataset, + start_year=2024, + end_year=2024, + ) + + assert not stale_path.exists() + + +def test_save_clone_diagnostics_report_writes_fresh_payload(tmp_path, monkeypatch): + class DummyDataset: + file_path = tmp_path / "enhanced_cps_2024.h5" + + DummyDataset.file_path.write_text("placeholder") + + monkeypatch.setattr( + "policyengine_us_data.datasets.cps.enhanced_cps.build_clone_diagnostics_for_saved_dataset", + lambda dataset_cls, period: {"clone_person_weight_share_pct": float(period)}, + ) + + output_path, payload = save_clone_diagnostics_report( + DummyDataset, + start_year=2024, + end_year=2025, + ) + + assert output_path == clone_diagnostics_path(DummyDataset.file_path) + assert payload == { + "periods": { + "2024": {"clone_person_weight_share_pct": 2024.0}, + "2025": {"clone_person_weight_share_pct": 2025.0}, + } + } + assert output_path.exists() diff --git a/tests/unit/test_extended_cps.py b/tests/unit/test_extended_cps.py index 646878760..cc6851c99 100644 --- a/tests/unit/test_extended_cps.py +++ b/tests/unit/test_extended_cps.py @@ -11,6 +11,7 @@ import pandas as pd import pytest +from policyengine_us_data.datasets.cps.enhanced_cps import initialize_weight_priors from policyengine_us_data.calibration.puf_impute import ( IMPUTED_VARIABLES, OVERRIDDEN_IMPUTED_VARIABLES, @@ -24,6 +25,7 @@ _derive_overtime_occupation_inputs, _impute_clone_cps_features, apply_retirement_constraints, + rebuild_cloned_spm_thresholds, derive_clone_capped_childcare_expenses, reconcile_ss_subcomponents, ) @@ -125,6 +127,7 @@ def test_pension_income_not_in_cps_only(self): should_not_be_here = { "taxable_private_pension_income", "tax_exempt_private_pension_income", + "spm_unit_spm_threshold", } present = should_not_be_here & set(CPS_ONLY_IMPUTED_VARIABLES) assert present == set(), ( @@ -132,8 +135,7 @@ def test_pension_income_not_in_cps_only(self): ) def test_capped_childcare_not_in_cps_only(self): - """Capped childcare should be derived from clone-half inputs, not - independently QRF-imputed.""" + """Capped childcare should not be independently QRF-imputed.""" assert "spm_unit_capped_work_childcare_expenses" not in set( CPS_ONLY_IMPUTED_VARIABLES ) @@ -212,6 +214,79 @@ def test_falls_back_to_zero_without_parent_proxies(self): np.testing.assert_allclose(result, np.array([0.0])) +class TestDeterministicSpmThresholdRebuild: + def test_rebuild_cloned_spm_thresholds_uses_donor_geoadj(self): + data = { + "age": {2024: np.array([40, 12, 70, 40, 12, 70], dtype=np.int32)}, + "person_spm_unit_id": { + 2024: np.array([10, 10, 20, 30, 30, 40], dtype=np.int32) + }, + "spm_unit_id": {2024: np.array([10, 20, 30, 40], dtype=np.int32)}, + "spm_unit_tenure_type": { + 2024: np.array( + [ + b"RENTER", + b"OWNER_WITHOUT_MORTGAGE", + b"RENTER", + b"OWNER_WITHOUT_MORTGAGE", + ] + ) + }, + "spm_unit_spm_threshold": { + 2024: np.array( + [20_000.0, 15_000.0, 999_999.0, 999_999.0], dtype=np.float64 + ) + }, + } + + rebuilt = rebuild_cloned_spm_thresholds(data, 2024) + + np.testing.assert_allclose(rebuilt[:2], np.array([20_000.0, 15_000.0])) + np.testing.assert_allclose(rebuilt[2:], np.array([20_000.0, 15_000.0])) + + def test_rebuild_cloned_spm_thresholds_rejects_unknown_tenure(self): + data = { + "age": {2024: np.array([40, 12, 40, 12], dtype=np.int32)}, + "person_spm_unit_id": {2024: np.array([10, 10, 20, 20], dtype=np.int32)}, + "spm_unit_id": {2024: np.array([10, 20], dtype=np.int32)}, + "spm_unit_tenure_type": { + 2024: np.array([b"RENTER", b"UNRECOGNIZED"], dtype="|S12") + }, + "spm_unit_spm_threshold": { + 2024: np.array([20_000.0, 20_000.0], dtype=np.float64) + }, + } + + with pytest.raises(ValueError, match="Unsupported spm_unit_tenure_type"): + rebuild_cloned_spm_thresholds(data, 2024) + + +class TestWeightPriorInitialization: + def test_initialize_weight_priors_keeps_zero_weight_records_near_zero(self): + weights = np.array([1_500.0, 0.0, 625.0, 0.0], dtype=np.float64) + + priors = initialize_weight_priors(weights, seed=123) + + assert np.all(priors > 0) + assert priors[1] < 1e-4 + assert priors[3] < 1e-4 + assert priors[0] == pytest.approx(1_500.0) + assert priors[2] == pytest.approx(625.0) + + def test_initialize_weight_priors_preserves_positive_weights_exactly(self): + weights = np.array([1_500.0, 625.0, 42.0], dtype=np.float64) + + priors = initialize_weight_priors(weights, seed=123) + + np.testing.assert_array_equal(priors, weights) + + def test_initialize_weight_priors_is_reproducible(self): + weights = np.array([400.0, 0.0, 100.0], dtype=np.float64) + + priors_a = initialize_weight_priors(weights, seed=77) + priors_b = initialize_weight_priors(weights, seed=77) + + np.testing.assert_allclose(priors_a, priors_b) class TestRetirementConstraints: """Post-processing retirement constraints enforce IRS caps.""" diff --git a/tests/unit/test_medicare_part_b_inputs.py b/tests/unit/test_medicare_part_b_inputs.py new file mode 100644 index 000000000..a082720df --- /dev/null +++ b/tests/unit/test_medicare_part_b_inputs.py @@ -0,0 +1,10 @@ +from policyengine_us_data.datasets.cps.extended_cps import CPS_ONLY_IMPUTED_VARIABLES +from policyengine_us_data.datasets.puf.puf import MEDICAL_EXPENSE_CATEGORY_BREAKDOWNS + + +def test_medicare_part_b_not_qrf_imputed_for_clone_half(): + assert "medicare_part_b_premiums" not in set(CPS_ONLY_IMPUTED_VARIABLES) + + +def test_medicare_part_b_not_allocated_from_generic_puf_medical_expenses(): + assert "medicare_part_b_premiums" not in MEDICAL_EXPENSE_CATEGORY_BREAKDOWNS diff --git a/tests/unit/test_reference_partner.py b/tests/unit/test_reference_partner.py new file mode 100644 index 000000000..83ee6bd28 --- /dev/null +++ b/tests/unit/test_reference_partner.py @@ -0,0 +1,32 @@ +""" +Tests for reference-person partner extraction from CPS ASEC. + +The public CPS ASEC relationship-to-reference-person variable PERRP identifies +unmarried partners of the household head/reference person. We carry that +through so the SPM childcare cap can distinguish the reference person's partner +from unrelated adults in the same SPM unit. +""" + +from pathlib import Path + + +class TestReferencePartner: + """Test suite for CPS relationship-to-reference-person extraction.""" + + def test_census_cps_includes_perrp(self): + census_cps_path = Path(__file__).parent.parent.parent / ( + "policyengine_us_data/datasets/cps/census_cps.py" + ) + content = census_cps_path.read_text() + + assert '"PERRP"' in content, "PERRP should be in PERSON_COLUMNS" + + def test_cps_maps_unmarried_partner_from_perrp(self): + cps_path = Path(__file__).parent.parent.parent / ( + "policyengine_us_data/datasets/cps/cps.py" + ) + content = cps_path.read_text() + + assert 'cps["is_unmarried_partner_of_household_head"]' in content + for code in ("43", "44", "46", "47"): + assert code in content diff --git a/tests/unit/test_small_enhanced_cps.py b/tests/unit/test_small_enhanced_cps.py new file mode 100644 index 000000000..9a5089cc0 --- /dev/null +++ b/tests/unit/test_small_enhanced_cps.py @@ -0,0 +1,130 @@ +import h5py +import numpy as np +import pandas as pd +import pytest + +from policyengine_us_data.datasets.cps.small_enhanced_cps import ( + _attach_clone_origin_flags, +) + + +def _write_period_group( + h5_file, variable_name: str, values_by_period: dict[int, np.ndarray] +): + group = h5_file.create_group(variable_name) + for period, values in values_by_period.items(): + group.create_dataset(str(period), data=values) + + +def test_attach_clone_origin_flags_maps_sampled_entity_ids(tmp_path): + source_path = tmp_path / "enhanced_cps_2024.h5" + with h5py.File(source_path, "w") as h5_file: + _write_period_group(h5_file, "person_id", {2024: np.array([11, 12, 13])}) + _write_period_group( + h5_file, "person_is_puf_clone", {2024: np.array([0, 1, 0], dtype=np.int8)} + ) + _write_period_group(h5_file, "tax_unit_id", {2024: np.array([21, 22])}) + _write_period_group( + h5_file, "tax_unit_is_puf_clone", {2024: np.array([1, 0], dtype=np.int8)} + ) + _write_period_group(h5_file, "spm_unit_id", {2024: np.array([31, 32])}) + _write_period_group( + h5_file, "spm_unit_is_puf_clone", {2024: np.array([0, 1], dtype=np.int8)} + ) + _write_period_group(h5_file, "family_id", {2024: np.array([41, 42])}) + _write_period_group( + h5_file, "family_is_puf_clone", {2024: np.array([1, 1], dtype=np.int8)} + ) + _write_period_group(h5_file, "household_id", {2024: np.array([51, 52])}) + _write_period_group( + h5_file, "household_is_puf_clone", {2024: np.array([0, 1], dtype=np.int8)} + ) + + sampled_data = { + "person_id": {2024: np.array([12, 11], dtype=np.int32)}, + "tax_unit_id": {2024: np.array([22, 21], dtype=np.int32)}, + "spm_unit_id": {2024: np.array([32, 31], dtype=np.int32)}, + "family_id": {2024: np.array([41, 42], dtype=np.int32)}, + "household_id": {2024: np.array([52, 51], dtype=np.int32)}, + } + + _attach_clone_origin_flags(sampled_data, source_path) + + assert sampled_data["person_is_puf_clone"][2024].tolist() == [1, 0] + assert sampled_data["tax_unit_is_puf_clone"][2024].tolist() == [0, 1] + assert sampled_data["spm_unit_is_puf_clone"][2024].tolist() == [1, 0] + assert sampled_data["family_is_puf_clone"][2024].tolist() == [1, 1] + assert sampled_data["household_is_puf_clone"][2024].tolist() == [1, 0] + + +def test_attach_clone_origin_flags_rejects_missing_sampled_id(tmp_path): + source_path = tmp_path / "enhanced_cps_2024.h5" + with h5py.File(source_path, "w") as h5_file: + for entity_name, id_base in [ + ("person", 10), + ("tax_unit", 20), + ("spm_unit", 30), + ("family", 40), + ("household", 50), + ]: + _write_period_group( + h5_file, + f"{entity_name}_id", + {2024: np.array([id_base], dtype=np.int32)}, + ) + _write_period_group( + h5_file, + f"{entity_name}_is_puf_clone", + {2024: np.array([0], dtype=np.int8)}, + ) + + sampled_data = { + "person_id": {2024: np.array([999], dtype=np.int32)}, + "tax_unit_id": {2024: np.array([20], dtype=np.int32)}, + "spm_unit_id": {2024: np.array([30], dtype=np.int32)}, + "family_id": {2024: np.array([40], dtype=np.int32)}, + "household_id": {2024: np.array([50], dtype=np.int32)}, + } + + with pytest.raises(KeyError, match="person_is_puf_clone"): + _attach_clone_origin_flags(sampled_data, source_path) + + +def test_attach_clone_origin_flags_accepts_period_keys(tmp_path): + source_path = tmp_path / "enhanced_cps_2024.h5" + with h5py.File(source_path, "w") as h5_file: + _write_period_group(h5_file, "person_id", {2024: np.array([11, 12])}) + _write_period_group( + h5_file, + "person_is_puf_clone", + {2024: np.array([0, 1], dtype=np.int8)}, + ) + for entity_name, id_base in [ + ("tax_unit", 20), + ("spm_unit", 30), + ("family", 40), + ("household", 50), + ]: + _write_period_group( + h5_file, + f"{entity_name}_id", + {2024: np.array([id_base], dtype=np.int32)}, + ) + _write_period_group( + h5_file, + f"{entity_name}_is_puf_clone", + {2024: np.array([0], dtype=np.int8)}, + ) + + period = pd.Period("2024", freq="Y") + sampled_data = { + "person_id": {period: np.array([12, 11], dtype=np.int32)}, + "tax_unit_id": {period: np.array([20], dtype=np.int32)}, + "spm_unit_id": {period: np.array([30], dtype=np.int32)}, + "family_id": {period: np.array([40], dtype=np.int32)}, + "household_id": {period: np.array([50], dtype=np.int32)}, + } + + _attach_clone_origin_flags(sampled_data, source_path) + + assert sampled_data["person_is_puf_clone"][period].tolist() == [1, 0] diff --git a/tests/unit/test_upload_completed_datasets.py b/tests/unit/test_upload_completed_datasets.py index 7a602c340..34265ec18 100644 --- a/tests/unit/test_upload_completed_datasets.py +++ b/tests/unit/test_upload_completed_datasets.py @@ -7,6 +7,7 @@ import policyengine_us_data.storage.upload_completed_datasets as upload_module from policyengine_us_data.storage.upload_completed_datasets import ( DatasetValidationError, + upload_datasets, validate_dataset, ) import policyengine_us_data.utils.dataset_validation as _dv_mod @@ -153,3 +154,79 @@ def test_validate_dataset_infers_time_period_for_flat_h5(tmp_path, monkeypatch): validate_dataset(file_path) assert _TimePeriodCheckingAggregateMicrosimulation.last_dataset.time_period == 2024 + + +def test_upload_datasets_includes_clone_diagnostics_sidecar(tmp_path, monkeypatch): + storage_folder = tmp_path / "storage" + calibration_folder = storage_folder / "calibration" + calibration_folder.mkdir(parents=True) + + cps_path = storage_folder / "cps_2024.h5" + enhanced_path = storage_folder / "enhanced_cps_2024.h5" + diagnostics_path = enhanced_path.with_suffix(".clone_diagnostics.json") + small_path = storage_folder / "small_enhanced_cps_2024.h5" + policy_db = calibration_folder / "policy_data.db" + + for path in [cps_path, enhanced_path, diagnostics_path, small_path, policy_db]: + path.write_text("placeholder") + + monkeypatch.setattr( + upload_module, + "CPS_2024", + SimpleNamespace(file_path=cps_path), + ) + monkeypatch.setattr( + upload_module, + "EnhancedCPS_2024", + SimpleNamespace(file_path=enhanced_path), + ) + monkeypatch.setattr(upload_module, "STORAGE_FOLDER", storage_folder) + monkeypatch.setattr(upload_module, "validate_dataset", lambda file_path: None) + + uploaded_files = [] + + def _capture_upload(*, files, **kwargs): + uploaded_files.extend(files) + + monkeypatch.setattr(upload_module, "upload_data_files", _capture_upload) + + upload_datasets(require_enhanced_cps=True) + + assert uploaded_files == [ + cps_path, + policy_db, + enhanced_path, + diagnostics_path, + small_path, + ] + + +def test_upload_datasets_requires_clone_diagnostics_sidecar(tmp_path, monkeypatch): + storage_folder = tmp_path / "storage" + calibration_folder = storage_folder / "calibration" + calibration_folder.mkdir(parents=True) + + cps_path = storage_folder / "cps_2024.h5" + enhanced_path = storage_folder / "enhanced_cps_2024.h5" + small_path = storage_folder / "small_enhanced_cps_2024.h5" + policy_db = calibration_folder / "policy_data.db" + + for path in [cps_path, enhanced_path, small_path, policy_db]: + path.write_text("placeholder") + + monkeypatch.setattr( + upload_module, + "CPS_2024", + SimpleNamespace(file_path=cps_path), + ) + monkeypatch.setattr( + upload_module, + "EnhancedCPS_2024", + SimpleNamespace(file_path=enhanced_path), + ) + monkeypatch.setattr(upload_module, "STORAGE_FOLDER", storage_folder) + monkeypatch.setattr(upload_module, "validate_dataset", lambda file_path: None) + monkeypatch.setattr(upload_module, "upload_data_files", lambda **kwargs: None) + + with pytest.raises(FileNotFoundError, match="clone_diagnostics"): + upload_datasets(require_enhanced_cps=True) diff --git a/tests/unit/test_weeks_worked.py b/tests/unit/test_weeks_worked.py new file mode 100644 index 000000000..ad5f801af --- /dev/null +++ b/tests/unit/test_weeks_worked.py @@ -0,0 +1,41 @@ +""" +Tests for weeks_worked extraction from CPS ASEC. + +The Census CPS ASEC exposes WKSWORK directly, which we now carry through as +the model input for future-year SPM work-expense calculations. +""" + +import numpy as np +from pathlib import Path + + +class TestWeeksWorked: + """Test suite for weeks_worked variable extraction.""" + + def test_census_cps_includes_wkswork(self): + census_cps_path = Path(__file__).parent.parent.parent / ( + "policyengine_us_data/datasets/cps/census_cps.py" + ) + content = census_cps_path.read_text() + + assert '"WKSWORK"' in content, "WKSWORK should be in PERSON_COLUMNS" + + def test_cps_maps_weeks_worked_from_wkswork(self): + cps_path = Path(__file__).parent.parent.parent / ( + "policyengine_us_data/datasets/cps/cps.py" + ) + content = cps_path.read_text() + + assert 'cps["weeks_worked"]' in content + assert "person.WKSWORK" in content + assert "np.clip(person.WKSWORK, 0, 52)" in content + + def test_weeks_worked_value_range(self): + raw_values = np.array([-4, 0, 1, 26, 52, 60]) + processed = np.clip(raw_values, 0, 52) + + assert processed.min() >= 0, "Minimum should be >= 0" + assert processed.max() <= 52, "Maximum should be <= 52" + assert processed[0] == 0, "Negative values should clip to 0" + assert processed[3] == 26, "Valid weeks should be preserved" + assert processed[5] == 52, "Values above 52 should clip to 52" diff --git a/validation/benefit_validation.py b/validation/benefit_validation.py index cf4689720..14360782d 100644 --- a/validation/benefit_validation.py +++ b/validation/benefit_validation.py @@ -9,41 +9,75 @@ import numpy as np from policyengine_us import Microsimulation from policyengine_us_data.datasets.cps.enhanced_cps import EnhancedCPS - - -def analyze_benefit_underreporting(): - """Analyze impact of CPS benefit underreporting on results.""" - - sim = Microsimulation(dataset=EnhancedCPS, dataset_year=2022) - - programs = { +from policyengine_us_data.utils.census_spm import ( + build_census_spm_capped_housing_subsidy_target, +) +from policyengine_us_data.utils.hud_housing import ( + build_hud_user_housing_assistance_benchmark, +) + + +def get_program_benchmarks(year: int): + housing_spm_target = build_census_spm_capped_housing_subsidy_target(year) + housing_hud_target = build_hud_user_housing_assistance_benchmark(year) + return { "snap": { "variable": "snap", - "admin_total": 114.1, # billions, from USDA + "benchmark_total": 114.1, # billions, from USDA + "benchmark_source": "USDA", "participation_rate": 0.82, # from USDA + "weight_variable": "spm_unit_weight", }, "ssi": { "variable": "ssi", - "admin_total": 56.7, # from SSA + "benchmark_total": 56.7, # from SSA + "benchmark_source": "SSA", "participation_rate": 0.93, + "weight_variable": "spm_unit_weight", }, "tanf": { "variable": "tanf", - "admin_total": 7.1, # from HHS + "benchmark_total": 7.1, # from HHS + "benchmark_source": "HHS", "participation_rate": 0.23, + "weight_variable": "spm_unit_weight", }, - "housing": { - "variable": "housing_benefit", - "admin_total": 50.3, # from HUD - "participation_rate": 0.76, + "housing_spm_capped": { + "variable": "spm_unit_capped_housing_subsidy", + "benchmark_total": housing_spm_target["value"] / 1e9, + "benchmark_source": housing_spm_target["source"], + "participation_rate": np.nan, + "weight_variable": "spm_unit_weight", + }, + "housing_assistance_hud_user": { + "variable": "housing_assistance", + "benchmark_total": housing_hud_target["annual_spending_total"] / 1e9, + "benchmark_source": housing_hud_target["source"], + "benchmark_participants_millions": ( + housing_hud_target["reported_households"] / 1e6 + ), + "participation_rate": np.nan, + "weight_variable": "household_weight", + "map_to": "household", }, } + +def analyze_benefit_underreporting(): + """Analyze impact of CPS benefit underreporting on results.""" + + year = 2022 + sim = Microsimulation(dataset=EnhancedCPS, dataset_year=year) + programs = get_program_benchmarks(year) + results = [] for program, info in programs.items(): # Calculate totals - benefit = sim.calculate(info["variable"], 2022) - weight = sim.calculate("household_weight", 2022) + benefit_kwargs = {} + if info.get("map_to"): + benefit_kwargs["map_to"] = info["map_to"] + benefit = sim.calculate(info["variable"], year, **benefit_kwargs) + weight = sim.calculate(info["weight_variable"], year) # Total benefits total = (benefit * weight).sum() / 1e9 # billions @@ -53,15 +87,24 @@ def analyze_benefit_underreporting(): weighted_participants = ((benefit > 0) * weight).sum() / 1e6 # millions # Underreporting factor - underreporting = info["admin_total"] / total if total > 0 else np.inf + benchmark_ratio = info["benchmark_total"] / total if total > 0 else np.inf + benchmark_participants = info.get("benchmark_participants_millions") + participant_ratio = ( + weighted_participants / benchmark_participants + if benchmark_participants not in (None, 0) + else np.nan + ) results.append( { "program": program, "enhanced_cps_total": total, - "admin_total": info["admin_total"], - "underreporting_factor": underreporting, + "benchmark_total": info["benchmark_total"], + "benchmark_source": info["benchmark_source"], + "benchmark_ratio": benchmark_ratio, "participants_millions": weighted_participants, + "benchmark_participants_millions": benchmark_participants, + "participant_ratio": participant_ratio, "mean_benefit": ( benefit[benefit > 0].mean() if (benefit > 0).any() else 0 ), @@ -77,11 +120,11 @@ def validate_program_interactions(): sim = Microsimulation(dataset=EnhancedCPS, dataset_year=2022) # Get program benefits - snap = sim.calculate("snap", 2022) > 0 - medicaid = sim.calculate("medicaid", 2022) > 0 - ssi = sim.calculate("ssi", 2022) > 0 - tanf = sim.calculate("tanf", 2022) > 0 - housing = sim.calculate("housing_benefit", 2022) > 0 + snap = sim.calculate("snap", 2022, map_to="household") > 0 + medicaid = sim.calculate("medicaid", 2022, map_to="household") > 0 + ssi = sim.calculate("ssi", 2022, map_to="household") > 0 + tanf = sim.calculate("tanf", 2022, map_to="household") > 0 + housing = sim.calculate("housing_assistance", 2022, map_to="household") > 0 weight = sim.calculate("household_weight", 2022)