diff --git a/pysp2/util/normalized_derivative_method.py b/pysp2/util/normalized_derivative_method.py index b7622ca..6dff399 100644 --- a/pysp2/util/normalized_derivative_method.py +++ b/pysp2/util/normalized_derivative_method.py @@ -87,18 +87,22 @@ def central_difference(S, num_records=None, normalize=True, baseline_to_zero=Tru return xr.Dataset(dSdt) -def plot_normalized_derivative(ds, record_no, chn=0): +def plot_normalized_derivative(S, ds, record_no, chn=0, plot_scattering_signal=False): """ Plots the normalized derivative of the scattering signal for a given record_no and channel. Parameters ---------- + S: xarray Dataset + The original scattering signal dataset, used for optional overlay. ds: xarray Dataset The dataset containing the normalized derivative to plot. record_no: int The record number to plot. chn: int The channel number to plot (0 or 4). + plot_scattering_signal: bool + If True, overlay the original scattering signal on the plot. Returns ------- ax: matplotlib Axes @@ -114,10 +118,10 @@ def plot_normalized_derivative(ds, record_no, chn=0): inp_data = {} inp_data['time'] = xr.DataArray(np.array(time[np.newaxis]), dims=['time']) - inp_data['Data_ch' + str(chn)] = xr.DataArray( - spectra['Data_ch' + str(chn)].values[np.newaxis, :], - dims=['time', 'bins']) - inp_data = xr.Dataset(inp_data) + #inp_data['Data_ch' + str(chn)] = xr.DataArray( + # spectra['Data_ch' + str(chn)].values[np.newaxis, :], + # dims=['time', 'bins']) + #inp_data = xr.Dataset(inp_data) bins = np.arange(0, 0.00004-0.3e-6, 0.4e-6) # 0 to 0.0004 microseconds in steps of 0.4e-6 seconds bins = bins*1e6 # convert to microseconds for plotting @@ -125,14 +129,42 @@ def plot_normalized_derivative(ds, record_no, chn=0): ch_name = f'Data_ch{chn}' plt.figure(figsize=(10, 6)) ax = plt.gca() - # Plot using bins for x-axis - ax.plot(bins, spectra['Data_ch' + str(chn)].values, label=ch_name) + # --- Primary axis: normalized derivative --- + line1, = ax.plot( + bins, + spectra[ch_name].values, + label=f'{ch_name} (Normalized dS/dt)' + ) + ax.set_xlim([bins[0], bins[-1]]) - ax.set_title(f'Normalized Derivative of Scattering Signal - Channel {chn} Record {record_no}') - ax.set_xlabel('Time ($\mu$s)') + ax.set_xlabel('Time ($\\mu$s)') ax.set_ylabel('Normalized Derivative') - plt.grid() - ax.legend() + + # --- Secondary axis: scattering signal --- + if plot_scattering_signal: + ax2 = ax.twinx() + y = S[ch_name].isel(event_index=record_no).values + y = y - np.nanmin(y) # baseline shift + + line2, = ax2.plot( + bins, + y, + color = 'black', + linestyle='--', + label=f'{ch_name} (Scattering Signal)' + ) + ax2.set_ylabel('Scattering Signal (baseline shifted)') + + # Combine legends + lines = [line1, line2] + labels = [l.get_label() for l in lines] + ax.legend(lines, labels) + else: + ax.legend() + ax.set_title( + f'Normalized Derivative of Scattering Signal - Channel {chn} Record {record_no}' + ) + ax.grid() return ax @@ -171,6 +203,106 @@ class MLEConfig: grid_size: int = 401 grid_margin: float = 0.5 +def _to_dataarray( + obj: Union[xr.DataArray, xr.Dataset], + name: str, + ch: Optional[str] = None, +) -> xr.DataArray: + """ + Accept either a DataArray or Dataset. + If a Dataset is provided, select the variable named `ch`. + """ + if isinstance(obj, xr.DataArray): + return obj + + if isinstance(obj, xr.Dataset): + if ch is not None: + # Use the user input channel. 
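+            # Validating here raises a clear ValueError that lists the available
+            # variables instead of a bare KeyError from obj[ch] below.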
+ if ch not in obj.data_vars: + raise ValueError( + f"{ch!r} not found in {name}.data_vars={list(obj.data_vars)}" + ) + return obj[ch] + + if len(obj.data_vars) == 1: + only_var = next(iter(obj.data_vars)) + return obj[only_var] + + raise ValueError( + f"{name} is a Dataset with multiple variables. " + f"Provide ch. Available: {list(obj.data_vars)}" + ) + + raise TypeError(f"{name} must be an xarray DataArray or Dataset.") + + +def _moteki_kondo_subset_statistics( + yk: np.ndarray, + sk: np.ndarray, + tk: np.ndarray, + tau: float, + *, + h: float, + sigma_bar: float, + delta_sigma: float, + A1: float, + A2: float, + A3: float, +) -> tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], Optional[float]]: + """ + Shared Moteki & Kondo subset statistics. + + Returns + ------- + ybar : np.ndarray + Mean vector [Eq. (A.4)]. + Sigma : np.ndarray + Covariance matrix [Eqs. (A.10a), (A.10b)]. + L : np.ndarray or None + Cholesky factor of Sigma, if Sigma is positive definite. + d2 : float or None + Statistical distance squared [Eq. (A.11)]. + """ + # Mean vector of the normalized derivative under the Gaussian beam model. + # Eq. (A.4): ybar_i(tau) = -(t_i - tau) / sigma_bar^2 + ybar = -(tk - tau) / (sigma_bar * sigma_bar) + + # Signal-noise amplitude from Appendix A. + # Eq. (A.6): deltaS_i = sqrt(A1^2 + A2^2 S_i + A3^2 S_i^2) + deltaS = np.sqrt(A1 * A1 + (A2 * A2) * sk + (A3 * A3) * (sk * sk)) + + # Random variance in y = S'/S from finite-difference error propagation. + # Eq. (A.7): (delta y_i)_ran = Af_d * (1/h) * (1/S_i) * deltaS_i + Af_d = np.sqrt(130.0) / 12.0 + with np.errstate(divide="ignore", invalid="ignore"): + var_rand_k = (Af_d * Af_d) / (h * h) * (deltaS * deltaS) / (sk * sk) + + if not np.all(np.isfinite(var_rand_k)): + return None, None, None, None + if np.any(var_rand_k <= 0): + return None, None, None, None + + # Systematic covariance from particle-by-particle fluctuations in sigma. + # Eq. (A.10a): Cov[y_i, y_j] = 4/sigma_bar^6 * (t_i - tau)(t_j - tau) * delta_sigma^2 + # Eq. (A.10b): Var[y_i] adds the random variance term above. + dt = (tk - tau).reshape(-1, 1) + sys_pref = 4.0 * (delta_sigma * delta_sigma) / (sigma_bar ** 6) + Sigma = sys_pref * (dt @ dt.T) + Sigma[np.diag_indices_from(Sigma)] += var_rand_k + + r = yk - ybar + + try: + L = np.linalg.cholesky(Sigma) + except np.linalg.LinAlgError: + return ybar, Sigma, None, None + + # Eq. (A.11): d^2 = (y - ybar)^T Sigma^{-1} (y - ybar) + z = np.linalg.solve(L, r) + d2 = float(z.T @ z) + + return ybar, Sigma, L, d2 + def mle_tau_moteki_kondo( S: Union[xr.DataArray, xr.Dataset], norm_deriv: Union[xr.DataArray, xr.Dataset], @@ -217,33 +349,9 @@ def mle_tau_moteki_kondo( if config is None: raise ValueError("config must be provided.") - def _to_dataarray(obj: Union[xr.DataArray, xr.Dataset], name: str) -> xr.DataArray: - """ - Accept either a DataArray or Dataset. - If a Dataset is provided, select the variable named `ch`. - """ - if isinstance(obj, xr.DataArray): - return obj - if isinstance(obj, xr.Dataset): - if ch is not None: - # Use the user input channel. - if ch not in obj.data_vars: - raise ValueError( - f"{ch!r} not found in {name}.data_vars={list(obj.data_vars)}" - ) - return obj[ch] - if len(obj.data_vars) == 1: - only_var = next(iter(obj.data_vars)) - return obj[only_var] - raise ValueError( - f"{name} is a Dataset with multiple variables. " - f"Provide ch. 
Available: {list(obj.data_vars)}" - ) - raise TypeError(f"{name} must be an xarray DataArray or Dataset.") - # Convert datasets to the selected DataArrays. - S = _to_dataarray(S, "S") - norm_deriv = _to_dataarray(norm_deriv, "norm_deriv") + S = _to_dataarray(S, "S", ch=ch) + norm_deriv = _to_dataarray(norm_deriv, "norm_deriv", ch=ch) # The method requires one event axis and one sample axis. if event_dim not in S.dims: @@ -291,7 +399,7 @@ def _to_dataarray(obj: Union[xr.DataArray, xr.Dataset], name: str) -> xr.DataArr raise ValueError(f"k_end must be in [0, {n_samples - p}], got {k_end}") # Optional tau grid for the 1D grid search in tau. - # Moteki & Kondo determine tau numerically by maximizing L_k(tau). + # Moteki & Kondo determine tau numerically by maximizing L_k(tau) [Appendix A.5]. if tau_grid is not None: tau_grid_np = np.asarray( tau_grid.data if isinstance(tau_grid, xr.DataArray) else tau_grid, @@ -311,7 +419,7 @@ def _to_dataarray(obj: Union[xr.DataArray, xr.Dataset], name: str) -> xr.DataArr # Time axis used in the fit. # Here we use physical time spacing h so tk is in seconds (or whatever unit h uses). # This must match sigma_bar and delta_sigma units. - t = np.arange(n_samples) * h + t = np.arange(n_samples, dtype=float) * h if h <= 0: raise ValueError("config.h must be positive.") @@ -320,72 +428,6 @@ def _to_dataarray(obj: Union[xr.DataArray, xr.Dataset], name: str) -> xr.DataArr if delta_sigma < 0: raise ValueError("config.delta_sigma must be >= 0.") - # Eq. (A.7): finite-difference amplification factor for the derivative noise. - Af_d = np.sqrt(130.0) / 12.0 - - def _logL_for_tau(yk: np.ndarray, sk: np.ndarray, tk: np.ndarray, tau: float) -> float: - """ - Log-likelihood for one k-subset and one candidate tau. - - Mean model: - ybar_i(tau) = -(t_i - tau) / sigma_bar^2 [Eq. (A.4)] - where y_i = S'_i / S_i. - - Covariance: - Cov[y_i, y_j] = 4 / sigma_bar^6 * (t_i - tau)(t_j - tau) * (delta_sigma)^2 [Eq. (A.10a)] - Var[y_i] = 4 / sigma_bar^6 * (t_i - tau)^2 * (delta_sigma)^2 - + (Af_d^2 / h^2) * (1/S_i^2) * (delta S_i)^2 [Eq. (A.10b)] - with - delta S_i = sqrt(A1^2 + A2^2 S_i + A3^2 S_i^2) [Eq. (A.6)] - and - (delta y_i)_ran = Af_d * (1/h) * (1/S_i) * delta S_i [Eq. (A.7)] - - The full likelihood is the multivariate Gaussian in Eq. (A.9). - """ - # Mean vector of the normalized derivative under the Gaussian beam model. - # This is the line I'/I = -(t - tau)/sigma^2 [Eq. (5)] used as the mean [Eq. (A.4)]. - ybar = -(tk - tau) / (sigma_bar * sigma_bar) - - # Signal-noise amplitude from Appendix A [Eq. (A.6)]. - deltaS = np.sqrt(A1 * A1 + (A2 * A2) * sk + (A3 * A3) * (sk * sk)) - - # Random variance of y = S'/S from finite-difference error propagation [Eq. (A.7)]. - with np.errstate(divide="ignore", invalid="ignore"): - var_rand_k = (Af_d * Af_d) / (h * h) * (deltaS * deltaS) / (sk * sk) - - # If any term is non-finite, this tau candidate is unusable. - if not np.all(np.isfinite(var_rand_k)): - return -np.inf - if np.any(var_rand_k <= 0): - return -np.inf - - # Systematic covariance from particle-by-particle fluctuations in sigma [Eq. (A.10a)]. - dt = (tk - tau).reshape(-1, 1) - sys_pref = 4.0 * (delta_sigma * delta_sigma) / (sigma_bar ** 6) - Sigma = sys_pref * (dt @ dt.T) - # Add the diagonal random variance term [Eq. (A.10b)]. - Sigma[np.diag_indices_from(Sigma)] += var_rand_k - - # Residual vector y - ybar. - r = yk - ybar - - # Use Cholesky factorization for numerical stability when evaluating Eq. (A.9). 
- try: - L = np.linalg.cholesky(Sigma) - except np.linalg.LinAlgError: - return -np.inf - - # Compute statistical distance. - # d^2 = (y - ybar)^T Sigma^{-1} (y - ybar) [Eq. (A.11)] - z = np.linalg.solve(L, r) - d2 = float(z.T @ z) - # log |Sigma| from the Cholesky factor. - logdet = 2.0 * np.sum(np.log(np.diag(L))) - - # Multivariate normal log-likelihood [Eq. (A.9)]. - p_local = yk.size - return float(-0.5 * (p_local * np.log(2.0 * np.pi) + logdet + d2)) - def _tau_hat_for_one_event(s_event: np.ndarray, y_event: np.ndarray) -> np.ndarray: """ For one event, scan all k-subsets of length p and return tau_hat(k). @@ -419,7 +461,45 @@ def _tau_hat_for_one_event(s_event: np.ndarray, y_event: np.ndarray) -> np.ndarr best_ll = -np.inf best_tau = np.nan for tau_cand in grid: - ll = _logL_for_tau(yk, sk, tk, float(tau_cand)) + _, _, _, d2 = _moteki_kondo_subset_statistics( + yk, + sk, + tk, + float(tau_cand), + h=h, + sigma_bar=sigma_bar, + delta_sigma=delta_sigma, + A1=A1, + A2=A2, + A3=A3, + ) + + if d2 is None: + ll = -np.inf + else: + # Reconstruct the log-likelihood from the same subset statistics. + # Eq. (A.9): log L = -1/2 * [p log(2pi) + log|Sigma| + d^2] + # Since _moteki_kondo_subset_statistics does not return log|Sigma|, + # we compute likelihood separately here for the grid-search step. + ybar, Sigma, L, d2 = _moteki_kondo_subset_statistics( + yk, + sk, + tk, + float(tau_cand), + h=h, + sigma_bar=sigma_bar, + delta_sigma=delta_sigma, + A1=A1, + A2=A2, + A3=A3, + ) + if L is None or d2 is None: + ll = -np.inf + else: + logdet = 2.0 * np.sum(np.log(np.diag(L))) + p_local = yk.size + ll = float(-0.5 * (p_local * np.log(2.0 * np.pi) + logdet + d2)) + if ll > best_ll: best_ll = ll best_tau = float(tau_cand) @@ -445,3 +525,191 @@ def _tau_hat_for_one_event(s_event: np.ndarray, y_event: np.ndarray) -> np.ndarr "units": "sample_index_or_time_units_of_t", }, ) + +def compute_d2_moteki_kondo( + S: Union[xr.DataArray, xr.Dataset], + norm_deriv: Union[xr.DataArray, xr.Dataset], + tau_hat: Union[np.ndarray, xr.DataArray], + p: int, + *, + ch: Optional[str] = None, + event_index: int, + event_dim: str = "event_index", + S_sample_dim: Optional[str] = None, + y_sample_dim: Optional[str] = None, + k_end: Optional[int] = None, + config: Optional[MLEConfig] = None, +) -> xr.DataArray: + """ + Compute d^2(k) for one selected event using the Moteki & Kondo statistical distance. + + This is the quantity used to quantify how well each k-subset matches the expected + I'/I line segment [Eq. (A.11)], and it is the same statistic used in Appendix A.5 + for judging candidate sub-arrays against the chi-square reference distribution. + + Parameters + ---------- + S : xr.DataArray or xr.Dataset + Scattering signal S(t). + norm_deriv : xr.DataArray or xr.Dataset + Normalized derivative S'(t)/S(t). + tau_hat : 1D array-like or xr.DataArray + tau_hat(k) for the selected event. Must have length k_end + 1. + p : int + Number of consecutive points in each k-subset. + ch : str, optional + Variable name to select when S and/or norm_deriv are Datasets. + event_index : int + Event index to evaluate. + event_dim : str + Name of the event dimension. + S_sample_dim : str, optional + Sample dimension in S. + y_sample_dim : str, optional + Sample dimension in norm_deriv. + k_end : int, optional + Largest starting k. If None, inferred from tau_hat length. + config : MLEConfig + Calibration / noise / grid settings. + + Returns + ------- + xr.DataArray + d^2(k) for the selected event, with dimension k. 
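+        Entries are left as NaN wherever a subset could not be evaluated
+        (non-finite samples, a non-finite tau_hat(k), or a covariance matrix
+        that is not positive definite).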
+ """ + if config is None: + raise ValueError("config must be provided.") + + # Convert Datasets to DataArrays if needed. + S = _to_dataarray(S, "S", ch=ch) + norm_deriv = _to_dataarray(norm_deriv, "norm_deriv", ch=ch) + + # Require the event dimension. + if event_dim not in S.dims: + raise ValueError(f"{event_dim!r} not found in S.dims={S.dims}") + if event_dim not in norm_deriv.dims: + raise ValueError(f"{event_dim!r} not found in norm_deriv.dims={norm_deriv.dims}") + + # Infer sample dimensions if not provided. + if S_sample_dim is None: + s_non_event_dims = [d for d in S.dims if d != event_dim] + if len(s_non_event_dims) != 1: + raise ValueError( + f"Could not infer S sample dim. Non-event dims in S: {s_non_event_dims}" + ) + S_sample_dim = s_non_event_dims[0] + + if y_sample_dim is None: + y_non_event_dims = [d for d in norm_deriv.dims if d != event_dim] + if len(y_non_event_dims) != 1: + raise ValueError( + f"Could not infer norm_deriv sample dim. Non-event dims in norm_deriv: {y_non_event_dims}" + ) + y_sample_dim = y_non_event_dims[0] + + # Rename to common internal sample dimension. + S_std = S.rename({S_sample_dim: "sample"}) + y_std = norm_deriv.rename({y_sample_dim: "sample"}) + + # Align on common coordinates. + S_std, y_std = xr.align(S_std, y_std, join="inner") + + if event_index < 0 or event_index >= S_std.sizes[event_dim]: + raise ValueError( + f"event_index must be in [0, {S_std.sizes[event_dim] - 1}], got {event_index}" + ) + + n_samples = S_std.sizes["sample"] + if p < 2 or p > n_samples: + raise ValueError(f"p must be in [2, {n_samples}], got {p}") + + # Tau-hat is expected to be a vector over k. + tau_hat_np = np.asarray( + tau_hat.data if isinstance(tau_hat, xr.DataArray) else tau_hat, + dtype=float, + ) + if tau_hat_np.ndim != 1: + raise ValueError("tau_hat must be 1D.") + + if k_end is None: + k_end = tau_hat_np.size - 1 + if k_end < 0: + raise ValueError(f"k_end must be >= 0, got {k_end}") + if tau_hat_np.size != k_end + 1: + raise ValueError( + f"tau_hat length must equal k_end + 1. Got len(tau_hat)={tau_hat_np.size}, " + f"k_end={k_end}." + ) + if k_end > n_samples - p: + raise ValueError(f"k_end must be in [0, {n_samples - p}], got {k_end}") + + # Parameters from Appendix A. + h = float(config.h) + sigma_bar = float(config.sigma_bar) + delta_sigma = float(config.delta_sigma) + A1, A2, A3 = float(config.A1), float(config.A2), float(config.A3) + + if h <= 0: + raise ValueError("config.h must be positive.") + if sigma_bar <= 0: + raise ValueError("config.sigma_bar must be positive.") + if delta_sigma < 0: + raise ValueError("config.delta_sigma must be >= 0.") + + # Time axis used in the fit. + # This should match the time axis used in mle_tau_moteki_kondo. + t = np.arange(n_samples, dtype=float) * h + + # Select the requested event only. + s_event = np.asarray(S_std.sel({event_dim: event_index}).values, dtype=float) + y_event = np.asarray(y_std.sel({event_dim: event_index}).values, dtype=float) + + if not (np.all(np.isfinite(s_event)) and np.all(np.isfinite(y_event))): + raise ValueError(f"Selected event_index={event_index} contains non-finite values.") + + d2_vals = np.full(k_end + 1, np.nan, dtype=float) + + for k in range(k_end + 1): + # Consecutive p-point subset starting at k, exactly as in the tau_hat search. 
+ yk = y_event[k : k + p] + sk = s_event[k : k + p] + tk = t[k : k + p] + + if not (np.all(np.isfinite(yk)) and np.all(np.isfinite(sk))): + continue + + tau_k = float(tau_hat_np[k]) + if not np.isfinite(tau_k): + continue + + # Build the subset mean/covariance using the same equations as the MLE. + # Mean: Eq. (A.4) + # Covariance: Eqs. (A.10a), (A.10b) + # Statistical distance: Eq. (A.11) + _, _, _, d2 = _moteki_kondo_subset_statistics( + yk, + sk, + tk, + tau_k, + h=h, + sigma_bar=sigma_bar, + delta_sigma=delta_sigma, + A1=A1, + A2=A2, + A3=A3, + ) + + if d2 is not None and np.isfinite(d2): + d2_vals[k] = d2 + + out = xr.DataArray( + d2_vals, + dims=("k",), + coords={"k": np.arange(k_end + 1)}, + name="d2", + attrs={ + "long_name": f"Moteki & Kondo statistical distance d^2(k) for event_index={event_index}", + "units": "dimensionless", + }, + ) + return out \ No newline at end of file diff --git a/tests/baseline/test_plot_normalized_derivative.png b/tests/baseline/test_plot_normalized_derivative.png index d2a361c..9a8846c 100644 Binary files a/tests/baseline/test_plot_normalized_derivative.png and b/tests/baseline/test_plot_normalized_derivative.png differ diff --git a/tests/test_ndm.py b/tests/test_ndm.py index 9e5d41a..c75e678 100644 --- a/tests/test_ndm.py +++ b/tests/test_ndm.py @@ -2,7 +2,7 @@ import numpy as np np.set_printoptions(threshold=np.inf) -from pysp2.util.normalized_derivative_method import MLEConfig, mle_tau_moteki_kondo +from pysp2.util.normalized_derivative_method import MLEConfig, mle_tau_moteki_kondo, compute_d2_moteki_kondo def test_central_difference(): my_sp2b = pysp2.io.read_sp2(pysp2.testing.EXAMPLE_SP2B) @@ -30,12 +30,12 @@ def test_mle_estimate_tau(): my_sp2b = pysp2.io.read_sp2(pysp2.testing.EXAMPLE_SP2B) my_ini = pysp2.io.read_config(pysp2.testing.EXAMPLE_INI) my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, parallel=False, baseline_to_zero=True) - dSdt = pysp2.util.central_difference(my_binary, normalize=False, baseline_to_zero=True) + dSdt = pysp2.util.central_difference(my_binary, normalize=True, baseline_to_zero=True) cfg = MLEConfig( h=0.4e-6, # example: 0.4 microseconds - sigma_bar=26.28*0.4e-6, # example; use your measured average width - delta_sigma=1.2*0.4e-6,# example; use your measured width std dev + sigma_bar= 16.6*0.4e-6, # example; use your measured average width + delta_sigma=1.2*0.4e-6, # example; use your measured width std dev A1=0.37, A2=1.6e-2, A3=6.2e-4, @@ -45,21 +45,47 @@ def test_mle_estimate_tau(): tau = mle_tau_moteki_kondo( S=my_binary, norm_deriv=dSdt, - p=21, + p=10, ch="Data_ch0", event_index=499, config=cfg, ) - tau_val = my_binary['Data_ch0'].isel(event_index=499).argmax().item()*0.4e-6 + tau_val_true = my_binary['Data_ch0'].isel(event_index=499).argmax().item()*0.4e-6 # Test that the estimated tau for a subset of results is close to the true value for the event - for i in range(21, 37): - np.testing.assert_almost_equal(tau[i], tau_val, decimal=6) + for i in range(25, 34): + np.testing.assert_almost_equal(tau[i], tau_val_true, decimal=6) + + d2 = compute_d2_moteki_kondo( + S=my_binary, + norm_deriv=dSdt, + tau_hat=tau, + p=10, + ch="Data_ch0", + event_index=499, + config=cfg, + ) + + # Define k window + k_start, k_end = 18, 34 + d2_subset = d2.isel(k=slice(k_start, k_end + 1)) + # Find index of minimum d2 within subset + k_min_local = int(d2_subset.argmin(dim="k").item()) + k_min = k_start + k_min_local + # Get corresponding tau value + tau_best = tau.isel(k=k_min).item() + + # Assert closeness + 
np.testing.assert_allclose( + tau_best, + tau_val_true, + atol=0.01e-05, # absolute tolerance = 1e-7 + ) ## Test another event tau = mle_tau_moteki_kondo( S=my_binary, norm_deriv=dSdt, - p=21, + p=10, ch="Data_ch0", event_index=1040, config=cfg, @@ -67,6 +93,56 @@ def test_mle_estimate_tau(): tau_val = my_binary['Data_ch0'].isel(event_index=1040).argmax().item()*0.4e-6 # Test that the estimated tau for a subset of results is close to the true value for the event - for i in range(23, 38): + for i in range(31, 43): np.testing.assert_almost_equal(tau[i], tau_val, decimal=6) + + d2 = compute_d2_moteki_kondo( + S=my_binary, + norm_deriv=dSdt, + tau_hat=tau, + p=10, + ch="Data_ch0", + event_index=1040, + config=cfg, + ) + + tau_best = tau.isel(k=35).item() + # Assert closeness + np.testing.assert_allclose( + tau_best, + tau_val, + atol=0.05e-05, # absolute tolerance = 1e-7 + ) + + ## Test another event + tau = mle_tau_moteki_kondo( + S=my_binary, + norm_deriv=dSdt, + p=10, + ch="Data_ch4", + event_index=2008, + config=cfg, + ) + + d2 = compute_d2_moteki_kondo( + S=my_binary, + norm_deriv=dSdt, + tau_hat=tau, + p=10, + ch="Data_ch4", + event_index=2008, + config=cfg, + ) + + # Test that the estimated tau for a subset of results is close to the true value for the event + for i in range(39, 42): + np.testing.assert_almost_equal(tau[i], tau_val_true, decimal=6) + + tau_best = tau.isel(k=39).item() + # Assert closeness + np.testing.assert_allclose( + tau_best, + tau_val_true, + atol=0.02e-04, # absolute tolerance = 2e-6 (larger tolerance for evaporation events) + ) \ No newline at end of file diff --git a/tests/test_vis.py b/tests/test_vis.py index 40fef1a..a858ace 100644 --- a/tests/test_vis.py +++ b/tests/test_vis.py @@ -17,8 +17,8 @@ def test_plot_normalized_derivative(): my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, parallel=False, baseline_to_zero=True) dSdt_norm = pysp2.util.central_difference(my_binary, normalize=True, baseline_to_zero=True) - # Test the plotting function for channel 0 and record number 2 - ax = plot_normalized_derivative(dSdt_norm, record_no=499, chn=0) + # Test the plotting function for channel 0 and record number 499 + ax = plot_normalized_derivative(my_sp2b, dSdt_norm, record_no=499, chn=0, plot_scattering_signal=True) fig = ax.figure return fig @@ -30,7 +30,7 @@ def test_plot_wave(): my_ini = pysp2.io.read_config(pysp2.testing.EXAMPLE_INI) my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, parallel=False, baseline_to_zero=True) - # Test the plotting function for channel 0 and record number 2 + # Test the plotting function for channel 0 and record number 499 display = plot_wave(my_binary, record_no=499, chn=0) fig = display.axes[0].figure return fig \ No newline at end of file
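
Example usage of the API touched by this patch (a minimal sketch, not part of the diff: it reuses the example data, channel, event index, and calibration numbers from the tests above; the sigma_bar, delta_sigma, A1, A2, and A3 values are placeholders that should be replaced with measured instrument values):

    import pysp2
    from pysp2.util.normalized_derivative_method import (
        MLEConfig,
        mle_tau_moteki_kondo,
        compute_d2_moteki_kondo,
        plot_normalized_derivative,
    )

    # Load and fit the example SP2 data, then form the normalized derivative S'/S.
    my_sp2b = pysp2.io.read_sp2(pysp2.testing.EXAMPLE_SP2B)
    my_ini = pysp2.io.read_config(pysp2.testing.EXAMPLE_INI)
    my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, parallel=False, baseline_to_zero=True)
    dSdt = pysp2.util.central_difference(my_binary, normalize=True, baseline_to_zero=True)

    cfg = MLEConfig(
        h=0.4e-6,                 # sample spacing in seconds
        sigma_bar=16.6 * 0.4e-6,  # example average beam width; use a measured value
        delta_sigma=1.2 * 0.4e-6,
        A1=0.37, A2=1.6e-2, A3=6.2e-4,
    )

    # tau_hat(k) for every p-point subset of one event, and the matching d^2(k).
    tau = mle_tau_moteki_kondo(S=my_binary, norm_deriv=dSdt, p=10,
                               ch="Data_ch0", event_index=499, config=cfg)
    d2 = compute_d2_moteki_kondo(S=my_binary, norm_deriv=dSdt, tau_hat=tau, p=10,
                                 ch="Data_ch0", event_index=499, config=cfg)

    # The subset with the smallest statistical distance gives the preferred estimate.
    k_best = int(d2.argmin(dim="k").item())
    tau_best = float(tau.isel(k=k_best).item())

    # Optional overlay of the baseline-shifted scattering signal on the derivative plot.
    ax = plot_normalized_derivative(my_sp2b, dSdt, record_no=499, chn=0,
                                    plot_scattering_signal=True)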