diff --git a/src/nemo_spinup_forecast/dimensionality_reduction.py b/src/nemo_spinup_forecast/dimensionality_reduction.py index 61fe9bf..9094f7d 100644 --- a/src/nemo_spinup_forecast/dimensionality_reduction.py +++ b/src/nemo_spinup_forecast/dimensionality_reduction.py @@ -538,7 +538,7 @@ def rmse(self, n): return reconstruction, rmse_values, rmse_map - def rmseValues(self, reconstruction): + def rmse_values(self, reconstruction): """RMSE per time sample. Parameters diff --git a/src/nemo_spinup_forecast/forecast_method.py b/src/nemo_spinup_forecast/forecast_method.py index 275e214..0dfb2e4 100644 --- a/src/nemo_spinup_forecast/forecast_method.py +++ b/src/nemo_spinup_forecast/forecast_method.py @@ -128,8 +128,6 @@ class RecursiveForecaster(BaseForecaster): Number of lagged values of the target to include as features. window_size : int, default=10 Size of the rolling window used to compute additional statistics. - steps : int, default=30 - Number of future steps to forecast. Attributes ---------- @@ -139,11 +137,9 @@ class RecursiveForecaster(BaseForecaster): Number of lags used in forecasting. window_size : int Rolling window size for feature engineering. - steps : int - Horizon length for recursive forecasting. """ - def __init__(self, regressor, lags=10, window_size=10, steps=30): + def __init__(self, regressor, lags=10, window_size=10): """ Initialize the recursive forecaster. @@ -155,15 +151,12 @@ def __init__(self, regressor, lags=10, window_size=10, steps=30): See class docstring. window_size : int, default=10 See class docstring. - steps : int, default=30 - See class docstring. """ self.regressor = regressor self.lags = lags self.window_size = window_size - self.steps = steps - def apply_forecast(self, y_train, _x_train, _x_pred): + def apply_forecast(self, y_train, _x_train, x_pred): """ Fit a recursive forecaster using the provided regressor. @@ -171,10 +164,11 @@ def apply_forecast(self, y_train, _x_train, _x_pred): ---------- y_train : array-like Training target series. - _x_train : array-like, optional - Not used directly in this recursive implementation. - _x_pred : array-like, optional - Not used directly in this recursive implementation. + _x_train : array-like + Not used in this recursive implementation. + x_pred : array-like + Feature matrix whose first dimension determines the number of + forecast steps. Returns ------- @@ -182,10 +176,10 @@ def apply_forecast(self, y_train, _x_train, _x_pred): ``(y_hat, y_hat_std)`` where: * **y_hat** : ndarray - Predicted values for the next `steps` periods. + Predicted values for ``x_pred.shape[0]`` periods. * **y_hat_std** : ndarray - Returned here as a placeholder equal to ``y_hat`` since the - recursive strategy used does not compute prediction intervals. + Placeholder equal to ``y_hat`` since the recursive strategy + does not compute prediction intervals. Notes ----- @@ -209,7 +203,8 @@ def apply_forecast(self, y_train, _x_train, _x_pred): forecaster.fit(pd.Series(y_train)) # Predict the specified steps - y_hat = forecaster.predict(steps=self.steps) + steps = x_pred.shape[0] + y_hat = forecaster.predict(steps=steps) # Recursive forecaster does not return standard deviation for each point # Use y_hat as a placeholder for std @@ -275,7 +270,7 @@ def create_gp_regressor( gp_recursive_forecaster = RecursiveForecaster( - create_gp_regressor(), lags=10, window_size=10, steps=30 + create_gp_regressor(), lags=10, window_size=10 ) gp_forecaster = DirectForecaster(create_gp_regressor()) diff --git a/tests/conftest.py b/tests/conftest.py index fe1d055..6892e36 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,35 +1,49 @@ -from importlib.resources import files - import pytest from nemo_spinup_forecast.dimensionality_reduction import ( - dimensionality_reduction_techniques, + DimensionalityReductionKernelPCA, + DimensionalityReductionPCA, ) from nemo_spinup_forecast.forecast import Predictions, Simulation, load_ts from nemo_spinup_forecast.forecast_method import forecast_techniques from nemo_spinup_forecast.utils import ( create_run_dir, - get_dr_technique, - get_forecast_technique, prepare, ) -# Resolve packaged config file (no reliance on repo root layout) -tech_cfg = files("nemo_spinup_forecast.configs").joinpath("techniques_config.yaml") +DR_PARAMS = [ + (DimensionalityReductionPCA, 0.9), + (DimensionalityReductionKernelPCA, 4), +] + + +@pytest.fixture( + params=DR_PARAMS, + ids=["PCA", "KernelPCA"], +) +def dr_technique(request): + """Return a (DR class, comp) tuple.""" + return request.param + -dr_technique = get_dr_technique(tech_cfg, dimensionality_reduction_techniques) -forecast_technique = get_forecast_technique(tech_cfg, forecast_techniques) +@pytest.fixture( + params=list(forecast_techniques.values()), + ids=list(forecast_techniques.keys()), +) +def forecast_technique(request): + """Return the forecast technique instance.""" + return request.param @pytest.fixture() -def setup_simulation_class(request): +def setup_simulation_class(request, dr_technique): """Fixture to set up the simulation class.""" # Parameters for the simulation class + dr_class, comp = dr_technique path = "tests/data/nemo_data_e3/" start = 20 # Start year for the simulation end = 50 # End year for the simulation ye = True # Indicates if the simulation is yearly - comp = 0.9 # Explained variance ratio for PCA term, filename = request.param # Tuple (phycial property/term, file) simu = Simulation( @@ -40,15 +54,16 @@ def setup_simulation_class(request): comp=comp, term=term, filename=filename, - dimensionality_reduction=dr_technique, + dimensionality_reduction=dr_class, ) return simu @pytest.fixture() -def setup_prediction_class(request): +def setup_prediction_class(request, dr_technique, forecast_technique): """Fixture to set up a prediction class.""" + dr_class, comp = dr_technique # create a per-run directory to store results run_dir = create_run_dir("tests/data/nemo_data_e3/") out_dir = run_dir / "forecasts" @@ -60,13 +75,12 @@ def setup_prediction_class(request): start = 20 end = 50 ye = True # Indicates if the simulation is yearly - comp = 0.9 # Explained variance ratio for PCA - # Applies PCA and saves the results to disk - prepare(term, filename, path, start, end, ye, comp, dr_technique) + # Applies DR technique and saves the results to disk + prepare(term, filename, path, start, end, ye, comp, dr_class) df, infos = load_ts(f"{path}/simu_prepared/{term}", term) - simu_ts = Predictions(term, df, infos, forecast_technique, dr_technique) + simu_ts = Predictions(term, df, infos, forecast_technique, dr_class) return simu_ts, infos diff --git a/tests/test_pca.py b/tests/test_pca.py new file mode 100644 index 0000000..111d436 --- /dev/null +++ b/tests/test_pca.py @@ -0,0 +1,446 @@ +import numpy as np +import pytest + +from nemo_spinup_forecast.dimensionality_reduction import DimensionalityReductionPCA +from nemo_spinup_forecast.forecast import Predictions, Simulation, load_ts +from nemo_spinup_forecast.forecast_method import forecast_techniques +from nemo_spinup_forecast.utils import create_run_dir, prepare + + +@pytest.fixture() +def setup_simulation_class(request): + """Set up a Simulation hardcoded to PCA.""" + path = "tests/data/nemo_data_e3/" + term, filename = request.param + simu = Simulation( + path=path, + start=20, + end=50, + ye=True, + comp=0.9, + term=term, + filename=filename, + dimensionality_reduction=DimensionalityReductionPCA, + ) + return simu + + +@pytest.fixture() +def setup_prediction_class(request): + """Prediction fixture hardcoded to PCA and GaussianProcessForecaster.""" + run_dir = create_run_dir("tests/data/nemo_data_e3/") + out_dir = run_dir / "forecasts" + out_dir.mkdir(parents=True, exist_ok=True) + + term, filename = request.param + path = out_dir + dr_technique = DimensionalityReductionPCA + + prepare(term, filename, path, 20, 50, True, 0.9, dr_technique) + df, infos = load_ts(f"{path}/simu_prepared/{term}", term) + + forecast_technique = forecast_techniques["GaussianProcessForecaster"] + simu_ts = Predictions(term, df, infos, forecast_technique, dr_technique) + return simu_ts, infos + + +@pytest.mark.parametrize( + "setup_simulation_class", + [ + ("toce", "DINO_1y_grid_T.nc"), + ("soce", "DINO_1y_grid_T.nc"), + ("ssh", "DINO_1m_To_1y_grid_T.nc"), + ], + indirect=True, +) +def test_applyPCA_real_data(setup_simulation_class): + """Check applyPCA works correctly on real data.""" + sim = setup_simulation_class + # Prepare the data + sim.get_simulation_data(stand=False) + # After prepare, simulation attribute should be a NumPy array + assert isinstance(sim.simulation, np.ndarray), ( + "Simulation should be numpy array after prepare" + ) + initial_shape = sim.simulation.shape + + sim.decompose() + components = sim.components + + # Components first dimension should equal time length + expected_time_dim = sim.len + + msg = ( + f"Components time dimension {components.shape[0]} " + f"!= expected {expected_time_dim}" + ) + assert components.shape[0] == expected_time_dim, msg + # Components second dimension should equal number of PCA components + expected_n_components = sim.pca.n_components_ + msg = ( + f"Components feature dimension {components.shape[1]} " + f"!= expected {expected_n_components}" + ) + assert components.shape[1] == expected_n_components, msg + + # Boolean mask length should equal total number of spatial features + feature_count = np.prod(initial_shape[1:]) + expected_mask_shape = (feature_count,) + assert sim.bool_mask.shape == expected_mask_shape, ( + f"Mask shape {sim.bool_mask.shape} != expected {expected_mask_shape}" + ) + + # PCA components shape should match [n_components, n_features] + expected_pca_shape = (sim.pca.n_components_, feature_count) + msg = ( + f"PCA components shape {sim.pca.components_.shape} " + f"!= expected {expected_pca_shape}" + ) + assert sim.pca.components_.shape == expected_pca_shape, msg + + +@pytest.mark.parametrize( + "setup_simulation_class", + [ + ("soce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) + ("toce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) + ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D case (y,x) + ], + indirect=True, +) +def test_getPC_real_data(setup_simulation_class): + """Check getPC returns correct PC map shape, mask, and values for real data.""" + sim = setup_simulation_class + + # prepare real slice and compute PCA + sim.get_simulation_data(stand=False) + sim.decompose() + + std = sim.desc["std"] + mean = sim.desc["mean"] + mask = sim.bool_mask # 1D boolean mask over flattened features + shape = sim.shape # e.g. (z,y,x) or (y,x) + + # Flattened mask length must equal product of spatial dimensions + expected_mask_len = np.prod(shape) + assert mask.shape == (expected_mask_len,), ( + f"Mask length {mask.shape} != expected ({expected_mask_len},)" + ) + + # Test every principal component + for comp in range(sim.pca.n_components_): + pc_map = sim.get_component( + comp + ) # Contribution of each coordinate to the components + + # Should return a numpy array with correct spatial shape + assert isinstance(pc_map, np.ndarray), f"PC map {comp} should be numpy array" + assert pc_map.shape == shape, ( + f"PC map {comp} shape {pc_map.shape} != expected {shape}" + ) + + flat_map = pc_map.ravel() + comp_vals = sim.pca.components_[comp] + + # Build expected flattened map: transform component values back to original scale + expected_flat = np.full(mask.shape, np.nan, dtype=float) + expected_flat[mask] = 2 * comp_vals * std + mean + + np.testing.assert_allclose( + flat_map, + expected_flat, + equal_nan=True, + err_msg=f"PC map {comp} values incorrect", + ) + + +@pytest.mark.parametrize( + "setup_simulation_class", + [ + ("soce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) + ("toce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) + ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D case (y,x) + ], + indirect=True, +) +def test_reconstruct_shape_and_mask_real_data(setup_simulation_class): + """ + Check reconstruct returns correct arrays. + + This checks that the array is the correct shape, preserves the mask, + and has finite values. + """ + sim = setup_simulation_class + + # set up the PCA on the real data + sim.get_simulation_data(stand=False) + sim.decompose() + + # Check for a few n values: 1, all components, and beyond + ns = [1, sim.pca.n_components_] + for n in ns: + rec = sim.dimensionality_reduction.reconstruct_components(n) + + # Should return array with correct shape: (time, *spatial_dims) + expected_shape = (sim.len, *sim.shape) + assert isinstance(rec, np.ndarray), ( + f"Reconstruction with {n} components should be numpy array" + ) + assert rec.shape == expected_shape, ( + f"Reconstruction shape {rec.shape} != expected {expected_shape}" + ) + + # Integer mask should be updated to match spatial shape + int_mask = sim.dimensionality_reduction.int_mask + assert int_mask.shape == sim.shape, ( + f"Integer mask shape {int_mask.shape} != expected spatial shape {sim.shape}" + ) + + # For each time slice, masked positions should be NaN, unmasked finite + flat_mask = int_mask.ravel() + for t in range(rec.shape[0]): + flat_rec = rec[t].ravel() + # Masked positions (0) should contain NaN values + masked_positions = flat_mask == 0 + assert np.all(np.isnan(flat_rec[masked_positions])), ( + f"Time {t}: masked positions should be NaN" + ) + # Unmasked positions (1) should contain finite values + unmasked_positions = flat_mask == 1 + assert np.all(np.isfinite(flat_rec[unmasked_positions])), ( + f"Time {t}: unmasked positions should be finite" + ) + + +@pytest.mark.parametrize( + "setup_simulation_class", + [ + ("soce", "DINO_1y_grid_T.nc"), + ("toce", "DINO_1y_grid_T.nc"), + ("ssh", "DINO_1m_To_1y_grid_T.nc"), + ], + indirect=True, +) +def test_reconstruct_full_components_recovers_original_data(setup_simulation_class): + """Check reconstruct with all components recovers original data.""" + sim = setup_simulation_class + + sim.get_simulation_data(stand=False) + # Use all available components for reconstruction + sim.dimensionality_reduction.comp = None + sim.decompose() + + # Reconstruct using all components - should recover original data + rec_all = sim.dimensionality_reduction.reconstruct_components(sim.pca.n_components_) + + # Original simulation was stored as raw values before PCA + orig = sim.simulation + assert isinstance(orig, np.ndarray), "Original simulation should be numpy array" + + # Shapes should match exactly + assert rec_all.shape == orig.shape, ( + f"Reconstruction shape {rec_all.shape} != original shape {orig.shape}" + ) + + # Values should match up to numerical tolerance for full reconstruction + np.testing.assert_allclose( + rec_all, + orig, + rtol=1e-5, + atol=1e-1, + equal_nan=True, + err_msg="Full reconstruction should recover original data", + ) + + +@pytest.mark.parametrize( + "setup_simulation_class", + [ + ("toce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) + ("soce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) + ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D data (time, y, x) + ], + indirect=True, +) +def test_rmseMap_real_data_full_components_zero(setup_simulation_class): + """Check rmseMap returns zeros for full-component reconstruction on real data.""" + sim = setup_simulation_class + # Prepare without standardization to retain raw values + sim.get_simulation_data(stand=False) + # Ensure simulation data is a NumPy array + assert isinstance(sim.simulation, np.ndarray), "Simulation should be numpy array" + + # Use all available components to reconstruct full data + sim.dimensionality_reduction.comp = None # None defaults to using all components + sim.decompose() + rec_all = sim.dimensionality_reduction.reconstruct_components(sim.pca.n_components_) + + # Compute RMSE map between original and reconstructed data + rmse_map = sim.dimensionality_reduction.rmse_map(rec_all) + print("max: ", np.max(rec_all)) + + # Check return type and shape + assert isinstance(rmse_map, np.ndarray), "RMSE map should be numpy array" + assert rmse_map.shape == sim.shape, ( + f"Expected rmse_map shape {sim.shape}, got {rmse_map.shape}" + ) + + # Boolean mask of valid (unmasked) positions, reshaped to spatial dimensions + mask = sim.bool_mask.reshape(sim.shape) + + # Unmasked positions (True) should have zero RMSE + # within tolerance for full reconstruction + ( + np.testing.assert_allclose(rmse_map[mask], 0.0, atol=1e-1), + ("Non-zero RMSE found at unmasked positions for full reconstruction"), + ) + + # Masked positions (False) should remain NaN (no data to compute RMSE) + assert np.all(np.isnan(rmse_map[~mask])), ( + "Expected NaN at masked positions in rmse_map" + ) + + +@pytest.mark.parametrize( + "setup_simulation_class", + [ + ("toce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) + ("soce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) + ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D data (time, y, x) + ], + indirect=True, +) +def test_rmseValues_real_data_full_components_zero(setup_simulation_class): + """Check rmseValues returns zeros for full-component reconstruction on real data.""" + sim = setup_simulation_class + + # Prepare raw numpy simulation and compute PCA for full reconstruction + sim.get_simulation_data(stand=False) + sim.dimensionality_reduction.comp = None + sim.decompose() + rec_all = sim.dimensionality_reduction.reconstruct_components(sim.pca.n_components_) + + # Compute RMSE values over time + rmse_values = sim.dimensionality_reduction.rmse_values(rec_all) + + # Verify return type + assert isinstance(rmse_values, np.ndarray), "RMSE values should be numpy array" + + # Check the correct output shape based on data dimensionality + if sim.z_size is not None: + # For 3D data, shape should be (time, depth) for + # RMSE per time step and depth level + expected_shape = (sim.len, sim.z_size) + else: + # For 2D data, shape should be (time,) - RMSE per time step + expected_shape = (sim.len,) + + assert rmse_values.shape == expected_shape, ( + f"RMSE values shape {rmse_values.shape} != expected {expected_shape}" + ) + + # All RMSE values should be effectively zero for full reconstruction + np.testing.assert_allclose( + rmse_values, + 0, + atol=1e-3, + err_msg="RMSE should be near zero for full reconstruction", + ) + + +@pytest.mark.parametrize( + "setup_simulation_class", + [ + ("toce", "DINO_1y_grid_T.nc"), + ("soce", "DINO_1y_grid_T.nc"), + ("ssh", "DINO_1m_To_1y_grid_T.nc"), + ], + indirect=True, +) +def test_rmseOfPCA_real_full_zero(setup_simulation_class): + """Check rmseOfPCA returns zeros RMSE values and map for full reconstruction.""" + sim = setup_simulation_class + sim.dimensionality_reduction.comp = None + sim.get_simulation_data(stand=False) + sim.decompose() + + # Use all components for full reconstruction - should be nearly perfect + n_comp = sim.pca.n_components_ + _rec, rmse_values, rmse_map = sim.error(n_comp) + + # Check RMSE values shape and near-zero values + if sim.z_size is not None: + expected_values_shape = (sim.len, sim.z_size) + else: + expected_values_shape = (sim.len,) + + assert rmse_values.shape == expected_values_shape, ( + f"RMSE values shape {rmse_values.shape} != expected {expected_values_shape}" + ) + np.testing.assert_allclose( + rmse_values, + 0, + atol=5e-1, + err_msg="RMSE values should be near zero for full reconstruction", + ) + + # Check RMSE map shape and near-zero values + if sim.z_size is not None: + expected_map_shape = (sim.z_size, sim.y_size, sim.x_size) + else: + expected_map_shape = (sim.y_size, sim.x_size) + + assert rmse_map.shape == expected_map_shape, ( + f"RMSE map shape {rmse_map.shape} != expected {expected_map_shape}" + ) + np.testing.assert_allclose( + rmse_map, + 0, + atol=5e-1, + err_msg="RMSE map should be near zero for full reconstruction", + ) + + +CASES = [ + ("ssh", "DINO_1m_To_1y_grid_T.nc"), + ("soce", "DINO_1y_grid_T.nc"), + ("toce", "DINO_1y_grid_T.nc"), +] + +# Duplicate each case so BOTH fixtures receive the same tuple +PARAM_ROWS = [pytest.param(c, c, id=f"{c[0]}-{c[1]}") for c in CASES] + +## NOTE: The test below is not solely pca specific + + +@pytest.mark.parametrize( + ("setup_prediction_class", "setup_simulation_class"), + PARAM_ROWS, + indirect=["setup_prediction_class", "setup_simulation_class"], +) +def test_predictions_reconstruct(setup_prediction_class, setup_simulation_class): + """ + Check that the reconstruct function rebuilds the time series correctly. + + This will check the result is the correct shape. + """ + # setup prediction class + pred, infos = setup_prediction_class + sim = setup_simulation_class + + steps = 20 + + # Forecast specified number of steps + y_hat, _y_hat_std, _metrics = pred.parallel_forecast(len(pred), steps) + + # Reconstruct with n predicted components + n = len(pred.info["pca"].components_) + reconstructed_preds = sim.reconstruct( + y_hat, n, infos, begin=0 + ) # TODO: Use simulation prediction class, setup simulation class fixture + + # Expected shape: forecast steps x original spatial dimensions + expected_shape = (steps, *tuple(pred.info["shape"])) + assert reconstructed_preds.shape == expected_shape, ( + f"Reconstructed shape {reconstructed_preds.shape} != expected {expected_shape}" + ) diff --git a/tests/test_prediction.py b/tests/test_prediction.py index bf2c9b2..b4381fd 100644 --- a/tests/test_prediction.py +++ b/tests/test_prediction.py @@ -106,8 +106,6 @@ def test_forecast_ts_valid_input(setup_prediction_class): # Expected shape: original data length + forecasted steps expected_shape = (steps,) - assert isinstance(y_hat, np.ndarray), "y_hat should be a NumPy array" - assert isinstance(y_hat_std, np.ndarray), "y_hat_std should be a NumPy array" assert y_hat.shape == expected_shape, ( f"y_hat shape {y_hat.shape} != expected {expected_shape}" ) @@ -141,11 +139,7 @@ def test_forecast_ts_no_test_data_no_steps(setup_prediction_class): # No test data available for validation => metrics must be None assert metrics is None, "Metrics should be None when no test data is available" - # Expected shape: original data length + steps + 1 - # (when steps=0, still returns one extra point) expected_shape = (0,) - assert isinstance(y_hat, np.ndarray), "y_hat should be a NumPy array" - assert isinstance(y_hat_std, np.ndarray), "y_hat_std should be a NumPy array" assert y_hat.shape == expected_shape, ( f"y_hat shape {y_hat.shape} != expected {expected_shape}" ) @@ -175,10 +169,7 @@ def test_forecast_ts_no_test_data_with_steps(setup_prediction_class): # No test data available for validation => metrics must be None assert metrics is None, "Metrics should be None when no test data is available" - # Expected shape: original data length + forecasted steps expected_shape = (steps,) - assert isinstance(y_hat, np.ndarray), "y_hat should be a NumPy array" - assert isinstance(y_hat_std, np.ndarray), "y_hat_std should be a NumPy array" assert y_hat.shape == expected_shape, ( f"y_hat shape {y_hat.shape} != expected {expected_shape}" ) @@ -240,46 +231,3 @@ def test_train_test_series_standard_case(setup_prediction_class): total_len = steps expected_x_pred = np.linspace(1 + pas, 1 + steps * pas, steps).reshape(-1, 1) assert np.allclose(x_pred, expected_x_pred) - - -CASES = [ - ("ssh", "DINO_1m_To_1y_grid_T.nc"), - ("soce", "DINO_1y_grid_T.nc"), - ("toce", "DINO_1y_grid_T.nc"), -] - -# Duplicate each case so BOTH fixtures receive the same tuple -PARAM_ROWS = [pytest.param(c, c, id=f"{c[0]}-{c[1]}") for c in CASES] - - -@pytest.mark.parametrize( - ("setup_prediction_class", "setup_simulation_class"), - PARAM_ROWS, - indirect=["setup_prediction_class", "setup_simulation_class"], -) -def test_predictions_reconstruct(setup_prediction_class, setup_simulation_class): - """ - Check that the reconstruct function rebuilds the time series correctly. - - This will check the result is the correct shape. - """ - # setup prediction class - pred, infos = setup_prediction_class - sim = setup_simulation_class - - steps = 20 - - # Forecast specified number of steps - y_hat, _y_hat_std, _metrics = pred.parallel_forecast(len(pred), steps) - - # Reconstruct with n predicted components - n = len(pred.info["pca"].components_) - reconstructed_preds = sim.reconstruct( - y_hat, n, infos, begin=0 - ) # TODO: Use simulation prediction class, setup simulation class fixture - - # Expected shape: forecast steps x original spatial dimensions - expected_shape = (steps, *tuple(pred.info["shape"])) - assert reconstructed_preds.shape == expected_shape, ( - f"Reconstructed shape {reconstructed_preds.shape} != expected {expected_shape}" - ) diff --git a/tests/test_simulation.py b/tests/test_simulation.py index f24f091..3e3b86c 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -350,231 +350,6 @@ def test_standardize(setup_simulation_class): ) -@pytest.mark.parametrize( - "setup_simulation_class", - [ - ("toce", "DINO_1y_grid_T.nc"), - ("soce", "DINO_1y_grid_T.nc"), - ("ssh", "DINO_1m_To_1y_grid_T.nc"), - ], - indirect=True, -) -def test_applyPCA_real_data(setup_simulation_class): - """Check applyPCA works correctly on real data.""" - sim = setup_simulation_class - # Prepare the data - sim.get_simulation_data(stand=False) - # After prepare, simulation attribute should be a NumPy array - assert isinstance(sim.simulation, np.ndarray), ( - "Simulation should be numpy array after prepare" - ) - initial_shape = sim.simulation.shape - - sim.decompose() - components = sim.components - - # Components first dimension should equal time length - expected_time_dim = sim.len - - msg = ( - f"Components time dimension {components.shape[0]} " - f"!= expected {expected_time_dim}" - ) - assert components.shape[0] == expected_time_dim, msg - # Components second dimension should equal number of PCA components - expected_n_components = sim.pca.n_components_ - msg = ( - f"Components feature dimension {components.shape[1]} " - f"!= expected {expected_n_components}" - ) - assert components.shape[1] == expected_n_components, msg - - # Boolean mask length should equal total number of spatial features - feature_count = np.prod(initial_shape[1:]) - expected_mask_shape = (feature_count,) - assert sim.bool_mask.shape == expected_mask_shape, ( - f"Mask shape {sim.bool_mask.shape} != expected {expected_mask_shape}" - ) - - # PCA components shape should match [n_components, n_features] - expected_pca_shape = (sim.pca.n_components_, feature_count) - msg = ( - f"PCA components shape {sim.pca.components_.shape} " - f"!= expected {expected_pca_shape}" - ) - assert sim.pca.components_.shape == expected_pca_shape, msg - - -@pytest.mark.parametrize( - "setup_simulation_class", - [ - ("soce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) - ("toce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) - ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D case (y,x) - ], - indirect=True, -) -def test_getPC_real_data(setup_simulation_class): - """Check getPC returns correct PC map shape, mask, and values for real data.""" - sim = setup_simulation_class - - # prepare real slice and compute PCA - sim.get_simulation_data(stand=False) - sim.decompose() - - std = sim.desc["std"] - mean = sim.desc["mean"] - mask = sim.bool_mask # 1D boolean mask over flattened features - shape = sim.shape # e.g. (z,y,x) or (y,x) - - # Flattened mask length must equal product of spatial dimensions - expected_mask_len = np.prod(shape) - assert mask.shape == (expected_mask_len,), ( - f"Mask length {mask.shape} != expected ({expected_mask_len},)" - ) - - # Test every principal component - for comp in range(sim.pca.n_components_): - pc_map = sim.get_component( - comp - ) # Contribution of each coordinate to the components - - # Should return a numpy array with correct spatial shape - assert isinstance(pc_map, np.ndarray), f"PC map {comp} should be numpy array" - assert pc_map.shape == shape, ( - f"PC map {comp} shape {pc_map.shape} != expected {shape}" - ) - - flat_map = pc_map.ravel() - comp_vals = sim.pca.components_[comp] - - # Build expected flattened map: transform component values back to original scale - expected_flat = np.full(mask.shape, np.nan, dtype=float) - expected_flat[mask] = 2 * comp_vals * std + mean - - np.testing.assert_allclose( - flat_map, - expected_flat, - equal_nan=True, - err_msg=f"PC map {comp} values incorrect", - ) - - -@pytest.mark.parametrize( - "setup_simulation_class", - [ - ("soce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) - ("toce", "DINO_1y_grid_T.nc"), # 3D case (z,y,x) - ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D case (y,x) - ], - indirect=True, -) -def test_reconstruct_shape_and_mask_real_data(setup_simulation_class): - """ - Check reconstruct returns correct arrays. - - This checks that the array is the correct shape, preserves the mask, - and has finite values. - """ - sim = setup_simulation_class - - # set up the PCA on the real data - sim.get_simulation_data(stand=False) - sim.decompose() - - # Check for a few n values: 1, all components, and beyond - ns = [1, sim.pca.n_components_] - for n in ns: - rec = sim.dimensionality_reduction.reconstruct_components(n) - - # Should return array with correct shape: (time, *spatial_dims) - expected_shape = (sim.len, *sim.shape) - assert isinstance(rec, np.ndarray), ( - f"Reconstruction with {n} components should be numpy array" - ) - assert rec.shape == expected_shape, ( - f"Reconstruction shape {rec.shape} != expected {expected_shape}" - ) - - # Integer mask should be updated to match spatial shape - int_mask = sim.dimensionality_reduction.int_mask - assert int_mask.shape == sim.shape, ( - f"Integer mask shape {int_mask.shape} != expected spatial shape {sim.shape}" - ) - - # For each time slice, masked positions should be NaN, unmasked finite - flat_mask = int_mask.ravel() - for t in range(rec.shape[0]): - flat_rec = rec[t].ravel() - # Masked positions (0) should contain NaN values - masked_positions = flat_mask == 0 - assert np.all(np.isnan(flat_rec[masked_positions])), ( - f"Time {t}: masked positions should be NaN" - ) - # Unmasked positions (1) should contain finite values - unmasked_positions = flat_mask == 1 - assert np.all(np.isfinite(flat_rec[unmasked_positions])), ( - f"Time {t}: unmasked positions should be finite" - ) - - -@pytest.mark.parametrize( - "setup_simulation_class", - [ - ("soce", "DINO_1y_grid_T.nc"), - ("toce", "DINO_1y_grid_T.nc"), - ("ssh", "DINO_1m_To_1y_grid_T.nc"), - ], - indirect=True, -) -def test_reconstruct_full_components_recovers_original_data(setup_simulation_class): - """Check reconstruct with all components recovers original data.""" - sim = setup_simulation_class - - sim.get_simulation_data(stand=False) - # Use all available components for reconstruction - sim.dimensionality_reduction.comp = None - sim.decompose() - - # Reconstruct using all components - should recover original data - rec_all = sim.dimensionality_reduction.reconstruct_components(sim.pca.n_components_) - - # Original simulation was stored as raw values before PCA - orig = sim.simulation - assert isinstance(orig, np.ndarray), "Original simulation should be numpy array" - - # Shapes should match exactly - assert rec_all.shape == orig.shape, ( - f"Reconstruction shape {rec_all.shape} != original shape {orig.shape}" - ) - - # Values should match up to numerical tolerance for full reconstruction - np.testing.assert_allclose( - rec_all, - orig, - rtol=1e-5, - atol=1e-1, - equal_nan=True, - err_msg="Full reconstruction should recover original data", - ) - - -@pytest.fixture -def dummy_sim_array(): - """Fixture providing a simple 3-time-step, 2x2 spatial array simulation.""" - sim = Simulation.__new__(Simulation) - # Create a 3x2x2 array with increasing values for predictable RMSE calculations - sim.simulation = np.array( - [ - [[1.0, 2.0], [3.0, 4.0]], - [[2.0, 3.0], [4.0, 5.0]], - [[3.0, 4.0], [5.0, 6.0]], - ] - ) - sim.len = sim.simulation.shape[0] - return sim - - @pytest.mark.parametrize( "setup_simulation_class", [ @@ -611,54 +386,6 @@ def test_rmseMap_zero_for_identical(setup_simulation_class): ) -@pytest.mark.parametrize( - "setup_simulation_class", - [ - ("toce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) - ("soce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) - ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D data (time, y, x) - ], - indirect=True, -) -def test_rmseMap_real_data_full_components_zero(setup_simulation_class): - """Check rmseMap returns zeros for full-component reconstruction on real data.""" - sim = setup_simulation_class - # Prepare without standardization to retain raw values - sim.get_simulation_data(stand=False) - # Ensure simulation data is a NumPy array - assert isinstance(sim.simulation, np.ndarray), "Simulation should be numpy array" - - # Use all available components to reconstruct full data - sim.dimensionality_reduction.comp = None # None defaults to using all components - sim.decompose() - rec_all = sim.dimensionality_reduction.reconstruct_components(sim.pca.n_components_) - - # Compute RMSE map between original and reconstructed data - rmse_map = sim.dimensionality_reduction.rmse_map(rec_all) - print("max: ", np.max(rec_all)) - - # Check return type and shape - assert isinstance(rmse_map, np.ndarray), "RMSE map should be numpy array" - assert rmse_map.shape == sim.shape, ( - f"Expected rmse_map shape {sim.shape}, got {rmse_map.shape}" - ) - - # Boolean mask of valid (unmasked) positions, reshaped to spatial dimensions - mask = sim.bool_mask.reshape(sim.shape) - - # Unmasked positions (True) should have zero RMSE - # within tolerance for full reconstruction - ( - np.testing.assert_allclose(rmse_map[mask], 0.0, atol=1e-1), - ("Non-zero RMSE found at unmasked positions for full reconstruction"), - ) - - # Masked positions (False) should remain NaN (no data to compute RMSE) - assert np.all(np.isnan(rmse_map[~mask])), ( - "Expected NaN at masked positions in rmse_map" - ) - - @pytest.mark.parametrize( "setup_simulation_class", [ @@ -746,103 +473,3 @@ def test_rmseValues_zero_for_identical(setup_simulation_class): 0, err_msg="RMSE should be close to zero for identical reconstruction", ) - - -@pytest.mark.parametrize( - "setup_simulation_class", - [ - ("toce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) - ("soce", "DINO_1y_grid_T.nc"), # 3D data (time, z, y, x) - ("ssh", "DINO_1m_To_1y_grid_T.nc"), # 2D data (time, y, x) - ], - indirect=True, -) -def test_rmseValues_real_data_full_components_zero(setup_simulation_class): - """Check rmseValues returns zeros for full-component reconstruction on real data.""" - sim = setup_simulation_class - - # Prepare raw numpy simulation and compute PCA for full reconstruction - sim.get_simulation_data(stand=False) - sim.dimensionality_reduction.comp = None - sim.decompose() - rec_all = sim.dimensionality_reduction.reconstruct_components(sim.pca.n_components_) - - # Compute RMSE values over time - rmse_values = sim.dimensionality_reduction.rmse_values(rec_all) - - # Verify return type - assert isinstance(rmse_values, np.ndarray), "RMSE values should be numpy array" - - # Check the correct output shape based on data dimensionality - if sim.z_size is not None: - # For 3D data, shape should be (time, depth) for - # RMSE per time step and depth level - expected_shape = (sim.len, sim.z_size) - else: - # For 2D data, shape should be (time,) - RMSE per time step - expected_shape = (sim.len,) - - assert rmse_values.shape == expected_shape, ( - f"RMSE values shape {rmse_values.shape} != expected {expected_shape}" - ) - - # All RMSE values should be effectively zero for full reconstruction - np.testing.assert_allclose( - rmse_values, - 0, - atol=1e-3, - err_msg="RMSE should be near zero for full reconstruction", - ) - - -@pytest.mark.parametrize( - "setup_simulation_class", - [ - ("toce", "DINO_1y_grid_T.nc"), - ("soce", "DINO_1y_grid_T.nc"), - ("ssh", "DINO_1m_To_1y_grid_T.nc"), - ], - indirect=True, -) -def test_rmseOfPCA_real_full_zero(setup_simulation_class): - """Check rmseOfPCA returns zeros RMSE values and map for full reconstruction.""" - sim = setup_simulation_class - sim.dimensionality_reduction.comp = None - sim.get_simulation_data(stand=False) - sim.decompose() - - # Use all components for full reconstruction - should be nearly perfect - n_comp = sim.pca.n_components_ - _rec, rmse_values, rmse_map = sim.error(n_comp) - - # Check RMSE values shape and near-zero values - if sim.z_size is not None: - expected_values_shape = (sim.len, sim.z_size) - else: - expected_values_shape = (sim.len,) - - assert rmse_values.shape == expected_values_shape, ( - f"RMSE values shape {rmse_values.shape} != expected {expected_values_shape}" - ) - np.testing.assert_allclose( - rmse_values, - 0, - atol=5e-1, - err_msg="RMSE values should be near zero for full reconstruction", - ) - - # Check RMSE map shape and near-zero values - if sim.z_size is not None: - expected_map_shape = (sim.z_size, sim.y_size, sim.x_size) - else: - expected_map_shape = (sim.y_size, sim.x_size) - - assert rmse_map.shape == expected_map_shape, ( - f"RMSE map shape {rmse_map.shape} != expected {expected_map_shape}" - ) - np.testing.assert_allclose( - rmse_map, - 0, - atol=5e-1, - err_msg="RMSE map should be near zero for full reconstruction", - )