diff --git a/docs/api/regionprocessor.rst b/docs/api/regionprocessor.rst index d6ed5337..30bf4c53 100644 --- a/docs/api/regionprocessor.rst +++ b/docs/api/regionprocessor.rst @@ -4,4 +4,4 @@ =================== .. autoclass:: RegionProcessor - :members: from_directory, validate_with_definition, apply, check_region_aggregation + :members: from_directory, validate_with_definition, apply, check_region_aggregation, get_common_region_country_mapping, get_native_region_country_mapping diff --git a/nomenclature/processor/region.py b/nomenclature/processor/region.py index 762a9a82..99d7607e 100644 --- a/nomenclature/processor/region.py +++ b/nomenclature/processor/region.py @@ -129,10 +129,12 @@ def convert_to_list(cls, v): @field_validator("native_regions") @classmethod - def validate_native_regions_name(cls, v, info: ValidationInfo): - """Checks if a native region occurs a maximum of two ways: - * at most once in both keep name AND rename format - * only once in either keep name OR rename format""" + def check_native_region_name(cls, v, info: ValidationInfo): + """ + Checks if a native region name occurs at most once in each format: + * at most once in keep name format + * at most once in rename format + """ keep = [nr.name for nr in v if nr.rename is None] rename = [nr.name for nr in v if nr.rename is not None] keep_dups = [item for item, count in Counter(keep).items() if count > 1] @@ -150,7 +152,8 @@ def validate_native_regions_name(cls, v, info: ValidationInfo): @field_validator("native_regions") @classmethod - def validate_native_regions_target(cls, v, info: ValidationInfo): + def check_native_regions_unique(cls, v, info: ValidationInfo): + """Check that the target native region names are not duplicated.""" target_names = [nr.target_native_region for nr in v] duplicates = [ item for item, count in Counter(target_names).items() if count > 1 @@ -168,7 +171,7 @@ def validate_native_regions_target(cls, v, info: ValidationInfo): @field_validator("common_regions") @classmethod - def validate_common_regions(cls, v, info: ValidationInfo): + def check_common_regions_unique(cls, v, info: ValidationInfo): """Check for duplicate common (target) regions and self-referencing (source in target) regions.""" names = [cr.name for cr in v] @@ -189,7 +192,7 @@ def validate_common_regions(cls, v, info: ValidationInfo): def check_native_or_common_regions( cls, v: "RegionAggregationMapping" ) -> "RegionAggregationMapping": - # Check that we have at least one of the two: native and common regions + """Check that we have at least one of the two: native regions or common regions""" if not v.native_regions and not v.common_regions: raise ValueError( "At least one of 'native_regions' and 'common_regions' must be " @@ -199,11 +202,10 @@ def check_native_or_common_regions( @model_validator(mode="after") @classmethod - def check_illegal_renaming( + def check_native_common_region_no_overlap( cls, v: "RegionAggregationMapping" ) -> "RegionAggregationMapping": - """Check if any renaming overlaps with common regions""" - + """Check that native region target names do not overlap with common region names.""" native_region_names = {nr.target_native_region for nr in v.native_regions} common_region_names = {cr.name for cr in v.common_regions} overlap = list(native_region_names & common_region_names) @@ -236,6 +238,7 @@ def check_exclude_common_region_overlap( def check_constituent_regions_in_native_regions( cls, v: "RegionAggregationMapping" ) -> "RegionAggregationMapping": + """Check that all constituent regions in common regions are listed as native regions.""" if v.common_regions and v.native_regions: if missing := set( [cr for r in v.common_regions for cr in r.constituent_regions] @@ -250,8 +253,7 @@ def from_file(cls, file: Path | str) -> "RegionAggregationMapping": Parameters ---------- file : Path | str - Path to a yaml file which contains region aggregation information for one - model. + Path to a file which contains region aggregation information for one model. Returns ------- @@ -261,7 +263,7 @@ def from_file(cls, file: Path | str) -> "RegionAggregationMapping": Notes ----- - This function is used to convert a model mapping yaml file into a dictionary + This function is used to convert a model mapping file into a dictionary which is used to initialize a RegionAggregationMapping. """ @@ -277,6 +279,18 @@ def from_file(cls, file: Path | str) -> "RegionAggregationMapping": @classmethod def from_yaml(cls, file: Path) -> "RegionAggregationMapping": + """Initialize a RegionAggregationMapping from a yaml file. + + Parameters + ---------- + file : Path + Path to a yaml file which contains region aggregation information for one model. + + Returns + ------- + RegionAggregationMapping + The resulting region aggregation mapping. + """ try: with open(file, "r", encoding="utf-8") as f: mapping_input = yaml.safe_load(f) @@ -316,7 +330,19 @@ def from_yaml(cls, file: Path) -> "RegionAggregationMapping": return cls(**mapping_input) @classmethod - def from_excel(cls, file) -> "RegionAggregationMapping": + def from_excel(cls, file: Path) -> "RegionAggregationMapping": + """Initialize a RegionAggregationMapping from a spreadsheet file. + + Parameters + ---------- + file : Path + Path to a spreadsheet file which contains region aggregation information for one model. + + Returns + ------- + RegionAggregationMapping + The resulting region aggregation mapping. + """ try: model = pd.read_excel(file, sheet_name="Model", usecols="B", nrows=1).iloc[ 0, 0 @@ -370,7 +396,7 @@ def from_excel(cls, file) -> "RegionAggregationMapping": 0, CommonRegion( name="World", constituent_regions=constituent_world_regions - ) + ), ) except Exception as error: raise ValueError(f"{error} in {get_relative_path(file)}") from error @@ -383,45 +409,49 @@ def from_excel(cls, file) -> "RegionAggregationMapping": @property def all_regions(self) -> list[str]: + """List of all native and common regions in the mapping.""" # For the native regions we take the **renamed** (if given) names nr_list = [x.target_native_region for x in self.native_regions or []] return nr_list + self.common_region_names @property def model_native_region_names(self) -> list[str]: - # List of the **original** model native region names + """List of the original model native region names.""" return [x.name for x in self.native_regions or []] @property def common_region_names(self) -> list[str]: - # List of the common region names + """List of the common region names.""" return [x.name for x in self.common_regions or []] @property def rename_mapping(self) -> dict[str, str]: + """Mapping from original native region names to renamed native region names.""" return { r.name: r.target_native_region for r in self.native_regions or [] if r.rename is not None } + @property + def reverse_rename_mapping(self) -> dict[str, str]: + """Mapping from renamed native region names to original native region names.""" + return {renamed: original for original, renamed in self.rename_mapping.items()} + @property def upload_native_regions(self) -> list[str]: + """List of native region names to be uploaded.""" return [ native_region.target_native_region for native_region in self.native_regions or [] ] - @property - def reverse_rename_mapping(self) -> dict[str, str]: - return {renamed: original for original, renamed in self.rename_mapping.items()} - @property def models(self) -> list[str]: return self.model def check_unexpected_regions(self, df: IamDataFrame) -> None: - # Raise error if a region in the input data is not used in the model mapping + """Raise an error if there are regions in the input data that are not in the model mapping.""" if regions_not_found := set(df.region) - set( self.model_native_region_names @@ -466,7 +496,8 @@ def serialize_common_regions(self, common_regions) -> list: for common_region in common_regions ] - def to_yaml(self, file) -> None: + def to_yaml(self, file: Path) -> None: + """Write the RegionAggregationMapping to a yaml file.""" with open(file, "w", encoding="utf-8") as f: yaml.dump( self.model_dump(mode="json", exclude_defaults=True, exclude={"file"}), @@ -781,17 +812,67 @@ def _apply_region_processing( return IamDataFrame(_data, meta=model_df.meta), difference def revert(self, df: pyam.IamDataFrame) -> pyam.IamDataFrame: + """Revert region processing by removing common regions and applying inverse renaming.""" model_dfs = [] for model in df.model: model_df = df.filter(model=model) if mapping := self.mappings.get(model): - # remove common regions, then apply inverse-renaming of native-regions model_df = model_df.filter( region=mapping.common_region_names, keep=False ).rename(region=mapping.reverse_rename_mapping) model_dfs.append(model_df) return pyam.concat(model_dfs) + def get_common_region_country_mapping(self, model: str) -> dict[str, list[str]]: + """Return a mapping from common region names to constituent countries for a model. + + Parameters + ---------- + model : str + Name of the model. + + Returns + ------- + dict[str, list[str]] + Dictionary mapping each common region name to the aggregated list of + countries from all of its constituent native regions. + """ + mapping = self.mappings[model] + result: dict[str, list[str]] = {} + for common_region in mapping.common_regions: + countries: list[str] = [] + for constituent in common_region.constituent_regions: + # Apply renaming if applicable to get the correct region name + target_name = mapping.rename_mapping.get(constituent, constituent) + region_code = self.region_codelist[target_name] + if region_code.countries: + countries.extend(region_code.countries) + result[common_region.name] = countries + return result + + def get_native_region_country_mapping(self, model: str) -> dict[str, list[str]]: + """Return a mapping from (renamed) native region names to countries for a model. + + Parameters + ---------- + model : str + Name of the model. + + Returns + ------- + dict[str, list[str]] + Dictionary mapping each native region name (after any renaming) to its + list of countries. + """ + mapping = self.mappings[model] + return { + nr.target_native_region: self.region_codelist[ + nr.target_native_region + ].countries + or [] + for nr in mapping.native_regions + } + def _aggregate_region(df, var, *regions, **kwargs): """Perform region aggregation with kwargs catching inconsistent-index errors""" diff --git a/tests/test_region_aggregation.py b/tests/test_region_aggregation.py index cacd39c5..33464a8a 100644 --- a/tests/test_region_aggregation.py +++ b/tests/test_region_aggregation.py @@ -14,6 +14,8 @@ RegionProcessor, process, ) +from nomenclature.code import RegionCode +from nomenclature.codelist import RegionCodeList, VariableCodeList from nomenclature.processor.region import CommonRegion, NativeRegion TEST_FOLDER_REGION_PROCESSING = TEST_DATA_DIR / "region_processing" @@ -391,3 +393,82 @@ def test_model_mapping_from_excel_to_yaml(tmp_path): TEST_DATA_DIR / "model_registration" / "excel_mapping_reference.yaml" ) assert obs == exp + + +@pytest.fixture +def region_codelist_with_countries(): + """RegionCodeList where each native region target name has countries assigned.""" + return RegionCodeList( + name="region", + mapping={ + "alternative_name_a": RegionCode( + name="alternative_name_a", countries=["Germany", "France"] + ), + "alternative_name_b": RegionCode( + name="alternative_name_b", countries=["China"] + ), + "region_c": RegionCode(name="region_c", countries=["Japan"]), + "common_region_1": RegionCode(name="common_region_1"), + "common_region_2": RegionCode(name="common_region_2"), + }, + ) + + +@pytest.fixture +def region_processor_with_countries(region_codelist_with_countries): + """RegionProcessor with a mapping where each native region target name has countries assigned.""" + mapping = RegionAggregationMapping.from_file( + TEST_FOLDER_REGION_AGGREGATION / "working_mapping.yaml" + ) + return RegionProcessor( + mappings={"model_a": mapping}, + region_codelist=region_codelist_with_countries, + variable_codelist=VariableCodeList(name="variable"), + ) + + +def test_get_common_region_country_mapping( + region_processor_with_countries, +): + obs = region_processor_with_countries.get_common_region_country_mapping("model_a") + assert obs == { + "common_region_1": ["Germany", "France", "China"], + "common_region_2": ["Japan"], + } + + +def test_get_native_region_country_mapping( + region_processor_with_countries, +): + obs = region_processor_with_countries.get_native_region_country_mapping("model_a") + assert obs == { + "alternative_name_a": ["Germany", "France"], + "alternative_name_b": ["China"], + "region_c": ["Japan"], + } + + +def test_get_common_region_country_mapping_no_countries(): + """Regions without a countries attribute return an empty list.""" + mapping = RegionAggregationMapping.from_file( + TEST_FOLDER_REGION_AGGREGATION / "working_mapping.yaml" + ) + codelist = RegionCodeList( + name="region", + mapping={ + "alternative_name_a": RegionCode(name="alternative_name_a"), + "alternative_name_b": RegionCode(name="alternative_name_b"), + "region_c": RegionCode(name="region_c"), + "common_region_1": RegionCode(name="common_region_1"), + "common_region_2": RegionCode(name="common_region_2"), + }, + ) + region_processor = RegionProcessor( + mappings={"model_a": mapping}, + region_codelist=codelist, + variable_codelist=VariableCodeList(name="variable"), + ) + assert region_processor.get_common_region_country_mapping("model_a") == { + "common_region_1": [], + "common_region_2": [], + }