diff --git a/cdds/cdds/extract/validate.py b/cdds/cdds/extract/validate.py index 2737e60d4..006e783ed 100644 --- a/cdds/cdds/extract/validate.py +++ b/cdds/cdds/extract/validate.py @@ -1,14 +1,15 @@ -# (C) British Crown Copyright 2023-2025, Met Office. +# (C) British Crown Copyright 2023-2026, Met Office. # Please see LICENSE.md for license details. import logging import os +import argparse from metomi.isodatetime.data import Calendar from cdds.common import generate_datestamps_nc, generate_datestamps_pp from cdds.common.cdds_files.cdds_directories import component_directory, log_directory -from cdds.common.plugins.plugins import PluginStore +from cdds.common.plugins.plugins import PluginStore, CddsPlugin from cdds.common.request.request import read_request from cdds.extract.common import ( FileContentError, @@ -24,87 +25,198 @@ ) from cdds.extract.constants import STREAMTYPE_NC, STREAMTYPE_PP from cdds.extract.filters import Filters +from cdds.common.request.request import Request -def validate_streams(streams, args): - logger = logging.getLogger(__name__) +def setup_mappings(request: Request, plugin: CddsPlugin) -> Filters: + """Sets up the basic mapping information. - request = read_request(args.request) - plugin = PluginStore.instance().get_plugin() - model_params = plugin.models_parameters(request.metadata.model_id) - stream_file_info = model_params.stream_file_info() - stream = streams[0] + Parameters + ---------- + request: Request + Object that stores the information about the request. + plugin: CddsPlugin + CDDS plugin interface for supported models. + Returns + ------- + mappings: Filters + The basic mapping information for the data request. + """ var_list = configure_variables(os.path.join(component_directory(request, "prepare"), plugin.requested_variables_list_filename(request))) - - # configure mappings for each variables mappings = Filters(plugin.proc_directory(request), var_list) mappings.set_mappings(request) - mapping_status = configure_mappings(mappings) + return mappings + + +def configure_mapping_for_each_variable(mappings: Filters, request: Request, stream: str) -> Filters: + """Constructs the mapping information for each variable in a single stream using the given request. + + Parameters + ---------- + mappings: Filters + The mappings for the data request. + request: Request + Object that stores the information about the request. + stream: str + The stream. + + Returns + ------- + Filters + The complete mappings for the data request. + """ + mappings.stream = stream if request.data.mass_ensemble_member: mappings.ensemble_member_id = request.data.mass_ensemble_member - file_type = get_streamtype(stream) - mappings.source = build_mass_location(request.data.mass_data_class, - request.data.model_workflow_id, - stream, - file_type, - request.data.mass_ensemble_member) + mappings.source = build_mass_location(request.data.mass_data_class, request.data.model_workflow_id, stream, + get_streamtype(stream), request.data.mass_ensemble_member) else: mappings.ensemble_member_id = None - mappings.stream = stream - # generate expected filenames - file_frequency = stream_file_info.file_frequencies[stream].frequency + return mappings - stream_validation = StreamValidationResult(stream) +def calculate_file_frequency(plugin: CddsPlugin, request: Request, stream: str) -> str: + """Calculates the file frequency for a single given stream. + + Parameters + ---------- + plugin: CddsPlugin + CDDS plugin interface for supported models. + request: Request + Object that stores the information about the request. + stream: str + The stream. + + Returns + ------- + str + The file frequency. + """ + model_params = plugin.models_parameters(request.metadata.model_id) + stream_file_info = model_params.stream_file_info() + + return stream_file_info.file_frequencies[stream].frequency + + +def process_pp_streamtype(request: Request, file_frequency: str, mappings: Filters) -> list: + """Processes any files of type pp. + + Parameters + ---------- + request: Request + Object that stores the information about the request. + file_frequency: str + The frequency. + mappings: Filters + The mappings for the data request. + + Returns + ------- + list + List of filenames with datestamps. + """ + datestamps, _ = generate_datestamps_pp(request.data.start_date, request.data.end_date, file_frequency) + + return mappings.generate_filenames_pp(datestamps) + + +def process_nc_streamtype(request: Request, file_frequency: str, mappings: Filters) -> list: + """Processes any files of type nc. + + Parameters + ---------- + request: Request + Object that stores the information about the request. + file_frequency: str + The frequency. + mappings: Filters + The mappings for the data request. + + Returns + ------- + list + List of filenames with datestamps and substreams. + """ + datestamps, _ = generate_datestamps_nc(request.data.start_date, request.data.end_date, file_frequency) + filenames = [] + substreams = list(mappings.filters.keys()) + for sub_stream in substreams: + filenames += mappings.generate_filenames_nc(datestamps, sub_stream) + + return filenames + + +def validate_streams(streams: list, args: argparse.Namespace) -> StreamValidationResult: + """Validates the given streams. + + Parameters + ---------- + streams: list + The streams to validate. + args: argparse.Namespace + Command line arguments given by the user when running cdds_validate_streams. Includes args.request and + args.streams. + + Returns + ------- + StreamValidationResult + An object to hold results from the stream validation. + """ + logger = logging.getLogger(__name__) + request = read_request(args.request) + plugin = PluginStore.instance().get_plugin() + stream = streams[0] + mappings = setup_mappings(request, plugin) + mappings = configure_mapping_for_each_variable(mappings, request, stream) + mapping_status = configure_mappings(mappings) + file_frequency = calculate_file_frequency(plugin, request, stream) + stream_validation = StreamValidationResult(stream) stream_validation.add_mappings(mappings) if stream in mapping_status: - data_target = get_data_target(os.path.join(plugin.data_directory(request), 'input'), - request.data.model_workflow_id, stream) + target_path_root = os.path.join(plugin.data_directory(request), 'input') + data_target = get_data_target(target_path_root, request.data.model_workflow_id, stream) streamtype = get_streamtype(stream) _, _, _, stash_codes = (mappings.format_filter(streamtype, stream)) if streamtype == STREAMTYPE_PP: - datestamps, _ = generate_datestamps_pp(request.data.start_date, request.data.end_date, file_frequency) - filenames = mappings.generate_filenames_pp(datestamps) - + filenames = process_pp_streamtype(request, file_frequency, mappings) elif streamtype == STREAMTYPE_NC: - datestamps, _ = generate_datestamps_nc(request.data.start_date, request.data.end_date, file_frequency) - filenames = [] - substreams = list(mappings.filters.keys()) - for sub_stream in substreams: - filenames += mappings.generate_filenames_nc(datestamps, sub_stream) - + filenames = process_nc_streamtype(request, file_frequency, mappings) validate(data_target, stream, stash_codes, stream_validation, filenames, file_frequency) else: logger.info('skipped [{}]: there are no variables requiring this stream'.format(stream)) + stream_validation.log_results(log_directory(request, "extract")) return stream_validation -def validate(path, stream, stash_codes, validation_result, filenames, file_frequency): - """Simple validation based on checking correct number of files have - been extracted, and stash codes comparison in the case of pp streams. - - In the case of ncdf files, it tests if they can be opened at all. +def validate(path: str, stream: str, stash_codes: set, validation_result: StreamValidationResult, filenames: list, + file_frequency: str) -> None: + """Simple validation based on checking correct number of files have been extracted, and stash codes comparison in + the case of pp streams. In the case of ncdf files, it tests if they can be opened at all. Parameters ---------- - path:str - directory path containing files to validate - stream:dict - stream attributes - stash_codes : set - a set of short stash codes appearing in filters - validation_result: cdds.common.StreamValidationResult - An object to hold results from the stream validation + path: str + Directory path containing files to validate. + stream: str + Stream attributes. + stash_codes: set + A set of short stash codes appearing in filters. + validation_result: StreamValidationResult + An object to hold results from the stream validation. + filenames: list + A list of all found files. + file_frequency: str + The frequency. """ streamtype = get_streamtype(stream) - validate_file_names(path, validation_result, filenames, streamtype) + validate_file_names(path, streamtype, filenames, validation_result) if streamtype == STREAMTYPE_PP: logger = logging.getLogger(__name__) logger.info("Checking STASH fields") @@ -115,41 +227,40 @@ def validate(path, stream, stash_codes, validation_result, filenames, file_frequ validate_directory_netcdf(path, validation_result) -def validate_file_names(path, validation_result, filenames, file_type): - """Compare a list of expected files against the files on disk. If strict=True then - validation will fail if there are additional files that are not expected. +def validate_file_names(path: str, file_type: str, filenames: list, + validation_result: StreamValidationResult) -> None: + """Compare a list of expected files against the files on disk. If strict=True then validation will fail if there are + additional files that are not expected. Parameters ---------- path: str Path to the dataset. - stream: dict - Stream description dictionary. - substreams: list - List of expected substreams. - validation_result: cdds.common.StreamValidationResult - An object to hold results from the stream validation + file_type: str + File type. + filenames: list + A list of all found files. + validation_result: StreamValidationResult + An object to hold results from the stream validation. """ logger = logging.getLogger(__name__) logger.info("Checking for missing and unexpected files") - actual_files = set([file for file in os.listdir(path) if file.endswith(file_type)]) expected_files = set(filenames) validation_result.add_file_names(expected_files, actual_files) -def validate_directory_netcdf(path, validation_result): - """Checks that |netCDF| files at provided location can be read. - Returns overall validation status, error message(s), and a list - of unreadable files (if present). +def validate_directory_netcdf(path: str, validation_result: StreamValidationResult) -> None: + """Checks that |netCDF| files at provided location can be read. Returns overall validation status, error message(s), + and a list of unreadable files (if present). Parameters ---------- path: str Path pointing to the location of |netCDF| dataset. - validation_result: cdds.common.StreamValidationResult - An object to hold results from the stream validation + validation_result: StreamValidationResult + An object to hold results from the stream validation. """ logger = logging.getLogger(__name__) logger.info("Checking netCDF files in \"{}\"".format(path)) @@ -161,7 +272,7 @@ def validate_directory_netcdf(path, validation_result): def get_stash_fields(path: str, validation_result: StreamValidationResult) -> dict[str, dict[str, int]]: - """Validates if pp files in a given location contain all requiredmstash codes, + """Validates if pp files in a given location contain all required stash codes. Parameters ---------- @@ -172,7 +283,7 @@ def get_stash_fields(path: str, validation_result: StreamValidationResult) -> di Returns ------- - dict[in, int] + dict[str, dict[str, int]] """ stash_in_file = {} pp_files = [file for file in os.listdir(path) if file.endswith('.pp')] @@ -184,26 +295,23 @@ def get_stash_fields(path: str, validation_result: StreamValidationResult) -> di validation_result.add_file_content_error( FileContentError(os.path.join(path, pp_file), "unreadable file")) stash_in_file[pp_file] = stash + return stash_in_file -def check_expected_stash( - stash_in_file: dict[str, dict[int, int]], - validation_result: StreamValidationResult, - path: str, - expected_stash: set[int], -): +def check_expected_stash(stash_in_file: dict[str, dict], validation_result: StreamValidationResult, path: str, + expected_stash: set[int]) -> None: """Checks that all the expected stash codes are found in each file. Parameters ---------- - stash_in_file : dict[str, dict[int, int]] + stash_in_file: dict[str, dict] Stash entries in files. - validation_result : StreamValidationResult + validation_result: StreamValidationResult Validation results for a given stream. - path : str + path: str Path to files. - expected_stash : set[int] + expected_stash: set[int] The set of expected stash codes. """ for file, stash in stash_in_file.items(): @@ -216,29 +324,25 @@ def check_expected_stash( validation_result.add_file_content_error(error) -def check_consistent_stash( - stash_in_file: dict[str, dict[str, int]], - validation_result: StreamValidationResult, - path: str, - file_frequency: str -) -> None: +def check_consistent_stash(stash_in_file: dict[str, dict[str, int]], validation_result: StreamValidationResult, + path: str, file_frequency: str) -> None: """Checks that the number of stash entries is consistent across all files. Parameters ---------- - stash_in_file : dict + stash_in_file: dict[str, dict[str, int]] Stash entries in files. - validation_result : StreamValidationResult + validation_result: StreamValidationResult Validation results for a given stream. - path : str + path: str Path to files. + file_frequency: str + The frequency. """ if Calendar.default().mode == "gregorian" and file_frequency in ["monthly", "seasonal"]: logger = logging.getLogger(__name__) - logger.info( - f"Skipping stash consistency check as CDDS does not currently validate \ - {file_frequency} files when using the gregorian calendar." - ) + logger.info(f"Skipping stash consistency check as CDDS does not currently validate {file_frequency} files when " + "using the gregorian calendar.") return None reference_file: str diff --git a/cdds/cdds/tests/test_extract/data/CMIP6_CMIP_piControl_UKESM1-0-LL_ap5.json b/cdds/cdds/tests/test_extract/data/CMIP6_CMIP_piControl_UKESM1-0-LL_ap5.json new file mode 100644 index 000000000..24b5f1d1a --- /dev/null +++ b/cdds/cdds/tests/test_extract/data/CMIP6_CMIP_piControl_UKESM1-0-LL_ap5.json @@ -0,0 +1,72 @@ +{ + "experiment_id":"piControl", + "history":[ + { + "comment":"Requested variables file created.", + "time":"2026-03-04T09:17:13.845839" + } + ], + "metadata":{}, + "mip":"CMIP", + "mip_era":"CMIP6", + "model_id":"UKESM1-0-LL", + "model_type":"AOGCM BGC AER CHEM", + "production_info":"Produced using CDDS Prepare version \"3.3.0.dev0+513_refactor_extract_validation.52e95fcf\"", + "requested_variables":[ + { + "active":true, + "cell_methods":"area: time: mean", + "comments":[], + "dimensions":[ + "longitude", + "latitude", + "time" + ], + "frequency":"mon", + "in_mappings":true, + "in_model":true, + "label":"emiso2", + "miptable":"AERmon", + "producible":"yes", + "stream":"ap4" + }, + { + "active":true, + "cell_methods":"area: time: mean", + "comments":[], + "dimensions":[ + "longitude", + "latitude", + "time" + ], + "frequency":"mon", + "in_mappings":true, + "in_model":true, + "label":"rsds", + "miptable":"Amon", + "producible":"yes", + "stream":"ap5" + }, + { + "active":true, + "cell_methods":"area: mean where land time: mean", + "comments":[], + "dimensions":[ + "longitude", + "latitude", + "time" + ], + "frequency":"mon", + "in_mappings":true, + "in_model":true, + "label":"mrso", + "miptable":"Lmon", + "producible":"yes", + "stream":"ap5" + } + ], + "status":"ok", + "suite_branch":"cdds", + "suite_id":"u-aw310", + "suite_revision":"HEAD" +} diff --git a/cdds/cdds/tests/test_extract/data/minimal_variable_list.txt b/cdds/cdds/tests/test_extract/data/minimal_variable_list.txt new file mode 100644 index 000000000..32a3bcb1a --- /dev/null +++ b/cdds/cdds/tests/test_extract/data/minimal_variable_list.txt @@ -0,0 +1,2 @@ +Amon/rsds:ap5 +Lmon/mrso:ap5 \ No newline at end of file diff --git a/cdds/cdds/tests/test_extract/data/test_request_minimal.cfg b/cdds/cdds/tests/test_extract/data/test_request_minimal.cfg new file mode 100644 index 000000000..b37f53f43 --- /dev/null +++ b/cdds/cdds/tests/test_extract/data/test_request_minimal.cfg @@ -0,0 +1,32 @@ +[metadata] +branch_date_in_child = 1960-01-01T00:00:00 +branch_date_in_parent = 1960-01-01T00:00:00 +branch_method = standard +calendar = 360_day +experiment_id = piControl +institution_id = MOHC +mip = CMIP +mip_era = CMIP6 +parent_experiment_id = piControl-spinup +parent_mip = CMIP +parent_mip_era = CMIP6 +parent_variant_label = r1i1p1f2 +sub_experiment_id = none +variant_label = r1i1p1f2 +model_id = UKESM1-0-LL +model_type = AOGCM BGC AER CHEM + +[common] +package = round-1 +root_proc_dir = cdds/cdds/tests/test_extract/data/proc +root_data_dir = cdds/cdds/tests/test_extract/data/data + +[data] +end_date = 1970-06-01T00:00:00 +start_date = 1970-01-01T00:00:00 +model_workflow_id = u-aw310 +variable_list_file = cdds/cdds/tests/test_extract/data/minimal_variable_list.txt +streams = ap5 + +[conversion] +mip_convert_plugin = HadGEM3 diff --git a/cdds/cdds/tests/test_extract/test_validate.py b/cdds/cdds/tests/test_extract/test_validate.py new file mode 100644 index 000000000..51feda108 --- /dev/null +++ b/cdds/cdds/tests/test_extract/test_validate.py @@ -0,0 +1,50 @@ +# (C) British Crown Copyright 2026, Met Office. +# Please see LICENSE.md for license details. + +"""Tests for validate in the extract module""" + +import unittest + +from cdds.common.plugins.plugins import PluginStore +from cdds.common.request.request import read_request +from cdds.extract.common import configure_variables +from cdds.extract.filters import Filters +from cdds.extract.validate import configure_mapping_for_each_variable, calculate_file_frequency, process_pp_streamtype + + +class TestValidate(unittest.TestCase): + def setUp(self): + request_filepath = "cdds/cdds/tests/test_extract/data/test_request_minimal.cfg" + variables_json_filepath = "cdds/cdds/tests/test_extract/data/CMIP6_CMIP_piControl_UKESM1-0-LL_ap5.json" + self.request = read_request(request_filepath) + self.stream = "ap5" + self.plugin = PluginStore.instance().get_plugin() + self.mappings = Filters(self.plugin.proc_directory(self.request), configure_variables(variables_json_filepath)) + self.mappings.suite_id = self.request.data.model_workflow_id + self.mappings.stream = self.stream + + def test_configure_mapping_for_each_variable(self): + output = configure_mapping_for_each_variable(self.mappings, self.request, self.stream) + id_msg = f"Incorrect ensemble member id configured, expected: 'None', got: '{output.ensemble_member_id}'" + + self.assertEqual(output.stream, "ap5", f"Incorrect stream configured, expected: 'ap5', got: '{output.stream}'") + self.assertEqual(output.ensemble_member_id, None, id_msg) + self.assertEqual(output.source, "", f"Incorrect source, expected: '', got: '{output.source}'") + + def test_calculate_file_frequency(self): + output = calculate_file_frequency(self.plugin, self.request, self.stream) + msg = f"Incorrect file frequency identified, expected: 'monthly', got: '{output}'" + + self.assertEqual(output, "monthly", msg) + + def test_process_pp_streamtype(self): + output = process_pp_streamtype(self.request, "monthly", self.mappings) + expected = ["aw310a.p51970jan.pp", "aw310a.p51970feb.pp", "aw310a.p51970mar.pp", "aw310a.p51970apr.pp", + "aw310a.p51970may.pp"] + msg = f"Incorrect filename list produced for pp files, expected: '{expected}', got: '{output}'" + + self.assertEqual(output, expected, msg) + + +if __name__ == "__main__": + unittest.main()