diff --git a/docs/history.md b/docs/history.md index 1adbbfa0..47d1d141 100644 --- a/docs/history.md +++ b/docs/history.md @@ -10,6 +10,7 @@ * ADD: ``incomplete_sweep`` parameter (``"drop"``/``"pad"``) to ``open_nexradlevel2_datatree`` for handling incomplete sweeps in partial volume data ({pull}`332`) by [@aladinor](https://github.com/aladinor) * ADD: Notebook example for streaming NEXRAD Level 2 chunks from S3 (``nexrad_read_chunks.ipynb``) ({pull}`332`) by [@aladinor](https://github.com/aladinor) * ADD: Comprehensive test suite for chunk list-input and incomplete sweep handling ({pull}`332`) by [@aladinor](https://github.com/aladinor) +* MNT: Add ``cfradial1_sgp_dtree`` session fixture and refactor tests in ``test_util.py`` and ``test_accessors.py`` to use it ({issue}`346`) by [@aladinor](https://github.com/aladinor) ## 0.11.1 (2026-02-03) diff --git a/tests/conftest.py b/tests/conftest.py index 0ed531a1..954df25d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,14 @@ def file_or_filelike(request): return request.param +@pytest.fixture(scope="session") +def cfradial1_sgp_dtree(): + import xradar as xd + + filename = DATASETS.fetch("sample_sgp_data.nc") + return xd.io.open_cfradial1_datatree(filename) + + @pytest.fixture(scope="session") def cfradial1_file(tmp_path_factory): return DATASETS.fetch("cfrad.20080604_002217_000_SPOL_v36_SUR.nc") diff --git a/tests/test_accessors.py b/tests/test_accessors.py index b8120888..beea5d38 100644 --- a/tests/test_accessors.py +++ b/tests/test_accessors.py @@ -111,64 +111,38 @@ def test_crs_datatree(): ), "CRS coordinate is missing in DataTree" -def test_map_over_sweeps_apply_dummy_function(): - """ - Test applying a dummy function to all sweep nodes using map_over_sweeps. - """ - # Fetch the sample radar file - filename = DATASETS.fetch("sample_sgp_data.nc") +def test_map_over_sweeps_apply_dummy_function(cfradial1_sgp_dtree): + """Test applying a dummy function to all sweep nodes using map_over_sweeps.""" - # Open the radar file into a DataTree object - dtree = xd.io.open_cfradial1_datatree(filename) - - # Define a simple dummy function that adds a constant field to the dataset def dummy_function(ds): - ds = ds.assign( - dummy_field=ds["reflectivity_horizontal"] * 0 - ) # Field with zeros + ds = ds.assign(dummy_field=ds["reflectivity_horizontal"] * 0) ds["dummy_field"].attrs = {"unit": "dBZ", "long_name": "Dummy Field"} return ds - # Apply using map_over_sweeps accessor - dtree_modified = dtree.xradar.map_over_sweeps(dummy_function) + dtree_modified = cfradial1_sgp_dtree.xradar.map_over_sweeps(dummy_function) - # Check that the new field exists in sweep_0 and has the correct attributes sweep_0 = dtree_modified["sweep_0"] assert "dummy_field" in sweep_0.data_vars assert sweep_0.dummy_field.attrs["unit"] == "dBZ" assert sweep_0.dummy_field.attrs["long_name"] == "Dummy Field" - # Ensure all non-NaN values are 0 (accounting for -0.0 and NaN values) - non_nan_values = np.nan_to_num( - sweep_0.dummy_field.values - ) # Convert NaNs to zero for comparison + non_nan_values = np.nan_to_num(sweep_0.dummy_field.values) assert np.all(np.isclose(non_nan_values, 0)) -def test_map_over_sweeps_non_sweep_nodes(): - """ - Test that non-sweep nodes remain unchanged when using map_over_sweeps. - """ - # Fetch the sample radar file - filename = DATASETS.fetch("sample_sgp_data.nc") - - # Open the radar file into a DataTree object and add a non-sweep node - dtree = xd.io.open_cfradial1_datatree(filename) +def test_map_over_sweeps_non_sweep_nodes(cfradial1_sgp_dtree): + """Test that non-sweep nodes remain unchanged when using map_over_sweeps.""" + dtree = cfradial1_sgp_dtree.copy() non_sweep_data = xr.Dataset({"non_sweep_data": ("dim", np.arange(10))}) dtree["non_sweep_node"] = non_sweep_data - # Define a simple function that only modifies sweep nodes def dummy_function(ds): if "range" in ds.dims: - ds = ds.assign( - dummy_field=ds["reflectivity_horizontal"] * 0 - ) # Field with zeros + ds = ds.assign(dummy_field=ds["reflectivity_horizontal"] * 0) return ds - # Apply using map_over_sweeps dtree_modified = dtree.xradar.map_over_sweeps(dummy_function) - # Check that non-sweep nodes remain unchanged assert "non_sweep_data" in dtree_modified["non_sweep_node"].data_vars assert "dummy_field" not in dtree_modified["non_sweep_node"].data_vars assert "dummy_field" in dtree_modified["sweep_0"].data_vars diff --git a/tests/test_util.py b/tests/test_util.py index a5755d68..42cae74a 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -266,11 +266,9 @@ def test_ipol_time2(missing, a1gate, rot): dsx.pipe(util.ipol_time, direction=direction) -def test_get_sweep_keys(): - # Test finding sweep keys - filename = DATASETS.fetch("sample_sgp_data.nc") - dtree = io.open_cfradial1_datatree(filename) - # set a fake group +def test_get_sweep_keys(cfradial1_sgp_dtree): + # Test finding sweep keys — copy since we mutate it + dtree = cfradial1_sgp_dtree.copy() dtree["sneep_1"] = dtree["sweep_1"] keys = util.get_sweep_keys(dtree) assert keys == [ @@ -283,134 +281,82 @@ def test_get_sweep_keys(): ] -def test_apply_to_sweeps(): - # Fetch the sample radar file - filename = DATASETS.fetch("sample_sgp_data.nc") - - # Open the radar file into a DataTree object - dtree = io.open_cfradial1_datatree(filename) - - # Define a simple function to test with apply_to_sweeps +def test_apply_to_sweeps(cfradial1_sgp_dtree): def dummy_function(ds): - """A dummy function that adds a constant field to the dataset.""" - ds["dummy_field"] = ( - ds["reflectivity_horizontal"] * 0 - ) # Adding a field with all zeros + ds["dummy_field"] = ds["reflectivity_horizontal"] * 0 ds["dummy_field"].attrs = {"units": "dBZ", "long_name": "Dummy Field"} return ds - # Apply the dummy function to all sweeps using apply_to_sweeps - modified_dtree = util.apply_to_sweeps(dtree, dummy_function) + modified_dtree = util.apply_to_sweeps(cfradial1_sgp_dtree, dummy_function) - # Verify that the dummy field has been added to each sweep sweep_keys = util.get_sweep_keys(modified_dtree) for key in sweep_keys: - assert ( - "dummy_field" in modified_dtree[key].data_vars - ), f"dummy_field not found in {key}" + assert "dummy_field" in modified_dtree[key].data_vars assert modified_dtree[key]["dummy_field"].attrs["units"] == "dBZ" assert modified_dtree[key]["dummy_field"].attrs["long_name"] == "Dummy Field" - # Check that the original data has not been modified - assert ( - "dummy_field" not in dtree["/"].data_vars - ), "dummy_field should not be in the root node" + # Original not modified + assert "dummy_field" not in cfradial1_sgp_dtree["/"].data_vars - # Test that an exception is raised when a function that causes an error is applied + # Error propagation with pytest.raises(ValueError, match="This is an intentional error"): def error_function(ds): raise ValueError("This is an intentional error") - util.apply_to_sweeps(dtree, error_function) - - -def test_apply_to_volume(): - # Fetch the sample radar file - filename = DATASETS.fetch("sample_sgp_data.nc") + util.apply_to_sweeps(cfradial1_sgp_dtree, error_function) - # Open the radar file into a DataTree object - dtree = io.open_cfradial1_datatree(filename) - # Define a simple function to test with apply_to_volume +def test_apply_to_volume(cfradial1_sgp_dtree): def dummy_function(ds): - """A dummy function that adds a constant field to the dataset.""" - ds["dummy_field"] = ( - ds["reflectivity_horizontal"] * 0 - ) # Adding a field with all zeros + ds["dummy_field"] = ds["reflectivity_horizontal"] * 0 ds["dummy_field"].attrs = {"units": "dBZ", "long_name": "Dummy Field"} return ds - # Apply the dummy function to all sweeps using apply_to_volume - modified_dtree = util.apply_to_volume(dtree, dummy_function) + modified_dtree = util.apply_to_volume(cfradial1_sgp_dtree, dummy_function) - # Verify that the modified_dtree is an instance of DataTree - assert isinstance( - modified_dtree, xr.DataTree - ), "The result should be a DataTree instance." + assert isinstance(modified_dtree, xr.DataTree) - # Verify that the dummy field has been added to each sweep sweep_keys = util.get_sweep_keys(modified_dtree) for key in sweep_keys: - assert ( - "dummy_field" in modified_dtree[key].data_vars - ), f"dummy_field not found in {key}" + assert "dummy_field" in modified_dtree[key].data_vars assert modified_dtree[key]["dummy_field"].attrs["units"] == "dBZ" assert modified_dtree[key]["dummy_field"].attrs["long_name"] == "Dummy Field" - # Check that the original DataTree (dtree) has not been modified - original_sweep_keys = util.get_sweep_keys(dtree) - for key in original_sweep_keys: - assert ( - "dummy_field" not in dtree[key].data_vars - ), f"dummy_field should not be in the original DataTree at {key}" + # Original not modified + for key in util.get_sweep_keys(cfradial1_sgp_dtree): + assert "dummy_field" not in cfradial1_sgp_dtree[key].data_vars - # Test edge case: Apply a function that modifies only certain sweeps + # Selective function def selective_function(ds): - """Only modifies sweeps with a specific condition.""" if "reflectivity_horizontal" in ds: ds["selective_field"] = ds["reflectivity_horizontal"] * 1 return ds - # Apply the selective function to all sweeps using apply_to_volume - selectively_modified_dtree = util.apply_to_volume(dtree, selective_function) - - # Verify that the selective field was added only where the condition was met + selectively_modified_dtree = util.apply_to_volume( + cfradial1_sgp_dtree, selective_function + ) for key in sweep_keys: if "reflectivity_horizontal" in modified_dtree[key].data_vars: - assert ( - "selective_field" in selectively_modified_dtree[key].data_vars - ), f"selective_field not found in {key} where it should have been added." + assert "selective_field" in selectively_modified_dtree[key].data_vars else: - assert ( - "selective_field" not in selectively_modified_dtree[key].data_vars - ), f"selective_field should not be present in {key}" + assert "selective_field" not in selectively_modified_dtree[key].data_vars - # Test that an exception is raised when a function that causes an error is applied + # Error propagation with pytest.raises(ValueError, match="This is an intentional error"): def error_function(ds): raise ValueError("This is an intentional error") - util.apply_to_volume(dtree, error_function) - + util.apply_to_volume(cfradial1_sgp_dtree, error_function) -def test_map_over_sweeps_decorator_dummy_function(): - """ - Test applying a dummy function to all sweep nodes using the map_over_sweeps decorator. - """ - # Fetch the sample radar file - filename = DATASETS.fetch("sample_sgp_data.nc") - # Open the radar file into a DataTree object - dtree = xd.io.open_cfradial1_datatree(filename) +def test_map_over_sweeps_decorator_dummy_function(cfradial1_sgp_dtree): + """Test applying a dummy function using the map_over_sweeps decorator.""" - # Use the decorator on the dummy function @xd.map_over_sweeps def dummy_function(ds, refl="none"): - ds = ds.assign( - dummy_field=ds["reflectivity_horizontal"] * 0 - ) # Field with zeros + ds = ds.assign(dummy_field=ds["reflectivity_horizontal"] * 0) ds["dummy_field"].attrs = { "unit": "dBZ", "long_name": "Dummy Field", @@ -418,20 +364,15 @@ def dummy_function(ds, refl="none"): } return ds - # Apply using pipe and decorator - dtree_modified = dtree.pipe(dummy_function, refl="test") + dtree_modified = cfradial1_sgp_dtree.pipe(dummy_function, refl="test") - # Check that the new field exists in sweep_0 and has the correct attributes sweep_0 = dtree_modified["sweep_0"] assert "dummy_field" in sweep_0.data_vars assert sweep_0.dummy_field.attrs["unit"] == "dBZ" assert sweep_0.dummy_field.attrs["long_name"] == "Dummy Field" assert sweep_0.dummy_field.attrs["test"] == "test" - # Ensure all non-NaN values are 0 (accounting for -0.0 and NaN values) - non_nan_values = np.nan_to_num( - sweep_0.dummy_field.values - ) # Convert NaNs to zero for comparison + non_nan_values = np.nan_to_num(sweep_0.dummy_field.values) assert np.all(np.isclose(non_nan_values, 0))