From bbd255658446b7d3009a17760790415beb159809 Mon Sep 17 00:00:00 2001 From: npounder Date: Tue, 22 May 2018 17:35:12 +0100 Subject: [PATCH 01/26] Hessian correction adapted for narrowband SAIL --- kafka/inference/kf_tools.py | 42 +++++++++++++++++++++---------------- kafka/inference/utils.py | 23 +++++++++++++++++--- kafka/linear_kf.py | 22 +++++++++++++++++-- 3 files changed, 64 insertions(+), 23 deletions(-) diff --git a/kafka/inference/kf_tools.py b/kafka/inference/kf_tools.py index 127685e..bc1a093 100644 --- a/kafka/inference/kf_tools.py +++ b/kafka/inference/kf_tools.py @@ -23,52 +23,56 @@ def band_selecta(band): return np.array([3, 4, 6, 5]) -def hessian_correction_pixel(gp, x0, C_obs_inv, innovation, band, nparams): +def hessian_correction_pixel(hessian, C_obs_inv, innovation): + ''' selecta = band_selecta(band) ddH = gp.hessian(np.atleast_2d(x0[selecta])) big_ddH = np.zeros((nparams, nparams)) for i, ii in enumerate(selecta): for j, jj in enumerate(selecta): big_ddH[ii, jj] = ddH.squeeze()[i, j] - big_hessian_corr = big_ddH*C_obs_inv*innovation - return big_hessian_corr + ''' + hessian_corr = hessian*C_obs_inv*innovation + return hessian_corr -def hessian_correction(gp, x0, R_mat, innovation, mask, state_mask, band, +def hessian_correction(hessian, R_mat, innovation, mask, state_mask, nparams): """Calculates higher order Hessian correction for the likelihood term. Needs the GP, the Observational uncertainty, the mask....""" - if not hasattr(gp, "hessian"): + if hessian is None: # The observation operator does not provide a Hessian method. We just # return 0, meaning no Hessian correction. return 0. C_obs_inv = R_mat.diagonal()[state_mask.flatten()] mask = mask[state_mask].flatten() + little_hess = [] - for i, (innov, C, m) in enumerate(zip(innovation, C_obs_inv, mask)): + for i, (innov, C, m, hess) in enumerate(zip(innovation, C_obs_inv, mask, hessian)): if not m: # Pixel is masked hessian_corr = np.zeros((nparams, nparams)) else: - # Get state for current pixel - x0_pixel = x0.squeeze()[(nparams*i):(nparams*(i + 1))] + ## Get state for current pixel + #x0_pixel = x0.squeeze()[(nparams*i):(nparams*(i + 1))] # Calculate the Hessian correction for this pixel - hessian_corr = m * hessian_correction_pixel(gp, x0_pixel, C, - innov, band, nparams) + hessian_corr = m * hessian_correction_pixel(hess, C, innov) little_hess.append(hessian_corr) - hessian_corr = block_diag(little_hess) + + hessian_corr = block_diag(hessian) return hessian_corr -def hessian_correction_multiband(gp, x0, R_mats, innovations, masks, state_mask, n_bands, - nparams): +def hessian_correction_multiband(hessians, R_mats, innovations, + masks, state_mask, n_bands, nparams): """ Non linear correction for the Hessian of the cost function. This handles multiple bands. """ little_hess_cor = [] - for R, innovation, mask, band in zip(R_mats, innovations, masks, range(n_bands)): - little_hess_cor.append(hessian_correction(gp, x0, R, innovation, mask, state_mask, band, - nparams)) - hessian_corr = sum(little_hess_cor) #block_diag(little_hess_cor) + for R, hessian, innovation, mask in zip( + R_mats, hessians, innovations, masks): + little_hess_cor.append(hessian_correction(hessian, R, innovation, + mask, state_mask, nparams)) + hessian_corr = sum(little_hess_cor) return hessian_corr @@ -317,7 +321,9 @@ def no_propagation(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): - """No propagation. In this case, we return the original prior. As the + """ + THIS IS ONLY SUITABLE FOR BROADBAND SAIL uses TIP prior + No propagation. In this case, we return the original prior. As the information filter behaviour is the standard behaviour in KaFKA, we only return the inverse covariance matrix. **NOTE** the input parameters are there to comply with the API, but are **UNUSED**. diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 1a43e4b..4c2425a 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -171,12 +171,26 @@ def create_nonlinear_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - return (H0, H_matrix.tocsr()) + '''if calc_hess: + ddH = emulator.hessian(x0[mask[state_mask]]) + hess = np.zeros((n_times, nparams, nparams)) + for lil_hes , m in zip(ddH, mask[state_mask].flatten()): + if m: + big_ddH = np.zeros((nparams, nparams)) + for i, ii in enumerate(state_mapper): + for j, jj in enumerate(state_mapper): + #big_ddH[ii, jj] = .squeeze()[i, j] + pass + #hess[.append(big_ddH) + ''' + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) + def create_prosail_observation_operator(n_params, emulator, metadata, - mask, state_mask, x_forecast, band): + mask, state_mask, x_forecast, + band, calc_hess=False): """Using an emulator of the nonlinear model around `x_forecast`. This case is quite special, as I'm focusing on a BHR SAIL version (or the JRC TIP), which have spectral parameters @@ -213,7 +227,10 @@ def create_prosail_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - return (H0, H_matrix.tocsr()) + if calc_hess: + hess = emulator.hessian(x0[mask[state_mask]]) + + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index f4d1ce0..be7030b 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -305,12 +305,27 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, # Once we have converged... # Correct hessian for higher order terms #split_points = [m.sum( ) for m in MASK] + HESSIAN = [] INNOVATIONS = np.split(innovations, n_bands) - P_correction = hessian_correction_multiband(data.emulator, x_analysis, + for band, data in enumerate(current_data): + # calculate the hessian for the solution + _,_,hessian= self._create_observation_operator(self.n_params, + data.emulator, + data.metadata, + data.mask, + self.state_mask, + x_analysis, + band, + calc_hess = True) + HESSIAN.append(hessian) + P_correction = hessian_correction_multiband(HESSIAN, UNC, INNOVATIONS, MASK, self.state_mask, n_bands, self.n_params) P_analysis_inverse = P_analysis_inverse - P_correction + # Rarely, this returns a small negative value. For now set to nan. + # May require further investigation in the future + P_analysis_inverse[P_analysis_inverse<0] = np.nan # Done with this observation, move along... @@ -407,8 +422,11 @@ def assimilate_band(self, band, timestep, x_forecast, P_forecast, data.uncertainty, innovations, data.mask, self.state_mask, band, self.n_params) + # UPDATE HESSIAN WITH HIGHER ORDER CONTRIBUTION P_analysis_inverse = P_analysis_inverse - P_correction - # P_analysis_inverse = UPDATE HESSIAN WITH HIGHER ORDER CONTRIBUTION + # Rarely, this returns a small negative value. For now set to nan. + # May require further investigation in the future + P_analysis_inverse[P_analysis_inverse<0] = np.nan import matplotlib.pyplot as plt M = self.state_mask*1. M[self.state_mask] = x_analysis[6::7] From ef84d0bdc5067393c57423f1eae924776ab199a1 Mon Sep 17 00:00:00 2001 From: npounder Date: Thu, 24 May 2018 14:19:52 +0100 Subject: [PATCH 02/26] LAI propagator and prior for narrow band SAIL --- kafka/inference/narrowbandSAIL_tools.py | 103 ++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 kafka/inference/narrowbandSAIL_tools.py diff --git a/kafka/inference/narrowbandSAIL_tools.py b/kafka/inference/narrowbandSAIL_tools.py new file mode 100644 index 0000000..c57f3a5 --- /dev/null +++ b/kafka/inference/narrowbandSAIL_tools.py @@ -0,0 +1,103 @@ +import logging +import numpy as np +from .utils import block_diag +import os +import gdal + +def sail_prior_values(): + """ + :returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + + mean = np.array([2.1, np.exp(-60. / 100.), + np.exp(-7.0 / 100.), 0.1, + np.exp(-50 * 0.0176), np.exp(-100. * 0.002), + np.exp(-4. / 2.), 70. / 90., 0.5, 0.9]) + sigma = np.array([0.01, 0.2, + 0.01, 0.05, + 0.01, 0.01, + 0.50, 0.1, 0.1, 0.1]) + + covar = np.diag(sigma ** 2).astype(np.float32) + inv_covar = np.diag(1. / sigma ** 2).astype(np.float32) + return mean, covar, inv_covar + +class SAILPrior(object): + def __init__(self, parameter_list, state_mask): + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic)): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + # parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', + # 'lai', 'ala', 'bsoil', 'psoil'] + # self.mean = np.array([1.19, np.exp(-14.4/100.), + # np.exp(-4.0/100.), 0.1, + # np.exp(-50*0.68), np.exp(-100./21.0), + # np.exp(-3.97/2.),70./90., 0.5, 0.9]) + # sigma = np.array([0.69, 0.016, + # 0.0086, 0.1, + # 1.71e-2, 0.017, + # 0.20, 0.5, 0.5, 0.5]) + mean, c_prior, c_inv_prior = sail_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + + + +def propagate_LAI_narrowbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + ''' Propagate a single parameter and + set the rest of the parameter propagations to zero + This should be used with a prior for the remaining parameters''' + nparameters = 10 + lai_position = 6 + + x_prior, c_prior, c_inv_prior = sail_prior_values() + + x_forecast = M_matrix.dot(x_analysis) + n_pixels = len(x_analysis)//nparameters + x0 = np.tile(x_prior, n_pixels) + x0[lai_position::nparameters] = x_forecast[lai_position::nparameters] # Update LAI + lai_post_cov = P_analysis_inverse.diagonal()[lai_position::nparameters] + lai_Q = Q_matrix.diagonal()[lai_position::nparameters] + + c_inv_prior_mat = [] + for cov, Q in zip(lai_post_cov, lai_Q): + # inflate uncertainty + lai_inv_cov = 1.0/((1.0/cov)+Q) + little_P_forecast_inverse = c_inv_prior.copy() + little_P_forecast_inverse[lai_position, lai_position] = lai_inv_cov + c_inv_prior_mat.append(little_P_forecast_inverse) + P_forecast_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) + return x0, None, P_forecast_inverse \ No newline at end of file From d227efae4b4b8806d129ac433b63eb37cb2598d0 Mon Sep 17 00:00:00 2001 From: npounder Date: Mon, 4 Jun 2018 14:52:47 +0100 Subject: [PATCH 03/26] Add hessian calculation to the broadband SAIL observation operator --- kafka/inference/utils.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 4c2425a..6f21a46 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -125,7 +125,8 @@ def create_linear_observation_operator(obs_op, n_params, metadata, def create_nonlinear_observation_operator(n_params, emulator, metadata, - mask, state_mask, x_forecast, band): + mask, state_mask, x_forecast, + band, calc_hess=False): """Using an emulator of the nonlinear model around `x_forecast`. This case is quite special, as I'm focusing on a BHR SAIL version (or the JRC TIP), which have spectral parameters @@ -171,18 +172,17 @@ def create_nonlinear_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - '''if calc_hess: + if calc_hess: ddH = emulator.hessian(x0[mask[state_mask]]) - hess = np.zeros((n_times, nparams, nparams)) - for lil_hes , m in zip(ddH, mask[state_mask].flatten()): + hess = np.zeros((n_times, n_params, n_params)) + for n, (lil_hess, m) in enumerate(zip(ddH, mask[state_mask].flatten())): if m: - big_ddH = np.zeros((nparams, nparams)) + big_hess = np.zeros((n_params, n_params)) for i, ii in enumerate(state_mapper): for j, jj in enumerate(state_mapper): - #big_ddH[ii, jj] = .squeeze()[i, j] - pass - #hess[.append(big_ddH) - ''' + big_hess[ii, jj] = lil_hess.squeeze()[i, j] + hess[n,...] = big_hess + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) From e4330f2433815036d717293ba7728058bfb8e7ec Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 11:18:49 +0100 Subject: [PATCH 04/26] warning in logfile when hessian correction results in negative values --- kafka/linear_kf.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index be7030b..568d63c 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -426,7 +426,13 @@ def assimilate_band(self, band, timestep, x_forecast, P_forecast, P_analysis_inverse = P_analysis_inverse - P_correction # Rarely, this returns a small negative value. For now set to nan. # May require further investigation in the future - P_analysis_inverse[P_analysis_inverse<0] = np.nan + negative_values = P_analysis_inverse<0 + if any(negative_values): + P_analysis_inverse[negative_values] = np.nan + LOG.warning("{} negative values in inverse covariance matrix".format( + sum(negative_values))) + + import matplotlib.pyplot as plt M = self.state_mask*1. M[self.state_mask] = x_analysis[6::7] From 4150bbcf9444ae752950004b9bb8be4f0905aadb Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 18:47:06 +0100 Subject: [PATCH 05/26] separate broadband SAIL utils into seperate file TIP prior and LAI propagator for broadband SAIL (or TIP now in one place) common code for propagating a single parameter now in single function --- kafka/inference/broadbandSAIL_tools | 0 kafka/inference/kf_tools.py | 89 ++++++------------------- kafka/inference/narrowbandSAIL_tools.py | 43 +++--------- kafka/linear_kf.py | 3 +- 4 files changed, 32 insertions(+), 103 deletions(-) create mode 100644 kafka/inference/broadbandSAIL_tools diff --git a/kafka/inference/broadbandSAIL_tools b/kafka/inference/broadbandSAIL_tools new file mode 100644 index 0000000..e69de29 diff --git a/kafka/inference/kf_tools.py b/kafka/inference/kf_tools.py index bc1a093..46ed9cb 100644 --- a/kafka/inference/kf_tools.py +++ b/kafka/inference/kf_tools.py @@ -16,22 +16,8 @@ class NoHessianMethod(Exception): def __init__(self, message): self.message = message -def band_selecta(band): - if band == 0: - return np.array([0, 1, 6, 2]) - else: - return np.array([3, 4, 6, 5]) - def hessian_correction_pixel(hessian, C_obs_inv, innovation): - ''' - selecta = band_selecta(band) - ddH = gp.hessian(np.atleast_2d(x0[selecta])) - big_ddH = np.zeros((nparams, nparams)) - for i, ii in enumerate(selecta): - for j, jj in enumerate(selecta): - big_ddH[ii, jj] = ddH.squeeze()[i, j] - ''' hessian_corr = hessian*C_obs_inv*innovation return hessian_corr @@ -53,8 +39,6 @@ def hessian_correction(hessian, R_mat, innovation, mask, state_mask, # Pixel is masked hessian_corr = np.zeros((nparams, nparams)) else: - ## Get state for current pixel - #x0_pixel = x0.squeeze()[(nparams*i):(nparams*(i + 1))] # Calculate the Hessian correction for this pixel hessian_corr = m * hessian_correction_pixel(hess, C, innov) little_hess.append(hessian_corr) @@ -100,43 +84,6 @@ def blend_prior(prior_mean, prior_cov_inverse, x_forecast, P_forecast_inverse): return x_combined, combined_cov_inv -def tip_prior(): - """The JRC-TIP prior in a convenient function which is fun for the whole - family. Note that the effective LAI is here defined in transformed space - where TLAI = exp(-0.5*LAIe). - - Returns - ------- - The mean prior vector, covariance and inverse covariance matrices.""" - # broadly TLAI 0->7 for 1sigma - sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) - x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) - # The individual covariance matrix - little_p = np.diag(sigma**2).astype(np.float32) - little_p[5, 2] = 0.8862*0.0959*0.2 - little_p[2, 5] = 0.8862*0.0959*0.2 - - inv_p = np.linalg.inv(little_p) - return x0, little_p, inv_p - -def tip_prior_noLAI(prior): - n_pixels = prior['n_pixels'] - mean, prior_cov_inverse = tip_prior(prior) - - -def tip_prior_full(prior): - # This is yet to be properly defined. For now it will create the TIP prior and - # prior just contains the size of the array - this function will be replaced with - # the real code when we know what the priors look like. - x_prior, c_prior, c_inv_prior = tip_prior() - n_pixels = prior['n_pixels'] - mean = np.array([x_prior for i in range(n_pixels)]).flatten() - c_inv_prior_mat = [c_inv_prior for n in range(n_pixels)] - prior_cov_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) - - return mean, prior_cov_inverse - - def propagate_and_blend_prior(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): @@ -244,10 +191,11 @@ def propagate_information_filter_SLOW(x_analysis, P_analysis, P_analysis_inverse S= P_analysis_inverse.dot(Q_matrix) A = (sp.eye(n) + S).tocsc() P_forecast_inverse = spl.spsolve(A, P_analysis_inverse) - logging.info("DOne with propagation") + logging.info("Done with propagation") return x_forecast, None, P_forecast_inverse + def propagate_information_filter_approx_SLOW(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): @@ -293,35 +241,38 @@ def propagate_information_filter_approx_SLOW(x_analysis, P_analysis, P_analysis_ return x_forecast, None, P_forecast_inverse -def propagate_information_filter_LAI(x_analysis, P_analysis, - P_analysis_inverse, - M_matrix, Q_matrix, - prior=None, state_propagator=None, date=None): - +def propagate_single_parameter(x_analysis, P_analysis, P_analysis_inverse, + M_matrix, Q_matrix, n_param, location, + x_prior, c_inv_prior): + """ Propagate a single parameter and + set the rest of the parameter propagations to the prior. + This should be used with a prior for the remaining parameters""" x_forecast = M_matrix.dot(x_analysis) - x_prior, c_prior, c_inv_prior = tip_prior() - n_pixels = len(x_analysis)//7 - x0 = np.array([x_prior for i in range(n_pixels)]).flatten() - x0[6::7] = x_forecast[6::7] # Update LAI - lai_post_cov = P_analysis_inverse.diagonal()[6::7] - lai_Q = Q_matrix.diagonal()[6::7] + n_pixels = len(x_analysis)//n_param + x0 = np.tile(x_prior, n_pixels) + x0[location::n_param] = x_forecast[location::n_param] # Update LAI + lai_post_cov = P_analysis_inverse.diagonal()[location::n_param] + lai_Q = Q_matrix.diagonal()[location::n_param] c_inv_prior_mat = [] - for n in range(n_pixels): + for cov, Q in zip(lai_post_cov, lai_Q): # inflate uncertainty - lai_inv_cov = 1.0/((1.0/lai_post_cov[n])+lai_Q[n]) + lai_inv_cov = 1.0/((1.0/cov)+Q) little_P_forecast_inverse = c_inv_prior.copy() - little_P_forecast_inverse[6, 6] = lai_inv_cov + little_P_forecast_inverse[location::location] = lai_inv_cov c_inv_prior_mat.append(little_P_forecast_inverse) P_forecast_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) - return x0, None, P_forecast_inverse + def no_propagation(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): """ + THIS PROPAGATOR SHOULD NOT BE USED ANY MORE. It is better to set + the state_propagator to None and to use the Prior exlicitly. + THIS IS ONLY SUITABLE FOR BROADBAND SAIL uses TIP prior No propagation. In this case, we return the original prior. As the information filter behaviour is the standard behaviour in KaFKA, we diff --git a/kafka/inference/narrowbandSAIL_tools.py b/kafka/inference/narrowbandSAIL_tools.py index c57f3a5..e2e3608 100644 --- a/kafka/inference/narrowbandSAIL_tools.py +++ b/kafka/inference/narrowbandSAIL_tools.py @@ -1,15 +1,18 @@ import logging +import gdal import numpy as np -from .utils import block_diag import os -import gdal + +from .utils import block_diag +from .kf_tools import propagate_single_parameter def sail_prior_values(): """ :returns ------- The mean prior vector, covariance and inverse covariance matrices.""" - + #parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', + # 'lai', 'ala', 'bsoil', 'psoil'] mean = np.array([2.1, np.exp(-60. / 100.), np.exp(-7.0 / 100.), 0.1, np.exp(-50 * 0.0176), np.exp(-100. * 0.002), @@ -30,16 +33,6 @@ def __init__(self, parameter_list, state_mask): self.state_mask = state_mask else: self.state_mask = self._read_mask(state_mask) - # parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', - # 'lai', 'ala', 'bsoil', 'psoil'] - # self.mean = np.array([1.19, np.exp(-14.4/100.), - # np.exp(-4.0/100.), 0.1, - # np.exp(-50*0.68), np.exp(-100./21.0), - # np.exp(-3.97/2.),70./90., 0.5, 0.9]) - # sigma = np.array([0.69, 0.016, - # 0.0086, 0.1, - # 1.71e-2, 0.017, - # 0.20, 0.5, 0.5, 0.5]) mean, c_prior, c_inv_prior = sail_prior_values() self.mean = mean self.covar = c_prior @@ -71,8 +64,6 @@ def process_prior ( self, time, inv_cov=True): return x0, covar - - def propagate_LAI_narrowbandSAIL(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, @@ -84,20 +75,8 @@ def propagate_LAI_narrowbandSAIL(x_analysis, P_analysis, lai_position = 6 x_prior, c_prior, c_inv_prior = sail_prior_values() - - x_forecast = M_matrix.dot(x_analysis) - n_pixels = len(x_analysis)//nparameters - x0 = np.tile(x_prior, n_pixels) - x0[lai_position::nparameters] = x_forecast[lai_position::nparameters] # Update LAI - lai_post_cov = P_analysis_inverse.diagonal()[lai_position::nparameters] - lai_Q = Q_matrix.diagonal()[lai_position::nparameters] - - c_inv_prior_mat = [] - for cov, Q in zip(lai_post_cov, lai_Q): - # inflate uncertainty - lai_inv_cov = 1.0/((1.0/cov)+Q) - little_P_forecast_inverse = c_inv_prior.copy() - little_P_forecast_inverse[lai_position, lai_position] = lai_inv_cov - c_inv_prior_mat.append(little_P_forecast_inverse) - P_forecast_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) - return x0, None, P_forecast_inverse \ No newline at end of file + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index 568d63c..1da2811 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -37,7 +37,6 @@ from .inference import create_linear_observation_operator from .inference import create_nonlinear_observation_operator from .inference import iterate_time_grid -from .inference import propagate_information_filter_LAI # eg from .inference import hessian_correction from .inference import hessian_correction_multiband from .inference.kf_tools import propagate_and_blend_prior @@ -64,7 +63,7 @@ class LinearKalman (object): rather grotty "0-th" order models!""" def __init__(self, observations, output, state_mask, create_observation_operator, parameters_list, - state_propagation=propagate_information_filter_LAI, + state_propagation=None, linear=True, diagnostics=True, prior=None): """The class creator takes (i) an observations object, (ii) an output writer object, (iii) the state mask (a boolean 2D array indicating which From 4baa4233bfdff3996aa604849903b73e70751b00 Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:05:26 +0100 Subject: [PATCH 06/26] example script to run with latest propagators --- kafka_test_Py36.py | 181 ++++++++++++++++++--------------------------- 1 file changed, 73 insertions(+), 108 deletions(-) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index a1d86f1..7e4f3f2 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -1,8 +1,8 @@ #!/usr/bin/env python import logging -logging.basicConfig( - level=logging.getLevelName(logging.DEBUG), +logging.basicConfig( + level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename="the_log.log") import os @@ -11,7 +11,6 @@ import numpy as np import gdal import osr -import scipy.sparse as sp # from multiply.inference-engine blah blah blah try: @@ -19,18 +18,17 @@ except ImportError: pass -import kafka from kafka.input_output import BHRObservations, KafkaOutput from kafka import LinearKalman -from kafka.inference import block_diag -from kafka.inference import propagate_information_filter_LAI -from kafka.inference import no_propagation +from kafka.inference.broadbandSAIL_tools import propagate_LAI_broadbandSAIL from kafka.inference import create_nonlinear_observation_operator +from kafka.inference.broadbandSAIL_tools import JRCPrior # Probably should be imported from somewhere else, but I can't see # where from ATM... No biggy + def reproject_image(source_img, target_img, dstSRSs=None): """Reprojects/Warps an image to fit exactly another image. Additionally, you can set the destination SRS if you want @@ -54,73 +52,6 @@ def reproject_image(source_img, target_img, dstSRSs=None): dstSRS=dstSRS) return g - - -###class DummyInferencePrior(_WrappingInferencePrior): - ###""" - ###This class is merely a dummy. - ###""" - - ###def process_prior(self, parameters: List[str], time: Union[str, datetime], state_grid: np.array, - - -class JRCPrior(object): - """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only - version.""" - - def __init__ (self, parameter_list, state_mask): - """It makes sense to have the list of parameters and state mask - defined at this point, as they won't change during processing.""" - self.mean, self.covar, self.inv_covar = self._tip_prior() - self.parameter_list = parameter_list - if isinstance(state_mask, (np.ndarray, np.generic) ): - self.state_mask = state_mask - else: - self.state_mask = self._read_mask(state_mask) - - def _read_mask(self, fname): - """Tries to read the mask as a GDAL dataset""" - if not os.path.exists(fname): - raise IOError("State mask is neither an array or a file that exists!") - g = gdal.Open(fname) - if g is None: - raise IOError("{:s} can't be opened with GDAL!".format(fname)) - mask = g.ReadAsArray() - return mask - - def _tip_prior(self): - """The JRC-TIP prior in a convenient function which is fun for the whole - family. Note that the effective LAI is here defined in transformed space - where TLAI = exp(-0.5*LAIe). - - Returns - ------- - The mean prior vector, covariance and inverse covariance matrices.""" - # broadly TLAI 0->7 for 1sigma - sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) - x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*2.)]) - # The individual covariance matrix - little_p = np.diag(sigma**2).astype(np.float32) - little_p[5, 2] = 0.8862*0.0959*0.2 - little_p[2, 5] = 0.8862*0.0959*0.2 - - inv_p = np.linalg.inv(little_p) - return x0, little_p, inv_p - - def process_prior ( self, time, inv_cov=True): - # Presumably, self._inference_prior has some method to retrieve - # a bunch of files for a given date... - n_pixels = self.state_mask.sum() - x0 = np.array([self.mean for i in range(n_pixels)]).flatten() - if inv_cov: - inv_covar_list = [self.inv_covar for m in range(n_pixels)] - inv_covar = block_diag(inv_covar_list, dtype=np.float32) - return x0, inv_covar - else: - covar_list = [self.covar for m in xrange(n_pixels)] - covar = block_diag(covar_list, dtype=np.float32) - return x0, covar - class KafkaOutputMemory(object): """A very simple class to output the state.""" def __init__(self, parameter_list): @@ -134,57 +65,91 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, self.output[timestep] = solution +def mkdir_p(path): + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise + + if __name__ == "__main__": - + + # Set up logging + Log = logging.getLogger(__name__+".kafka_test_x.py") + + runname = 'Arros_0-25' #Used in output directory as a unique identifier + propagator = propagate_LAI_broadbandSAIL parameter_list = ["w_vis", "x_vis", "a_vis", - "w_nir", "x_nir", "a_nir", "TeLAI"] - - tile = "h11v04" - start_time = "2006185" - + "w_nir", "x_nir", "a_nir", "TeLAI"] + + ## parameters for Bondville data. + #tile = "h11v04" # Bondville + #start_time = "2006001" # Bondville + #cd43a1_dir="/data/MODIS/h11v04/MCD43" + ## Bondville chip + #masklim = ((2200, 2450), (450, 700)) # Bondville, h11v04 + + # Parameters for Spanish tile + tile = "h17v05" # Spain + start_time = "2017001" # Spain + mcd43a1_dir="/data/MODIS/h17v05/MCD43" + # chips in h17v05 Spain, select one + masklim = ((650, 730), (1180, 1280)) # Arros, rice + #masklim = ((900,940), (1300,1340)) = True # Alcornocales + #masklim = ((640,700), (1400,1500)) = True # Campinha + + + + path = "/home/npounder/output/kafka/28Mar/kafkaout_{}".format(runname) + if not os.path.exists(path): + mkdir_p(path) + emulator = "./SAIL_emulator_both_500trainingsamples.pkl" - - if os.path.exists("/storage/ucfajlg/Ujia/MCD43/"): - mcd43a1_dir = "/storage/ucfajlg/Ujia/MCD43/" - else: - mcd43a1_dir="/data/MODIS/h11v04/MCD43" - ####tilewidth = 75 - ###n_pixels = tilewidth*tilewidth + + mask = np.zeros((2400,2400),dtype=np.bool8) - #mask[900:940, 1300:1340] = True # Alcornocales - #mask[640:700, 1400:1500] = True # Campinha - #mask[650:730, 1180:1280] = True # Arros - mask[ 2200:2395, 450:700 ] = True # Bondville, h11v04 + mask[masklim[0][0]:masklim[0][1], + masklim[1][0]:masklim[1][1]] = True - bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, - end_time=None, mcd43a2_dir=None) + bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, + end_time=None, mcd43a2_dir=None, period=8) + + Log.info("runname = {}".format(runname)) + Log.info("propagator = {}".format(propagator)) + Log.info("tile = {}".format(tile)) + Log.info("start_time = {}".format(start_time)) + Log.info("mask = {}".format(masklim)) projection, geotransform = bhr_data.define_output() - output = KafkaOutput(parameter_list, geotransform, projection, "/tmp/") + output = KafkaOutput(parameter_list, geotransform, projection, path) the_prior = JRCPrior(parameter_list, mask) - - kf = LinearKalman(bhr_data, output, mask, + + # prior = None when using propagate_LAI_broadbandSAIL + kf = LinearKalman(bhr_data, output, mask, create_nonlinear_observation_operator,parameter_list, - state_propagation=None, - prior=the_prior, + state_propagation=propagator, + prior=None, linear=False) # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) - + Q = np.zeros_like(x_forecast) - Q[6::7] = 0.025 - + Q[6::7] = 0.25 + kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) - - base = datetime(2006,7,4) - num_days = 180 + + # This determines the time grid of the retrieved state parameters + base = datetime(2017, 1, 1) + num_days = 366 time_grid = [] - for x in range( 0, num_days, 16): - time_grid.append( base + timedelta(days = x) ) + for x in range( 0, num_days, 8): + time_grid.append(base + timedelta(days=x)) - kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) - + kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) \ No newline at end of file From d19d7037ecbf5276a37b6515010ce7fbcc9e5d96 Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:10:51 +0100 Subject: [PATCH 07/26] Added some comments --- kafka_test_Py36.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index 7e4f3f2..3657b6c 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -81,6 +81,11 @@ def mkdir_p(path): Log = logging.getLogger(__name__+".kafka_test_x.py") runname = 'Arros_0-25' #Used in output directory as a unique identifier + # To run without propagation set propagator to None and set a + # prior in LinearKalman. + # If propagator is set to propagate_LAI_broadbandSAIL then the + # prior in LinearKalman must be set to None because this propagator + # includes a prior propagator = propagate_LAI_broadbandSAIL parameter_list = ["w_vis", "x_vis", "a_vis", "w_nir", "x_nir", "a_nir", "TeLAI"] @@ -127,6 +132,8 @@ def mkdir_p(path): output = KafkaOutput(parameter_list, geotransform, projection, path) + # If using a separate prior then this is passed to LinearKalman + # Otherwise this is just used to set the starting state vector. the_prior = JRCPrior(parameter_list, mask) # prior = None when using propagate_LAI_broadbandSAIL From 8ed511ded213915e98b910a0c77b24fd8f6c2072 Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:19:45 +0100 Subject: [PATCH 08/26] example scripts to run with latest propagators --- kafka_test_Py36.py | 4 +- kafka_test_S2.py | 151 ++++++++++++++++----------------------------- 2 files changed, 56 insertions(+), 99 deletions(-) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index 3657b6c..46d0e5b 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -81,6 +81,7 @@ def mkdir_p(path): Log = logging.getLogger(__name__+".kafka_test_x.py") runname = 'Arros_0-25' #Used in output directory as a unique identifier + # To run without propagation set propagator to None and set a # prior in LinearKalman. # If propagator is set to propagate_LAI_broadbandSAIL then the @@ -108,7 +109,7 @@ def mkdir_p(path): - path = "/home/npounder/output/kafka/28Mar/kafkaout_{}".format(runname) + path = "/tmp/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) @@ -122,7 +123,6 @@ def mkdir_p(path): bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, end_time=None, mcd43a2_dir=None, period=8) - Log.info("runname = {}".format(runname)) Log.info("propagator = {}".format(propagator)) Log.info("tile = {}".format(tile)) Log.info("start_time = {}".format(start_time)) diff --git a/kafka_test_S2.py b/kafka_test_S2.py index 6f7c7b9..f40c304 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="Campinha_noprop_2017.log") import os from datetime import datetime, timedelta import numpy as np @@ -28,14 +28,13 @@ import kafka from kafka.input_output import Sentinel2Observations, KafkaOutput from kafka import LinearKalman -from kafka.inference import block_diag -from kafka.inference import propagate_information_filter_LAI -from kafka.inference import no_propagation +from kafka.inference.narrowbandSAIL_tools import propagate_LAI_narrowbandSAIL from kafka.inference import create_prosail_observation_operator - +from kafka.inference.narrowbandSAIL_tools import SAILPrior # Probably should be imported from somewhere else, but I can't see # where from ATM... No biggy + def reproject_image(source_img, target_img, dstSRSs=None): """Reprojects/Warps an image to fit exactly another image. Additionally, you can set the destination SRS if you want @@ -59,82 +58,6 @@ def reproject_image(source_img, target_img, dstSRSs=None): dstSRS=dstSRS) return g - - -###class DummyInferencePrior(_WrappingInferencePrior): - ###""" - ###This class is merely a dummy. - ###""" - - ###def process_prior(self, parameters: List[str], time: Union[str, datetime], state_grid: np.array, - -class SAILPrior(object): - def __init__ (self, parameter_list, state_mask): - self.parameter_list = parameter_list - if isinstance(state_mask, (np.ndarray, np.generic) ): - self.state_mask = state_mask - else: - self.state_mask = self._read_mask(state_mask) - #parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', - # 'lai', 'ala', 'bsoil', 'psoil'] - #self.mean = np.array([1.19, np.exp(-14.4/100.), - #np.exp(-4.0/100.), 0.1, - #np.exp(-50*0.68), np.exp(-100./21.0), - #np.exp(-3.97/2.),70./90., 0.5, 0.9]) - #sigma = np.array([0.69, 0.016, - #0.0086, 0.1, - #1.71e-2, 0.017, - #0.20, 0.5, 0.5, 0.5]) - self.mean = np.array([2.1, np.exp(-60./100.), - np.exp(-7.0/100.), 0.1, - np.exp(-50*0.0176), np.exp(-100.*0.002), - np.exp(-4./2.), 70./90., 0.5, 0.9]) - sigma = np.array([0.01, 0.2, - 0.01, 0.05, - 0.01, 0.01, - 0.50, 0.1, 0.1, 0.1]) - - self.covar = np.diag(sigma**2).astype(np.float32) - self.inv_covar = np.diag(1./sigma**2).astype(np.float32) - #self.inv_covar[3,3]=0 - ########self.mean = - ########self.variance = - ########lai_m2_m2: [3.1733 1.7940] - - ########cab_ug_cm2: [14.4369 1.8284] - - ########car_u_cm2: [3.9650 0.8948] - - ########cdm_g_cm2: [20.9675 6.4302] - - ########cw_cm: [0.6781 0.6749] - - ########N_: [1.1937 0.6902] - - def _read_mask(self, fname): - """Tries to read the mask as a GDAL dataset""" - if not os.path.exists(fname): - raise IOError("State mask is neither an array or a file that exists!") - g = gdal.Open(fname) - if g is None: - raise IOError("{:s} can't be opened with GDAL!".format(fname)) - mask = g.ReadAsArray() - return mask - - def process_prior ( self, time, inv_cov=True): - # Presumably, self._inference_prior has some method to retrieve - # a bunch of files for a given date... - n_pixels = self.state_mask.sum() - x0 = np.array([self.mean for i in range(n_pixels)]).flatten() - if inv_cov: - inv_covar_list = [self.inv_covar for m in range(n_pixels)] - inv_covar = block_diag(inv_covar_list, dtype=np.float32) - return x0, inv_covar - else: - covar_list = [self.covar for m in range(n_pixels)] - covar = block_diag(covar_list, dtype=np.float32) - return x0, covar - class KafkaOutputMemory(object): """A very simple class to output the state.""" def __init__(self, parameter_list): @@ -147,55 +70,89 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, solution[param] = x_analysis[ii::7] self.output[timestep] = solution +def mkdir_p(path): + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise if __name__ == "__main__": - + + # Set up logging + Log = logging.getLogger(__name__+".kafka_test_x.py") + + + runname = 'Campinha_2017_Q0-1' #Used in output directory as a unique identifier + + # To run without propagation set propagator to None and set a + # prior in LinearKalman. + # If propagator is set to propagate_LAI_narrowbandSAIL then the + # prior in LinearKalman must be set to None because this propagator + # includes a prior + propagator = propagate_LAI_narrowbandSAIL + parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', 'lai', 'ala', 'bsoil', 'psoil'] - + start_time = "2017001" - + + Log.info("propagator = {}".format(propagator)) + Log.info("tile = {}".format(tile)) + Log.info("start_time = {}".format(start_time)) + + path = "/tmp/kafkaout_{}".format(runname) + if not os.path.exists(path): + mkdir_p(path) + #emulator_folder = "/home/ucfafyi/DATA/Multiply/emus/sail/" emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" - + #data_folder = "/data/nemesis/S2_data/30/S/WJ/" data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" state_mask = "./Barrax_pivots.tif" - + + s2_observations = Sentinel2Observations(data_folder, - emulator_folder, + emulator_folder, state_mask) projection, geotransform = s2_observations.define_output() output = KafkaOutput(parameter_list, geotransform, - projection, "/tmp/") + projection, path) + # If using a separate prior then this is passed to LinearKalman + # Otherwise this is just used to set the starting state vector. the_prior = SAILPrior(parameter_list, state_mask) g = gdal.Open(state_mask) mask = g.ReadAsArray().astype(np.bool) + # prior = None when using propagate_LAI_narrowbandSAIL kf = LinearKalman(s2_observations, output, mask, create_prosail_observation_operator, parameter_list, - state_propagation=None, - prior=the_prior, + state_propagation=propagator, + prior=None,#the_prior, linear=False) # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) - + Q = np.zeros_like(x_forecast) - + Q[6::10] = 0.1 + kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) - - base = datetime(2017,7,3) - num_days = 10 - time_grid = list((base + timedelta(days=x) + + # This determines the time grid of the retrieved state parameters + base = datetime(2017,1,1) + num_days = 366 + time_grid = list((base + timedelta(days=x) for x in range(0, num_days, 2))) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) - From d9c6df9ba09f9324c1315ce932f8838835380a1a Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:25:33 +0100 Subject: [PATCH 09/26] remove unecessary tile logging --- kafka_test_S2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_test_S2.py b/kafka_test_S2.py index f40c304..b66ab3d 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="Campinha_noprop_2017.log") + filename="the_log.log") import os from datetime import datetime, timedelta import numpy as np @@ -100,7 +100,6 @@ def mkdir_p(path): start_time = "2017001" Log.info("propagator = {}".format(propagator)) - Log.info("tile = {}".format(tile)) Log.info("start_time = {}".format(start_time)) path = "/tmp/kafkaout_{}".format(runname) @@ -143,6 +142,7 @@ def mkdir_p(path): # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) + # Inflation amount for propagation Q = np.zeros_like(x_forecast) Q[6::10] = 0.1 From daba58263aecdd04e9a389f9fa9680e951cc79bd Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 20:12:40 +0100 Subject: [PATCH 10/26] match filepaths to latest output --- kafka_test_Py36.py | 39 +++++++++++++++++++++------------------ kafka_test_S2.py | 14 +++++++------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index 46d0e5b..1c4e889 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="logfiles/MODIS_demo.log") import os from datetime import datetime, timedelta @@ -80,7 +80,7 @@ def mkdir_p(path): # Set up logging Log = logging.getLogger(__name__+".kafka_test_x.py") - runname = 'Arros_0-25' #Used in output directory as a unique identifier + runname = 'Bondville_0-05' #Used in output directory as a unique identifier # To run without propagation set propagator to None and set a # prior in LinearKalman. @@ -92,24 +92,28 @@ def mkdir_p(path): "w_nir", "x_nir", "a_nir", "TeLAI"] ## parameters for Bondville data. - #tile = "h11v04" # Bondville - #start_time = "2006001" # Bondville - #cd43a1_dir="/data/MODIS/h11v04/MCD43" + tile = "h11v04" + start_time = "2006001" + time_grid_start = datetime(2006, 1, 1) + num_days = 366 + mcd43a1_dir="/data/MODIS/h11v04/MCD43" ## Bondville chip - #masklim = ((2200, 2450), (450, 700)) # Bondville, h11v04 - - # Parameters for Spanish tile - tile = "h17v05" # Spain - start_time = "2017001" # Spain - mcd43a1_dir="/data/MODIS/h17v05/MCD43" - # chips in h17v05 Spain, select one - masklim = ((650, 730), (1180, 1280)) # Arros, rice + masklim = ((2200, 2400), (450, 700)) # Bondville, h11v04 + + ## Parameters for Spanish tile + #tile = "h17v05" # Spain + #start_time = "2017001" # Spain + #time_grid_start = datetime(2017, 1, 1) + #num_days = 366 + #mcd43a1_dir="/data/MODIS/h17v05/MCD43" + ## chips in h17v05 Spain, select one + #masklim = ((650, 730), (1180, 1280)) # Arros, rice #masklim = ((900,940), (1300,1340)) = True # Alcornocales #masklim = ((640,700), (1400,1500)) = True # Campinha - path = "/tmp/kafkaout_{}".format(runname) + path = "/home/npounder/output/kafka/demo/MODIS/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) @@ -146,17 +150,16 @@ def mkdir_p(path): # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) + # Inflation amount for propagation Q = np.zeros_like(x_forecast) - Q[6::7] = 0.25 + Q[6::7] = 0.05 kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) # This determines the time grid of the retrieved state parameters - base = datetime(2017, 1, 1) - num_days = 366 time_grid = [] for x in range( 0, num_days, 8): - time_grid.append(base + timedelta(days=x)) + time_grid.append(time_grid_start + timedelta(days=x)) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) \ No newline at end of file diff --git a/kafka_test_S2.py b/kafka_test_S2.py index b66ab3d..56be621 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="logfiles/demo_Campinha_2017_Q0-05.log") import os from datetime import datetime, timedelta import numpy as np @@ -85,7 +85,7 @@ def mkdir_p(path): Log = logging.getLogger(__name__+".kafka_test_x.py") - runname = 'Campinha_2017_Q0-1' #Used in output directory as a unique identifier + runname = 'Campinha_2017_Q0-05' #Used in output directory as a unique identifier # To run without propagation set propagator to None and set a # prior in LinearKalman. @@ -98,11 +98,13 @@ def mkdir_p(path): 'lai', 'ala', 'bsoil', 'psoil'] start_time = "2017001" + time_grid_start = datetime(2017, 1, 1) + num_days = 366 Log.info("propagator = {}".format(propagator)) Log.info("start_time = {}".format(start_time)) - path = "/tmp/kafkaout_{}".format(runname) + path = "/home/npounder/output/kafka/demo/S2/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) @@ -144,15 +146,13 @@ def mkdir_p(path): # Inflation amount for propagation Q = np.zeros_like(x_forecast) - Q[6::10] = 0.1 + Q[6::10] = 0.05 kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) # This determines the time grid of the retrieved state parameters - base = datetime(2017,1,1) - num_days = 366 - time_grid = list((base + timedelta(days=x) + time_grid = list((time_grid_start + timedelta(days=x) for x in range(0, num_days, 2))) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) From addc9413e939f141a4221d1e5615be0fb292e510 Mon Sep 17 00:00:00 2001 From: npounder Date: Wed, 20 Jun 2018 18:02:17 +0100 Subject: [PATCH 11/26] broadband sail tools --- kafka/inference/broadbandSAIL_tools.py | 97 ++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 kafka/inference/broadbandSAIL_tools.py diff --git a/kafka/inference/broadbandSAIL_tools.py b/kafka/inference/broadbandSAIL_tools.py new file mode 100644 index 0000000..447fc91 --- /dev/null +++ b/kafka/inference/broadbandSAIL_tools.py @@ -0,0 +1,97 @@ +import logging +import gdal +import numpy as np +import os + +from .utils import block_diag +from .kf_tools import propagate_single_parameter + + +def tip_prior_values(): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + # broadly TLAI 0->7 for 1sigma + sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) + x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) + # The individual covariance matrix + little_p = np.diag(sigma**2).astype(np.float32) + little_p[5, 2] = 0.8862*0.0959*0.2 + little_p[2, 5] = 0.8862*0.0959*0.2 + + inv_p = np.linalg.inv(little_p) + return x0, little_p, inv_p + + +class JRCPrior(object): + """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only + version.""" + + def __init__(self, parameter_list, state_mask): + """It makes sense to have the list of parameters and state mask + defined at this point, as they won't change during processing.""" + self.mean, self.covar, self.inv_covar = self._tip_prior() + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic) ): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def _tip_prior(self): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + mean, c_prior, c_inv_prior = tip_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + return mean, c_prior, c_inv_prior + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + +def propagate_LAI_broadbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + """Propagate a single parameter and + set the rest of the parameter propagations to the prior. + This should be used with a prior for the remaining parameters""" + nparameters = 7 + lai_position = 6 + x_prior, c_prior, c_inv_prior = tip_prior_values() + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) From 2ac9a43352c1ced969aff1b13f734b37961f900f Mon Sep 17 00:00:00 2001 From: Nicola Pounder Date: Wed, 20 Jun 2018 18:03:03 +0100 Subject: [PATCH 12/26] Delete empty file --- kafka/inference/broadbandSAIL_tools | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 kafka/inference/broadbandSAIL_tools diff --git a/kafka/inference/broadbandSAIL_tools b/kafka/inference/broadbandSAIL_tools deleted file mode 100644 index e69de29..0000000 From 7955fa26fc927134cfe84b6ac100d13e947ce0ed Mon Sep 17 00:00:00 2001 From: gerardolopez Date: Thu, 21 Jun 2018 16:46:49 +0000 Subject: [PATCH 13/26] Save state to npz files --- kafka/inference/broadbandSAIL_tools.py | 97 ++++++++++++++++++++++++++ kafka/input_output/observations.py | 17 ++++- kafka_test_S2.py | 32 +++++++-- 3 files changed, 138 insertions(+), 8 deletions(-) create mode 100644 kafka/inference/broadbandSAIL_tools.py diff --git a/kafka/inference/broadbandSAIL_tools.py b/kafka/inference/broadbandSAIL_tools.py new file mode 100644 index 0000000..447fc91 --- /dev/null +++ b/kafka/inference/broadbandSAIL_tools.py @@ -0,0 +1,97 @@ +import logging +import gdal +import numpy as np +import os + +from .utils import block_diag +from .kf_tools import propagate_single_parameter + + +def tip_prior_values(): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + # broadly TLAI 0->7 for 1sigma + sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) + x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) + # The individual covariance matrix + little_p = np.diag(sigma**2).astype(np.float32) + little_p[5, 2] = 0.8862*0.0959*0.2 + little_p[2, 5] = 0.8862*0.0959*0.2 + + inv_p = np.linalg.inv(little_p) + return x0, little_p, inv_p + + +class JRCPrior(object): + """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only + version.""" + + def __init__(self, parameter_list, state_mask): + """It makes sense to have the list of parameters and state mask + defined at this point, as they won't change during processing.""" + self.mean, self.covar, self.inv_covar = self._tip_prior() + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic) ): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def _tip_prior(self): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + mean, c_prior, c_inv_prior = tip_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + return mean, c_prior, c_inv_prior + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + +def propagate_LAI_broadbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + """Propagate a single parameter and + set the rest of the parameter propagations to the prior. + This should be used with a prior for the remaining parameters""" + nparameters = 7 + lai_position = 6 + x_prior, c_prior, c_inv_prior = tip_prior_values() + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) diff --git a/kafka/input_output/observations.py b/kafka/input_output/observations.py index cb6022e..8e291f6 100644 --- a/kafka/input_output/observations.py +++ b/kafka/input_output/observations.py @@ -325,7 +325,7 @@ def get_band_data(self, the_date, band_no): class KafkaOutput(object): """A very simple class to output the state.""" def __init__(self, parameter_list, geotransform, projection, folder, - fmt="GTiff"): + fmt="GTiff", state_folder:str=None): """The inference engine works on tiles, so we get the tilewidth (we assume the tiles are square), the GDAL-friendly geotransform and projection, as well as the destination directory and the @@ -333,6 +333,7 @@ def __init__(self, parameter_list, geotransform, projection, folder, self.geotransform = geotransform self.projection = projection self.folder = folder + self.state_folder = state_folder self.fmt = fmt self.parameter_list = parameter_list @@ -352,6 +353,7 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, A = np.zeros(state_mask.shape, dtype=np.float32) A[state_mask] = x_analysis[ii::n_params] dst_ds.GetRasterBand(1).WriteArray(A) + for ii, param in enumerate(self.parameter_list): fname = os.path.join(self.folder, "%s_%s_unc.tif" % (param, timestep.strftime("A%Y%j"))) @@ -366,7 +368,20 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, A[state_mask] = 1./np.sqrt(P_analysis_inv.diagonal()[ii::n_params]) dst_ds.GetRasterBand(1).WriteArray(A) + if not self.state_folder == None and os.path.exists( self.state_folder ): + # Dump to disk P_analysis_inv as sparse matrix in npz + fname = os.path.join(self.state_folder, "P_analysis_inv_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + sp.save_npz( fname, P_analysis_inv ) + + fname = os.path.join(self.state_folder, "P_analysis_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, P_analysis ) + # Dump as well the whole x_analysis in a single vector + fname = os.path.join(self.state_folder, "x_analysis_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, x_analysis ) if __name__ == "__main__": emulator = "../SAIL_emulator_both_500trainingsamples.pkl" diff --git a/kafka_test_S2.py b/kafka_test_S2.py index 56be621..01b106e 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -5,6 +5,7 @@ level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename="logfiles/demo_Campinha_2017_Q0-05.log") + import os from datetime import datetime, timedelta import numpy as np @@ -97,22 +98,26 @@ def mkdir_p(path): parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', 'lai', 'ala', 'bsoil', 'psoil'] - start_time = "2017001" - time_grid_start = datetime(2017, 1, 1) + #start_time = "2017001" + #time_grid_start = datetime(2017, 1, 1) + start_time = "2017049" + time_grid_start = datetime(2017, 2, 18) num_days = 366 Log.info("propagator = {}".format(propagator)) Log.info("start_time = {}".format(start_time)) - path = "/home/npounder/output/kafka/demo/S2/kafkaout_{}".format(runname) + path = "/tmp/kafka/demo/S2/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) #emulator_folder = "/home/ucfafyi/DATA/Multiply/emus/sail/" - emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" + #emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" + emulator_folder = "/data/archive/emulators/s2_prosail" #data_folder = "/data/nemesis/S2_data/30/S/WJ/" - data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" + #data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" + data_folder = "/data/S2_L2/30/S/WJ/" state_mask = "./Barrax_pivots.tif" @@ -141,8 +146,20 @@ def mkdir_p(path): prior=None,#the_prior, linear=False) - # Get starting state... We can request the prior object for this - x_forecast, P_forecast_inv = the_prior.process_prior(None) + # Check if there's a prior from a previous timestep + P_inv_fname = "P_analysis_inv_%s.npz" % time_grid_start.strftime("A%Y%j") + P_inv_fname = os.path.join( path, P_inv_fname ) + + x_fname = "x_analysis_%s.npz" % time_grid_start.strftime("A%Y%j") + x_fname = os.path.join( path, x_fname ) + + if os.path.exists( P_inv_fname ) and os.path.exists( x_fname ): + # Load stored matrices... + x_forecast = np.load( x_fname )['arr_0'] + P_forecast_inv = sp.load_npz( P_inv_fname ) + else: + # Get starting state... We can request the prior object for this + x_forecast, P_forecast_inv = the_prior.process_prior(None) # Inflation amount for propagation Q = np.zeros_like(x_forecast) @@ -156,3 +173,4 @@ def mkdir_p(path): for x in range(0, num_days, 2))) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) + From 8119c6cd1e3cc7a84082b4ae7ac8770b2b42dc71 Mon Sep 17 00:00:00 2001 From: npounder Date: Mon, 20 Aug 2018 15:50:54 +0100 Subject: [PATCH 14/26] Hessian calc for prosail handles case with masked elements --- kafka/inference/utils.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 6f21a46..c7bd5c4 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -228,7 +228,15 @@ def create_prosail_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") if calc_hess: - hess = emulator.hessian(x0[mask[state_mask]]) + hess = np.zeros((n_times, n_params, n_params), + dtype=np.float32) + hess_ = emulator.hessian(x0[mask[state_mask]]) + + n = 0 + for i, m in enumerate(mask[state_mask].flatten()): + if m: + hess[i, ...] = hess_[n] + n += 1 return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) @@ -357,7 +365,7 @@ def spsolve2(a, b): a_lu = spl.splu(a.tocsc()) # LU decomposition for sparse a out = sp.lil_matrix((a.shape[1], b.shape[1]), dtype=np.float32) b_csc = b.tocsc() - for j in xrange(b.shape[1]): + for j in range(b.shape[1]): bb = np.array(b_csc[j, :].todense()).squeeze() out[j, j] = a_lu.solve(bb)[j] return out.tocsr() From b4504b4ca4d98278e06a18aef9d2328bd2f58296 Mon Sep 17 00:00:00 2001 From: npounder Date: Mon, 20 Aug 2018 18:34:54 +0100 Subject: [PATCH 15/26] place error in logfile when a retrieval of a fully masked observation is attempted --- kafka/linear_kf.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index 1da2811..c9d9e05 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -183,8 +183,8 @@ def run(self, time_grid, x_forecast, P_forecast, P_forecast_inverse, x_forecast, P_forecast, P_forecast_inverse = self.advance( x_analysis, P_analysis, P_analysis_inverse, self.trajectory_model, self.trajectory_uncertainty) - is_first = False + if len(locate_times) == 0: # Just advance the time x_analysis = x_forecast @@ -256,14 +256,22 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, # Also extract single band information from nice package # this allows us to use the same interface as current # Deferring processing to a new solver method in solvers.py - - H_matrix_= self._create_observation_operator(self.n_params, + + try: + H_matrix_= self._create_observation_operator(self.n_params, data.emulator, data.metadata, data.mask, self.state_mask, x_prev, - band) + band, + calc_hess = False) + except ValueError as e: + if (sum(data.mask[self.state_mask])== 0): + LOG.error("All observations masked out") + raise + + H_matrix.append(H_matrix_) Y.append(data.observations) MASK.append(data.mask) From 34256d9658d46bd1bfb681edd1ac18f5648302b7 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Thu, 29 Nov 2018 14:06:36 +0100 Subject: [PATCH 16/26] return 0 when value error --- kafka/inference/kf_tools.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/kafka/inference/kf_tools.py b/kafka/inference/kf_tools.py index 46ed9cb..d7a4db4 100644 --- a/kafka/inference/kf_tools.py +++ b/kafka/inference/kf_tools.py @@ -51,13 +51,17 @@ def hessian_correction_multiband(hessians, R_mats, innovations, masks, state_mask, n_bands, nparams): """ Non linear correction for the Hessian of the cost function. This handles multiple bands. """ - little_hess_cor = [] - for R, hessian, innovation, mask in zip( - R_mats, hessians, innovations, masks): - little_hess_cor.append(hessian_correction(hessian, R, innovation, - mask, state_mask, nparams)) - hessian_corr = sum(little_hess_cor) - return hessian_corr + R, hessian, innovation, mask = 0 + try: + little_hess_cor = [] + for R, hessian, innovation, mask in zip( + R_mats, hessians, innovations, masks): + little_hess_cor.append(hessian_correction(hessian, R, innovation, mask, state_mask, nparams)) + hessian_corr = sum(little_hess_cor) + return hessian_corr + except ValueError: + logging.info(R.shape, hessian.shape, innovation.shape, mask.shape) + return 0 def blend_prior(prior_mean, prior_cov_inverse, x_forecast, P_forecast_inverse): From 0a9dacd7e49244dec3ebf049bd1971ec23100db3 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Thu, 29 Nov 2018 14:19:10 +0100 Subject: [PATCH 17/26] actually bail out after 25 iterations --- kafka/linear_kf.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index c9d9e05..667042d 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -240,12 +240,12 @@ def assimilate_multiple_bands(self, locate_times, x_forecast, P_forecast, def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, P_forecast_inverse, convergence_tolerance=1e-3, min_iterations=2): - not_converged = True + converged = False # Linearisation point is set to x_forecast for first iteration x_prev = x_forecast*1. n_iter = 1 n_bands = len(current_data) - while not_converged: + while not converged: Y = [] MASK = [] UNC = [] @@ -300,11 +300,11 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, if (convergence_norm < convergence_tolerance) and ( n_iter >= min_iterations): # Converged! - not_converged = False - elif n_iter > 25: + converged = True + elif n_iter >= 25: # Too many iterations LOG.warning("Bailing out after 25 iterations!!!!!!") - not_converged = False + converged = True x_prev = x_analysis*1. n_iter += 1 From d9119a7d28ff3a4ed5368f720ac6e20c6f750047 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Fri, 30 Nov 2018 12:55:51 +0100 Subject: [PATCH 18/26] fix --- kafka/inference/kf_tools.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka/inference/kf_tools.py b/kafka/inference/kf_tools.py index d7a4db4..4789ad8 100644 --- a/kafka/inference/kf_tools.py +++ b/kafka/inference/kf_tools.py @@ -51,7 +51,10 @@ def hessian_correction_multiband(hessians, R_mats, innovations, masks, state_mask, n_bands, nparams): """ Non linear correction for the Hessian of the cost function. This handles multiple bands. """ - R, hessian, innovation, mask = 0 + R = [] + hessian = [] + innovation = [] + mask = [] try: little_hess_cor = [] for R, hessian, innovation, mask in zip( From 29389fa7dff37ad6c909e562bee48a6c84891b67 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Fri, 30 Nov 2018 12:56:11 +0100 Subject: [PATCH 19/26] removed unused imports --- kafka/linear_kf.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index 667042d..8ea29bc 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -27,15 +27,8 @@ import scipy.sparse as sp -# from scipy.spatial.distance import squareform, pdist - -# from utils import matrix_squeeze, spsolve2, reconstruct_array - from .inference import variational_kalman from .inference import variational_kalman_multiband -from .inference import locate_in_lut, run_emulator, create_uncertainty -from .inference import create_linear_observation_operator -from .inference import create_nonlinear_observation_operator from .inference import iterate_time_grid from .inference import hessian_correction from .inference import hessian_correction_multiband From 039485601df2f232210d3e5b23291f893cf75c36 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Fri, 30 Nov 2018 12:57:37 +0100 Subject: [PATCH 20/26] performance improvement --- kafka/inference/utils.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index c7bd5c4..aee3f7c 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -66,7 +66,7 @@ def run_emulator(gp, x, tol=None): # We select the unique values in vector x # Note that we could have done this using e.g. a histogram # or some other method to select solutions "close enough" - unique_vectors = np.vstack({tuple(row) for row in x}) + unique_vectors, unique_indices, unique_inverse = np.unique(x, axis=0, return_index=True, return_inverse=True) if len(unique_vectors) == 1: # Prior! cluster_labels = np.zeros(x.shape[0], dtype=np.int16) elif len(unique_vectors) > 1e6: @@ -80,26 +80,24 @@ def run_emulator(gp, x, tol=None): # Assign each element of x to a LUT/cluster entry cluster_labels = locate_in_lut(unique_vectors, x) # Runs emulator for emulation subset - try: - H_, dH_ = gp.predict(unique_vectors, do_unc=False) - except ValueError: - # Needed for newer gp version - H_, _, dH_ = gp.predict(unique_vectors, do_unc=False) + prediction = gp.predict(unique_vectors, do_unc=False) + if len(prediction) == 2: + H_, dH_ = prediction + else: + H_, _, dH_ = prediction H = np.zeros(x.shape[0]) dH = np.zeros_like(x) - try: + if 'cluster_labels' in locals(): nclust = cluster_labels.shape - except NameError: - for i, uniq in enumerate(unique_vectors): - passer = np.all(x == uniq, axis=1) - H[passer] = H_[i] - dH[passer, :] = dH_[i, :] + for label in np.unique(cluster_labels): + H[cluster_labels == label] = H_[label] + dH[cluster_labels == label, :] = dH_[label, :] return H, dH - - for label in np.unique(cluster_labels): - H[cluster_labels == label] = H_[label] - dH[cluster_labels == label, :] = dH_[label, :] + H[unique_indices] = H_ + H = H[unique_inverse] + dH[unique_indices] = dH_ + dH = dH[unique_inverse] return H, dH @@ -160,6 +158,7 @@ def create_nonlinear_observation_operator(n_params, emulator, metadata, # It might be here that we do some sort of clustering H0_, dH = run_emulator(emulator, x0[mask[state_mask]]) + H0_, dH = run_emulator2(emulator, x0[mask[state_mask]]) LOG.info("Storing emulators in H matrix") # This loop can be JIT'ed too @@ -215,6 +214,7 @@ def create_prosail_observation_operator(n_params, emulator, metadata, # It might be here that we do some sort of clustering H0_, dH = run_emulator(emulator, x0[mask[state_mask]]) + H0_, dH = run_emulator2(emulator, x0[mask[state_mask]]) LOG.info("Storing emulators in H matrix") # This loop can be JIT'ed too From ca1cab9488ad79cc57589e550fe5ff63f1440a94 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Fri, 30 Nov 2018 13:26:47 +0100 Subject: [PATCH 21/26] fix --- kafka/inference/utils.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index aee3f7c..70dc6dc 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -30,9 +30,6 @@ import scipy.sparse as sp import scipy.sparse.linalg as spl -import datetime as dt -import os -import gdal import logging LOG = logging.getLogger(__name__) @@ -158,7 +155,6 @@ def create_nonlinear_observation_operator(n_params, emulator, metadata, # It might be here that we do some sort of clustering H0_, dH = run_emulator(emulator, x0[mask[state_mask]]) - H0_, dH = run_emulator2(emulator, x0[mask[state_mask]]) LOG.info("Storing emulators in H matrix") # This loop can be JIT'ed too @@ -214,7 +210,6 @@ def create_prosail_observation_operator(n_params, emulator, metadata, # It might be here that we do some sort of clustering H0_, dH = run_emulator(emulator, x0[mask[state_mask]]) - H0_, dH = run_emulator2(emulator, x0[mask[state_mask]]) LOG.info("Storing emulators in H matrix") # This loop can be JIT'ed too From b32339eeff8770fd221475829eb2950e1cffd1e9 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Mon, 3 Dec 2018 16:41:22 +0100 Subject: [PATCH 22/26] also write out state mask --- kafka/input_output/observations.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka/input_output/observations.py b/kafka/input_output/observations.py index 8e291f6..94ef238 100644 --- a/kafka/input_output/observations.py +++ b/kafka/input_output/observations.py @@ -383,6 +383,11 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, ( timestep.strftime("A%Y%j") ) ) np.savez( fname, x_analysis ) + # ... and the state mask ... + fname = os.path.join(self.state_folder, "state_mask_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, state_mask ) + if __name__ == "__main__": emulator = "../SAIL_emulator_both_500trainingsamples.pkl" tile = "h17v05" From f289504686c778f06653de1ee5532a0b8cbf961c Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Tue, 4 Dec 2018 15:34:29 +0100 Subject: [PATCH 23/26] advance at the end of the loop not at the beginning --- kafka/inference/utils.py | 7 +------ kafka/input_output/observations.py | 15 ++++++++------- kafka/linear_kf.py | 21 ++++++++------------- 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 70dc6dc..7264897 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -36,7 +36,6 @@ def iterate_time_grid(time_grid, the_dates): - is_first = True istart_date = time_grid[0] for ii, timestep in enumerate(time_grid[1:]): # First locate all available observations for time step of interest. @@ -52,11 +51,7 @@ def iterate_time_grid(time_grid, the_dates): for iobs in locate_times: LOG.info("\t->{}".format(iobs.strftime("%Y-%m-%d"))) istart_date = timestep - if is_first: - yield timestep, locate_times, True - is_first = False - else: - yield timestep, locate_times, False + yield timestep, locate_times def run_emulator(gp, x, tol=None): diff --git a/kafka/input_output/observations.py b/kafka/input_output/observations.py index 94ef238..159d808 100644 --- a/kafka/input_output/observations.py +++ b/kafka/input_output/observations.py @@ -368,15 +368,15 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, A[state_mask] = 1./np.sqrt(P_analysis_inv.diagonal()[ii::n_params]) dst_ds.GetRasterBand(1).WriteArray(A) + def dump_state(self, timestep, x_analysis, P_analysis, P_analysis_inv, state_mask): if not self.state_folder == None and os.path.exists( self.state_folder ): # Dump to disk P_analysis_inv as sparse matrix in npz - fname = os.path.join(self.state_folder, "P_analysis_inv_%s.npz" % - ( timestep.strftime("A%Y%j") ) ) - sp.save_npz( fname, P_analysis_inv ) - - fname = os.path.join(self.state_folder, "P_analysis_%s.npz" % - ( timestep.strftime("A%Y%j") ) ) - np.savez( fname, P_analysis ) + if P_analysis_inv is not None: + fname = os.path.join(self.state_folder, "P_analysis_inv_%s.npz" % ( timestep.strftime("A%Y%j") ) ) + sp.save_npz( fname, P_analysis_inv ) + if P_analysis is not None: + fname = os.path.join(self.state_folder, "P_analysis_%s.npz" % ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, P_analysis ) # Dump as well the whole x_analysis in a single vector fname = os.path.join(self.state_folder, "x_analysis_%s.npz" % @@ -388,6 +388,7 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, ( timestep.strftime("A%Y%j") ) ) np.savez( fname, state_mask ) + if __name__ == "__main__": emulator = "../SAIL_emulator_both_500trainingsamples.pkl" tile = "h17v05" diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index 8ea29bc..662a5c6 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -166,18 +166,9 @@ def run(self, time_grid, x_forecast, P_forecast, P_forecast_inverse, The time_grid ought to be a list with the time steps given in the same form as self.observation_times""" - for timestep, locate_times, is_first in iterate_time_grid( - time_grid, self.observations.dates): - + for timestep, locate_times in iterate_time_grid(time_grid, self.observations.dates): self.current_timestep = timestep - if not is_first: - LOG.info("Advancing state, %s" % timestep.strftime("%Y-%m-%d")) - x_forecast, P_forecast, P_forecast_inverse = self.advance( - x_analysis, P_analysis, P_analysis_inverse, - self.trajectory_model, self.trajectory_uncertainty) - is_first = False - if len(locate_times) == 0: # Just advance the time x_analysis = x_forecast @@ -187,7 +178,6 @@ def run(self, time_grid, x_forecast, P_forecast, P_forecast_inverse, else: # We do have data, so we assimilate - x_analysis, P_analysis, P_analysis_inverse = self.assimilate_multiple_bands( locate_times, x_forecast, P_forecast, P_forecast_inverse, @@ -196,8 +186,13 @@ def run(self, time_grid, x_forecast, P_forecast, P_forecast_inverse, iter_obs_op=iter_obs_op, is_robust=is_robust, diag_str=diag_str) LOG.info("Dumping results to disk") - self.output.dump_data(timestep, x_analysis, P_analysis, - P_analysis_inverse, self.state_mask, self.n_params) + self.output.dump_data(timestep, x_analysis, P_analysis, P_analysis_inverse, self.state_mask, self.n_params) + LOG.info("Advancing state, %s" % timestep.strftime("%Y-%m-%d")) + x_forecast, P_forecast, P_forecast_inverse = self.advance(x_analysis, P_analysis, P_analysis_inverse, + self.trajectory_model, + self.trajectory_uncertainty) + self.output.dump_state(timestep, x_forecast, P_forecast, P_forecast_inverse, self.state_mask) + def assimilate_multiple_bands(self, locate_times, x_forecast, P_forecast, P_forecast_inverse, From f2fc4b12e3d90b96bb81af4ff111590da7df542c Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Tue, 11 Dec 2018 18:29:18 +0100 Subject: [PATCH 24/26] do not perform hessian correction --- kafka/linear_kf.py | 58 ++++++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index 662a5c6..eaed7c2 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -300,27 +300,28 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, # Once we have converged... # Correct hessian for higher order terms #split_points = [m.sum( ) for m in MASK] - HESSIAN = [] - INNOVATIONS = np.split(innovations, n_bands) - for band, data in enumerate(current_data): + #todo include this part. Rather than commenting out, we should decide whether to correct or not + # HESSIAN = [] + # INNOVATIONS = np.split(innovations, n_bands) + # for band, data in enumerate(current_data): # calculate the hessian for the solution - _,_,hessian= self._create_observation_operator(self.n_params, - data.emulator, - data.metadata, - data.mask, - self.state_mask, - x_analysis, - band, - calc_hess = True) - HESSIAN.append(hessian) - P_correction = hessian_correction_multiband(HESSIAN, - UNC, INNOVATIONS, MASK, - self.state_mask, n_bands, - self.n_params) - P_analysis_inverse = P_analysis_inverse - P_correction + # _,_,hessian= self._create_observation_operator(self.n_params, + # data.emulator, + # data.metadata, + # data.mask, + # self.state_mask, + # x_analysis, + # band, + # calc_hess = True) + # HESSIAN.append(hessian) + # P_correction = hessian_correction_multiband(HESSIAN, + # UNC, INNOVATIONS, MASK, + # self.state_mask, n_bands, + # self.n_params) + # P_analysis_inverse = P_analysis_inverse - P_correction # Rarely, this returns a small negative value. For now set to nan. # May require further investigation in the future - P_analysis_inverse[P_analysis_inverse<0] = np.nan + # P_analysis_inverse[P_analysis_inverse<0] = np.nan # Done with this observation, move along... @@ -412,20 +413,21 @@ def assimilate_band(self, band, timestep, x_forecast, P_forecast, x_prev = x_analysis n_iter += 1 + #todo include this part. Rather than commenting out, we should decide whether to correct or not # Correct hessian for higher order terms - P_correction = hessian_correction(data.emulator, x_analysis, - data.uncertainty, innovations, - data.mask, self.state_mask, band, - self.n_params) + # P_correction = hessian_correction(data.emulator, x_analysis, + # data.uncertainty, innovations, + # data.mask, self.state_mask, band, + # self.n_params) # UPDATE HESSIAN WITH HIGHER ORDER CONTRIBUTION - P_analysis_inverse = P_analysis_inverse - P_correction + # P_analysis_inverse = P_analysis_inverse - P_correction # Rarely, this returns a small negative value. For now set to nan. # May require further investigation in the future - negative_values = P_analysis_inverse<0 - if any(negative_values): - P_analysis_inverse[negative_values] = np.nan - LOG.warning("{} negative values in inverse covariance matrix".format( - sum(negative_values))) + # negative_values = P_analysis_inverse<0 + # if any(negative_values): + # P_analysis_inverse[negative_values] = np.nan + # LOG.warning("{} negative values in inverse covariance matrix".format( + # sum(negative_values))) import matplotlib.pyplot as plt From 29170e26eebc417743e19b314bbec2239ddeb467 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Thu, 27 Jun 2019 15:35:17 +0200 Subject: [PATCH 25/26] added version --- kafka/__init__.py | 1 + kafka/version.py | 1 + setup.py | 5 +++++ 3 files changed, 7 insertions(+) create mode 100644 kafka/version.py diff --git a/kafka/__init__.py b/kafka/__init__.py index 92f30df..04f407a 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -1,3 +1,4 @@ +from .version import __version__ from .inference import * from .input_output import * from .linear_kf import LinearKalman diff --git a/kafka/version.py b/kafka/version.py new file mode 100644 index 0000000..5e3048b --- /dev/null +++ b/kafka/version.py @@ -0,0 +1 @@ +__version__ = '0.1' \ No newline at end of file diff --git a/setup.py b/setup.py index c2a3ed1..9312531 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,12 @@ 'matplotlib' ] +__version__ = None +with open('kafka/version.py') as f: + exec(f.read()) + setup(name='KaFKA', + version=__version__, description='MULTIPLY KaFKA inference engine', author='MULTIPLY Team', packages=find_packages(), From 0432dfed34c09e2860fb5b90fd526977e5f52675 Mon Sep 17 00:00:00 2001 From: Tonio Fincke Date: Thu, 7 Nov 2019 15:18:04 +0100 Subject: [PATCH 26/26] updated requirements --- setup.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 9312531..75d58d3 100644 --- a/setup.py +++ b/setup.py @@ -2,14 +2,15 @@ from setuptools import setup, find_packages -requirements = [ - 'pytest', - 'numpy', - 'scipy', - 'gdal', +# requirements = [ +# 'pytest', +# 'numpy', +# 'scipy', +# 'gdal', # 'BRDF_descriptors', # Not available for automatic installation - 'matplotlib' -] + # 'matplotlib' +# ] +requirements = [] __version__ = None with open('kafka/version.py') as f: