diff --git a/ENDA.py b/ENDA.py index 58d2aab3146375e2223937d8f64c5508d4a6297f..17910dbd70e0ec21ca0a60e3317e5ebaaf226169 100644 --- a/ENDA.py +++ b/ENDA.py @@ -5,6 +5,7 @@ from scipy.special import erfinv,erf # Own modules from models import CBL +from observations import * # TODO: comprehensive documentation is missing. @@ -275,7 +276,6 @@ class cycle: trun,\ assimilation_interval,\ ocoord,\ - truths,\ observations,\ obs_error_sdev_assimilate,\ localization_cutoff,\ @@ -319,17 +319,6 @@ class cycle: # Turn inflation coefficients into an array inflation_coefficients_rtps = np.ones(state_vector_length)*inflation_rtps_alpha - # Compute distances between observations and state vector variables - dist = compute_distances(xcoord,ocoord) - - # Replace any NaN distances with 0 - # NaN distances only occur when the state is augmented with global - # parameters, whose location is undefined. Setting the distance to - # global parameters equal to zero ensures that the covariance - # matrix for global parameters is identity, as required by - # global updating. - np.nan_to_num(dist,copy=False) - # Initialize ensemble # Get the right model class from the nature run. # The class must have "initialize", "run" and "update" methods. @@ -351,6 +340,17 @@ class cycle: for k in range(ncycles): if verbose: print('Cycle %04u : '%k,end='') + # Compute distances between current observations and state vector variables + dist = compute_distances(xcoord,ocoord[k,:]) + + # Replace any NaN distances with 0 + # NaN distances only occur when the state is augmented with global + # parameters, whose location is undefined. Setting the distance to + # global parameters equal to zero ensures that the covariance + # matrix for global parameters is identity, as required by + # global updating. + np.nan_to_num(dist,copy=False) + # Integrate up to analysis time da.maxtime = assimilation_interval output_full_history = False @@ -379,7 +379,7 @@ class cycle: updates,model_equivalents[k,:,:],cov_xx,cov_xy,cov_yy_dum,innov_dum,increm_dum = eakf(backgrounds[k,:,:],observations[k,:], obs_error_sdev_assimilate, xcoord, - ocoord, + ocoord[k,:], dist, localization_cutoff=localization_cutoff,return_covariances_increments_and_innovations=True) increments[k,:,-npar] = increm_dum[:,-npar] @@ -391,14 +391,14 @@ class cycle: updates,model_equivalents[k,:,:] = eakf(backgrounds[k,:,:],observations[k,:], obs_error_sdev_assimilate, xcoord, - ocoord, + ocoord[k,:], dist, localization_cutoff=localization_cutoff) elif FILTER == 'LETKF': updates,model_equivalents[k,:,:] = letkf(backgrounds[k,:,:],observations[k,:], obs_error_sdev_assimilate, xcoord, - ocoord, + ocoord[k,:], dist, localization_cutoff=localization_cutoff, inflation_rho=1.0) @@ -428,7 +428,6 @@ class cycle: self.initial_state = initial_state self.backgrounds = backgrounds self.analyses = analyses - self.truths = truths self.obs_coordinates = ocoord self.observations = observations self.model_equivalents = model_equivalents @@ -446,6 +445,7 @@ class cycle: self.innov = innov self.increments = increments +""" class experiment: def __init__(self,settings): # Read attributes from dictionary of settings @@ -550,6 +550,192 @@ class experiment: # Compute diagnostics if not hasattr(self,'dg'): self.dg = diagnostics(self.da) +""" +class experiment: + def __init__(self,settings): + # Read attributes from dictionary of settings + for k, v in settings.items(): + setattr(self, k, v) + if 0 < self.inflation_rtps_alpha <= 1: + self.inflate_rtps = True + else: + self.inflate_rtps = False + + ################################################################################################ + # CASE 1: synthetic observations (OSSE with nature run) + if not hasattr(self,'nr') and self.type == 'OSSE': + # Consistency check on observation info + nobs = self.obs_coordinates.size + assert (self.obs_coordinates.size==self.obs_kinds.size==self.obs_error_sdev_assimilate.size==self.obs_error_sdev_generate.size),\ + f'obs_coordinates, obs_kinds and obs_error_sdev must have the same size,\ + got {self.obs_coordinates.size,self.obs_kinds.size,self.obs_error_sdev_assimilate.size,self.obs_error_sdev_assimilate.size}' + + # Consistency check on time coordinates + nassim = self.trun/self.assimilation_interval + assert (nassim.is_integer()),\ + f'trun/assimilation_interval must be integer,\ + got {self.trun,self.assimilation_interval,self.trun/self.assimilation_interval}' + + # If you don't have a pre-existing nature run, you need to run it + + # Define initial conditions of nature run + if self.tspinup > 0: + # Create initial state with random perturbations, then integrate + # one deterministic run for the spinup period + # Initial condition for the nature run is the last state of the spinup run + if verbose: print('Spinup : ',end='') + sp = CBL(self.cbl_settings) + sp.maxtime = self.tspinup + sp.initialize(1) + sp.run() + initial_conditions = sp.x + coords = sp.state_coordinates + else: + # No spinup, nature run gets unperturbed initial condition + sp = CBL(self.cbl_settings) + sp.initialize(1) + initial_conditions = sp.x0 + coords = sp.state_coordinates + + # Nature run: initialize, then integrate + if verbose: print('Nature run : ',end='') + nr = CBL(self.cbl_settings) + nr.maxtime = self.trun + nr.initialize(initial_conditions,coordinates=coords) + nr.run(output_full_history=True) + + # Draw observations from nature run + # Need nested loops because observation times (k) are independent + # and observation error variance depends on location (kk) + state_coordinates = nr.zt + truths = np.zeros((int(nassim),nobs))+np.nan + observations = np.zeros((int(nassim),nobs))+np.nan + for i in range(int(nassim)): + valid_time = (i+1)*self.assimilation_interval + assert(np.mod(self.assimilation_interval,nr.dt)==0), f'Assimilation interval must be a multiple of dt, got {self.assimilation_interval,nr.dt}' + time_index = np.argwhere(nr.history['time'] == valid_time)[0][0] + for j in range(nobs): + obs_coordinate = self.obs_coordinates[j] + if self.obs_kinds[j]=='theta': + variable = nr.history['theta'][:,time_index] + elif self.obs_kinds[j]=='u': + variable = nr.history['u'][:,time_index] + elif self.obs_kinds[j]=='v': + variable = nr.history['v'][:,time_index] + truths[i,j] = observation_operator(variable,state_coordinates,obs_coordinate) + # Change the seed every time you go through this, + # otherwise the perturbations are always the same; + # and you have bias instead of randomness + seed = self.rnseed+j*1000+i*100000 + RNG = np.random.default_rng(seed=seed) + observations[i,j] = truths[i,j]+\ + RNG.normal(0,self.obs_error_sdev_generate[j]) + + # In this case the observations coordinates are the same at all analysis times, + # so a 1-d vector. Expand it to 2d, with the same dimensions + # as the observations vector + if self.obs_coordinates.ndim == 1: + ocoords = self.obs_coordinates[None,:]+np.zeros(observations.shape) + + # Randomize order of observations + if self.randomize_obs: + observations,ocoords,truths = random_sort_observations(observations,ocoords,truths) + + # Store nature run, truths and observations + self.nr = nr + self.truths = truths + self.observations = observations + self.obs_coordinates = ocoords + + ################################################################################################ + # CASE 2: real (or pseudo-) observations + if not hasattr(self,'nr') and self.type == 'real': + + # Read observations and their coordinates + observations, ocoords = decode_observations(self.observation_files) + assert (observations.shape == ocoords.shape),\ + f'observation and coordinate arrays must have the same size' + + # Consistency check on time coordinates + nassim = self.trun/self.assimilation_interval + assert (nassim.is_integer()),\ + f'trun/assimilation_interval must be integer,\ + got {self.trun,self.assimilation_interval,self.trun/self.assimilation_interval}' + + # Randomize order of observations + if self.randomize_obs: + observations,ocoords = random_sort_observations(observations,ocoords) + + # Throw away any observations in the spinup time before assimilations begin + # Exclude the initial time too (so the +1 in the line below) + nspinup = self.tspinup/self.assimilation_interval+1 + observations = observations[int(nspinup):,:] + ocoords = ocoords[int(nspinup):,:] + + # Throw away any observations beyond tspinup+trun + observations = observations[:int(nassim),:] + ocoords = ocoords[:int(nassim),:] + + # There is no nature run and no truth in this case. + # The only thing you can store is the observations. + self.observations = observations + self.obs_coordinates = ocoords + + # The da cycle depends on a "nr" structure to get information + # about the model properties. + # Depending on the configuration, the da cycle could + # create the initial ensemble perturbations around + # a spun-up model state, which is stored in nr.x0. + # So, run the forecast model for the spinup time if necessary. + # Define initial conditions of nature run + if self.tspinup > 0: + # Create initial state with random perturbations, then integrate + # one deterministic run for the spinup period + # Initial condition for the nature run is the last state of the spinup run + if verbose: print('Spinup : ',end='') + sp = CBL(self.cbl_settings) + sp.maxtime = self.tspinup + sp.initialize(1) + sp.run() + initial_conditions = sp.x + coords = sp.state_coordinates + else: + # No spinup, nature run gets unperturbed initial condition + sp = CBL(self.cbl_settings) + sp.initialize(1) + initial_conditions = sp.x0 + coords = sp.state_coordinates + + # Nature run: initialize, then integrate + self.nr = CBL(self.cbl_settings) + self.nr.maxtime = self.trun + self.nr.initialize(initial_conditions,coordinates=coords) + + ################################################################################################ + # Run assimilation cycle + if not hasattr(self,'da'): + self.da = cycle(self.cbl_settings, + self.nr, + self.nens, + self.FILTER, + self.trun, + self.assimilation_interval, + self.obs_coordinates, + self.observations, + self.obs_error_sdev_assimilate, + self.localization_cutoff, + self.inflate_rtps, + self.inflation_rtps_alpha, + self.rnseed) + + # If truths (from nature run) exist, add them to the da cycle structure. + # That's necessary for the diagnostics package to work properly. + if hasattr(self,'truths'): + self.da.truths = self.truths + + # Compute diagnostics + if not hasattr(self,'dg'): + self.dg = diagnostics(self.da) class diagnostics: def __init__(self,da): @@ -562,10 +748,14 @@ class diagnostics: ntim = da.observations.shape[0] nobs = da.observations.shape[1] nens = da.backgrounds.shape[2] - t = da.truths # ntim*nobs - o = da.observations # ntim*nobs - ocoords = da.obs_coordinates # nobs + ocoords = da.obs_coordinates # nobs or ntim*nobs xcoords = da.state_coordinates # nvar + o = da.observations # ntim*nobs + if hasattr(da,'truths'): + t = da.truths # ntim*nobs + truth_exists = True + else: + truth_exists = False # Get observation-space priors from da output b = da.model_equivalents # ntim*nobs*nens @@ -573,10 +763,20 @@ class diagnostics: # To get observation-space posteriors (ntim*nvar*nens), # apply observation operator to analyses a = np.zeros((ntim,nobs,nens))+np.nan - for i in range(ntim): - for j in range(nobs): - for k in range(nens): - a[i,j,k] = observation_operator(da.analyses[i,:,k],xcoords,ocoords[j]) + + # In this case, observation coordinates do not change between analysis times + if ocoords.ndim == 1: + for i in range(ntim): + for j in range(nobs): + for k in range(nens): + a[i,j,k] = observation_operator(da.analyses[i,:,k],xcoords,ocoords[j]) + + # In this case, observation coordinates are different at each analysis time + elif ocoords.ndim == 2: + for i in range(ntim): + for j in range(nobs): + for k in range(nens): + a[i,j,k] = observation_operator(da.analyses[i,:,k],xcoords,ocoords[i,j]) # Compute ensemble mean and spread meana = a.mean(axis=2) # ntim*nobs @@ -587,16 +787,18 @@ class diagnostics: # Compute space-averaged analysis RMSE and spread aRMSD_x = np.sqrt((meana-o)**2).mean(axis=1) # ntim bRMSD_x = np.sqrt((meanb-o)**2).mean(axis=1) # ntim - aRMSE_x = np.sqrt((meana-t)**2).mean(axis=1) # ntim - bRMSE_x = np.sqrt((meanb-t)**2).mean(axis=1) # ntim + if truth_exists: + aRMSE_x = np.sqrt((meana-t)**2).mean(axis=1) # ntim + bRMSE_x = np.sqrt((meanb-t)**2).mean(axis=1) # ntim aSprd_x = sigmaa.mean(axis=1) # ntim bSprd_x = sigmab.mean(axis=1) # ntim # Compute time-averaged analysis RMSE and spread aRMSD_t = np.sqrt((meana-o)**2).mean(axis=0) # nobs bRMSD_t = np.sqrt((meanb-o)**2).mean(axis=0) # nobs - aRMSE_t = np.sqrt((meana-t)**2).mean(axis=0) # nobs - bRMSE_t = np.sqrt((meanb-t)**2).mean(axis=0) # nobs + if truth_exists: + aRMSE_t = np.sqrt((meana-t)**2).mean(axis=0) # nobs + bRMSE_t = np.sqrt((meanb-t)**2).mean(axis=0) # nobs aSprd_t = sigmaa.mean(axis=0) # nobs bSprd_t = sigmab.mean(axis=0) # nobs @@ -640,18 +842,19 @@ class diagnostics: hist_aminb_local.append(np.histogram(aminb[:,i],bins=edges)[0]/aminb[:,i].size/(edges[1]-edges[0])) # Analysis & background mean error and spread - self.aRMSE_x = aRMSE_x # ntim self.aRMSD_x = aRMSD_x # ntim self.aSprd_x = aSprd_x # ntim - self.aRMSE_t = aRMSE_t # nobs self.aRMSD_t = aRMSD_t # nobs self.aSprd_t = aSprd_t # nobs - self.bRMSE_x = bRMSE_x # ntim self.bRMSD_x = bRMSD_x # ntim self.bSprd_x = bSprd_x # ntim - self.bRMSE_t = bRMSE_t # nobs self.bRMSD_t = bRMSD_t # nobs self.bSprd_t = bSprd_t # nobs + if truth_exists: + self.aRMSE_x = aRMSE_x # ntim + self.aRMSE_t = aRMSE_t # nobs + self.bRMSE_x = bRMSE_x # ntim + self.bRMSE_t = bRMSE_t # nobs # All these with dimensions ntim*nobs self.meana = meana