Commit 0222f5d6 authored by Philipp Griewank

Welcome to the show!

This is a cleaned up and slightly modified version of the linear advection model I have been playing around with.
#!/usr/bin/env python
#Contains all functions related to setting up and running the ensemble filter. There is quite a bit of overlap with the model functions, but I felt it was cleaner to try to separate them a bit.
import numpy as np
from model_functions import *
def set_da_constants(ncyc=10,nt=1,dt=500,u_std=0.5,dhdt_std=1e-4,True_std_obs=0.1,used_std_obs=0.1,pert_std_obs=0.00,obs_loc_h=np.arange(5,101,15),nens=100,nexp=1,init_noise=0.,fixed_seed=True,loc=None,init_spread = False, init_spread_h=0.5,init_spread_x = 500.):
"""
Sets constants needed for data assimilation and stores them in a dictionary.
There is some confusing misnaming going on, e.g. "u_std_ens" = u_std, but nothing that needs to be changed immediately.
nt is not really relevant as long as we use the linear advection model, but I left it in from Yvonne in case we start applying it to different models.
TODO:
- Figure out what size std_obs should be.
- Add localization
- Add different assimilation options (e.g. LETKF)
"""
DA_const = {}
DA_const["ncyc"] = ncyc # Number of assimilation cycles
DA_const["nens"] = nens # number of ensemble members
DA_const["nexp"] = nexp # number of parallel experiments to run
DA_const["nt"] = nt # Number of model timesteps between observations
DA_const["dt"] = dt # time duration of model timesteps
#Ensemble Errors and ensemble
DA_const["u_std_ens"] = u_std # Standard deviation of model u
DA_const["dhdt_std_ens"] = dhdt_std # Standard deviation of model dydt
DA_const["fixed_seed"] = fixed_seed # If True, then the ensemble number is used as a seed, so that the u and dhdt values are always randomized the same way
#Ensemble initialization
DA_const["init_noise"] = init_noise # spread of noise added to initial conditions to avoid singular matrices
DA_const["init_spread"] = init_spread # If True, adds an x and h displacement to initial ensemble spread
DA_const["init_spread_h"] = init_spread_h # initial spread of ensemble in h
DA_const["init_spread_x"] = init_spread_x # initial spread of ensemble in x
#Observations
DA_const["True_std_obs"] = True_std_obs # Standard deviation of true observation error of y
DA_const["used_std_obs"] = used_std_obs # Standard deviation of assumed observation error used to calculate R
DA_const["pert_std_obs"] = pert_std_obs # Standard deviation of pertubations added to the true observation for each ensemble member individually when updating
DA_const["obs_loc"] = obs_loc_h # index array of which state elements are observed
#Localization
DA_const["loc"] = loc # localization yes or no, no for now
return DA_const
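# Minimal usage sketch (illustrative values, not a recommendation):
#   da_const = set_da_constants(ncyc=20, nens=50, obs_loc_h=np.arange(5, 101, 10))
#   da_const["nens"]  # -> 50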
def create_states_dict(j,states,m_const,da_const):
"""
Generates the initial analysis and truth.
Also creates the "states" dictionary where the analysis ensemble, background ensemble, truth and observations are all going to be stored.
Very memory hungry, as everything from all assimilation time steps and all experiments is stored.
Works fine for the simple model though.
A fixed seed is used by default so that the model errors of all ensemble members are constant. But this can also be randomized.
An alternative version would be to generate the model errors and store them, but this has not happened yet. Would be necessary for some parameter estimation tests.
Modified version of the Yvonne setup to work with the linear advection model
Todo:
- Describe states dictionary here.
- make model errors stored variables to enable parameter estimation
- Maybe make a more sensible name, such as init_da_ensembles
- I am not convinced this is the best way to generate the initial analysis.
- Might be best to change the dictionary time axis. Forecast and analysis at the same time have a difference of 1 in the time integer.
"""
nx = m_const["nx"]
#initial conditions
h_initial = gaussian_initial_condition(m_const["x_grid"],m_const["h_init_std"])
#Generate truth
if da_const["fixed_seed"]==True: np.random.seed(j)
u_truth = np.random.normal(m_const["u_ref"],m_const["u_std_truth"])
dhdt_truth = np.random.normal(m_const["dhdt_ref"],m_const["dhdt_std_truth"])
truth = linear_advection_model(h_initial,u_truth,dhdt_truth,m_const["dx"],da_const["dt"],da_const["nt"])
an = np.zeros((nx,da_const["nens"]))
#First rows full of nans :)
bg = np.zeros((nx,da_const["nens"]))
bg[:] = np.nan
obs = np.zeros(nx)
obs[:] = np.nan
for i in range(da_const["nens"]):
if da_const["fixed_seed"]==True: np.random.seed(i+j*da_const["nens"])
if da_const["init_noise"]>0:
h_ens = np.random.normal(h_initial,da_const["init_noise"])
else:
h_ens = h_initial
if da_const["init_spread"]>0:
#initial spread generated by moving waves forward and backward and up and down using
#da_const["init_spread_h"] and da_const["init_spread_x"]
x_displace = np.random.normal(0.,da_const["init_spread_x"])
h_displace = np.random.normal(0.,da_const["init_spread_h"])
u_tmp = x_displace/da_const["dt"]
h_ens = semi_lagrangian_advection(h_ens,m_const["dx"],u_tmp,da_const["dt"])
h_ens = h_ens+h_displace
u_ens = np.random.normal(m_const["u_ref"],da_const["u_std_ens"])
dhdt_ens = np.random.normal(m_const["dhdt_ref"],da_const["dhdt_std_ens"])
an[:,i] = linear_advection_model(h_ens,u_ens,dhdt_ens,m_const["dx"],da_const["dt"],da_const["nt"])
states[j]={}
states[j]['bg']=[bg]
states[j]['an']=[an]
states[j]['truth']=[truth]
states[j]['obs']=[obs]
return an, truth, states
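# Structure of the returned states dict (per experiment j): states[j]['an'], states[j]['bg'],
# states[j]['truth'] and states[j]['obs'] are lists with one entry per assimilation step.
# 'bg' and 'obs' start with an all-NaN placeholder, while 'an' starts with the initial
# analysis ensemble of shape (nx, nens) and 'truth' with the initial truth of shape (nx,).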
def generate_obs(truth,states,m_const,da_const,j,t):
"""
Generates the truth and observations for the next assimilation step.
If a fixed_seed is used the model "errors" of the truth remain the same, but noise of the obs is still reseeded differently each time.
"""
#Generate new truth constants and integrate in time
if da_const["fixed_seed"]==True: np.random.seed(j)
u_truth = np.random.normal(m_const["u_ref"],m_const["u_std_truth"])
dhdt_truth = np.random.normal(m_const["dhdt_ref"],m_const["dhdt_std_truth"])
truth = linear_advection_model(truth,u_truth,dhdt_truth,m_const["dx"],da_const["dt"],da_const["nt"])
#make obs by adding some noise; even with fixed_seed the seed changes over time, so that the differences are not always the same for each measurement location
if da_const["fixed_seed"]==True: np.random.seed((j+1)*t)
obs = truth + np.random.normal(0,da_const["True_std_obs"],m_const["nx"])
states["truth"] = states["truth"]+[truth]
states["obs"] = states["obs"]+[obs]
return truth, obs, states
def predict(analysis,states,da_const,m_const,j):
"""
Runs the analysis ensemble forward in time using the model to generate the next forecast/background ensemble for the next assimilation step.
Outlook:
- Add different models than the linear_advection_model?
"""
bg = np.zeros((m_const["nx"],da_const["nens"]))
for i in range(da_const["nens"]):
if da_const["fixed_seed"]==True: np.random.seed(i+j*da_const["nens"])
u_ens = np.random.normal(m_const["u_ref"],da_const["u_std_ens"])
dhdt_ens = np.random.normal(m_const["dhdt_ref"],da_const["dhdt_std_ens"])
bg[:,i] = linear_advection_model(analysis[:,i],u_ens,dhdt_ens,m_const["dx"],da_const["dt"],da_const["nt"])
states["bg"] = states["bg"]+[bg]
return bg, states
def update_noloc(background,obs,R,H,states,da_const,m_const):
"""
Computes the analysis by individually changing each ensemble member of the forecast/background through the shared ensemble Kalman Gain and the observations.
Todo:
-Figure out exactly which version of the EnKF is used here, so that I can refer to it properly
"""
# Compute the background error covariance matrix
P = np.cov(background)
# define relative weights of observations and background in terms of the Kalman gain matrix
K = KalmanGain(P, R, H)
# Compute the analysis for each ensemble member
an = np.zeros((m_const["nx"],da_const["nens"]))
for i in range(da_const["nens"]):
an[:,i] = get_analysis(background[:,i],obs,K,H,da_const)
states["an"] = states["an"]+[an]
return an, states
def KalmanGain(P,R,H):
"""
Computes the Kalman gain matrix: K = PH^T(HPH^T+R)^(-1)
"""
P_H = np.dot(P,H.transpose())
S = np.dot(H,P_H)+R
K = np.dot(P_H,np.linalg.inv(S))
return K
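# Illustrative sketch (toy numbers, not part of the filter): for a 3-point state with unit
# background variance, no covariances, a single observation at grid point 1 and an
# observation variance of 0.25, the gain weights the observed point by 1/(1+0.25) = 0.8
# and leaves the unobserved points untouched.
def _demo_kalman_gain():
    P = np.identity(3)              # toy background error covariance
    H = np.identity(3)[[1], :]      # observe grid point 1 only
    R = 0.25 * np.identity(1)       # toy observation error covariance
    return KalmanGain(P, R, H)      # approx [[0.], [0.8], [0.]]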
def get_analysis(bg,obs,K,H,da_const):
"""
Computes the analysis: an = bg + K(obs_pert - H*bg), where obs_pert are the perturbed observations
"""
obs_pert = np.dot(H,obs+np.random.normal(0,da_const["pert_std_obs"],len(obs)))
bg_obs = np.dot(H,bg)
#obs_orig =np.dot(H,obs)
#print('obs_orig-bg_obs',np.sum(obs_orig-bg_obs),'obs_pert-bg_obs',np.sum(obs_pert-bg_obs),'update/np.sum(x)',np.sum(np.dot(K,obs_pert-bg_obs) )/np.sum(bg),'max(K)',np.max(K))
an = bg + np.dot(K,obs_pert-bg_obs)
return an
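# Minimal sketch of a single-member update (toy values, pert_std_obs=0 so the observation
# is not perturbed): with the gain from _demo_kalman_gain only the observed point is nudged
# 80% of the way towards the observation.
def _demo_get_analysis():
    K = np.array([[0.0], [0.8], [0.0]])
    H = np.identity(3)[[1], :]
    bg = np.zeros(3)                      # background member
    obs = np.ones(3)                      # full observation field, H picks out point 1
    return get_analysis(bg, obs, K, H, {"pert_std_obs": 0.0})   # approx [0., 0.8, 0.]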
def run_linear_advection_EnKF(m_const,da_const):
"""
The heart and soul of the whole linear advection EnKF filter.
Steps:
- Computes the constant matrices H and R
- Initializes the states dictionary with the first truth and analysis ensemble
- Loops over experiments and assimilation cycles: generates observations, predicts the background, and updates to get the analysis
Todo: Improve the documentation
"""
"""
constant matrices that follow from previously defined constants
"""
H = np.identity(m_const["nx"])[da_const["obs_loc"],:] # Observation operator
R = da_const["used_std_obs"]**2*np.identity(len(da_const["obs_loc"])) # Observation error corvariance matrix
"""
create dictionary to store background, analysis, truth and observations for verification
"""
states = {}
#Construct localization matrix C if loc!=None
if da_const["loc"]:
C = LocMat(da_const["loc"])
else:
C=None
"""
Loop over experiments
"""
for j in range(da_const["nexp"]):
"""
Initialize a random truth and a random analysis ensemble stored in "states"
"""
analysis, truth, states = create_states_dict(j,states,m_const,da_const)
"""
START DATA ASSIMILATION CYCLE
"""
for t in range(0,da_const["ncyc"]):
"""
Run the truth forward in time until the next assimilation step and create observations.
Note that this step is usually provided by nature and measurements obtained from weather stations,
wind profilers, radiosondes, aircraft reports, radars, satellites, etc.
"""
truth, obs, states[j] = generate_obs(truth,states[j],m_const,da_const,j,t)
"""
Predict
"""
# Predict the state at the next assimilation step by running the analysis forward in time
background, states[j] = predict(analysis,states[j],da_const,m_const,j)
"""
Update
"""
# Combine background and observations to get the analysis
analysis, states[j] = update_noloc(background,obs,R,H,states[j],da_const,m_const)
"""
END DATA ASSIMILATION CYCLE
"""
"""
end loop over experiments
"""
return states
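# End-to-end usage sketch (illustrative settings, not tuned for anything in particular):
def _demo_run_filter():
    m_const = set_model_constants()                        # from model_functions
    da_const = set_da_constants(ncyc=5, nens=20, nexp=1)
    states = run_linear_advection_EnKF(m_const, da_const)
    rmse, spread = get_spread_and_rmse(states, da_const, m_const)
    return rmse, spread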
def get_spread_and_rmse(states,da_const,m_const):
"""
Computes RMSE over space and time respectively, averaged over all experiments, for the background ensemble mean and analysis ensemble mean.
Structure of the output is as follows: rmse['dim']['quan'] where dim = space or time, quan = bg or an; bf will also be calculated when available.
Returns spread and rmse over time and space averaged over all experiments, so increasing the number of experiments smooths out the time errors.
"""
n = m_const["nx"]
ncyc = da_const["ncyc"]+1
nexp = da_const["nexp"]
nens = da_const["nens"]
time = np.arange(0,ncyc)
quantities = ['an','bg']
if 'bf' in states[0].keys(): quantities.append('bf')
tr_matrix = np.zeros([nexp,ncyc,n])
for j in range(nexp):
tr_matrix[j,:,:] = np.array(states[j]["truth"][:])
rmse = {}
spread = {}
for dim in ['time','space']: # Choose dim to average results
rmse[dim]={}
spread[dim]={}
for quan in quantities: # Loop over fields
rmse[dim][quan]={}
spread[dim][quan] = {}
rmse[dim][quan]['mean']=0.
spread[dim][quan]['mean']=0.
for j in range(nexp): # Loop over experiments
ens_matrix = np.array(states[j][quan][:])
if dim == "space": # Average over time to get one value per gridpoint
rmse[dim][quan][j] = np.zeros(n)
spread[dim][quan][j] = np.zeros(n)
for x in range(n): # Loop over grid points
rmse[dim][quan][j][x] = np.nanmean(L2norm(ens_matrix[:,x,:].T-tr_matrix[j,:,x]))
spread[dim][quan][j][x] = np.nanmean(np.std(ens_matrix[:,x,:], axis=1, ddof=1))
if dim == "time": # Average over space so that one value remains per timestep
rmse[dim][quan][j] = np.zeros(ncyc)
spread[dim][quan][j] = np.zeros(ncyc)
for t in time: # Loop DA cycles
rmse[dim][quan][j][t] = np.mean(L2norm(ens_matrix[t,:,:].T-tr_matrix[j,t,:]))
spread[dim][quan][j][t] = np.mean(np.std(ens_matrix[t,:,:], axis=1, ddof=1))
rmse[dim][quan]['mean'] = rmse[dim][quan]['mean'] + rmse[dim][quan][j]/float(nexp)
spread[dim][quan]['mean'] = spread[dim][quan]['mean'] + spread[dim][quan][j] / float(nexp)
rmse[dim][quan] = rmse[dim][quan]['mean']
spread[dim][quan] = spread[dim][quan]['mean']
return rmse, spread
def L2norm(error):
"""
Computes the l2 norm over the first dimension of the input array
"""
return np.sqrt(np.average(error**2,axis=0))
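# e.g. L2norm(np.array([[3., 4.], [0., 0.]])) -> array([2.12..., 2.83...]),
# i.e. the RMS over the first (leading) dimension, one value per remaining column.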
def predict_blind(background,states,da_const,m_const,j):
"""
Runs the background ensemble forward in time using the model to predict the truth at the next assimilation step.
This is then saved as 'bf', for blind forecast.
"""
bf = np.zeros((m_const["nx"],da_const["nens"]))
for i in range(da_const["nens"]):
if da_const["fixed_seed"]==True: np.random.seed(i+j*da_const["nens"])
u_ens = np.random.normal(m_const["u_ref"],da_const["u_std_ens"])
dhdt_ens = np.random.normal(m_const["dhdt_ref"],da_const["dhdt_std_ens"])
bf[:,i] = linear_advection_model(background[:,i],u_ens,dhdt_ens,m_const["dx"],da_const["dt"],da_const["nt"])
#bg[:,i] = linear_advection_model(analysis[:,i],da_const["dt"],da_const["std_model"],m_const["C"],m_const["a"])
states["bf"] = states["bf"]+[bf]
return bf, states
def add_blind_forecast(states,m_const,da_const):
"""
Takes a states dictionary created by run_linear_advection_EnKF and adds a 'bf' field which stands for blind forecast, also known as free forecast.
This blind forecast is generated directly from the background ensemble, without assimilating first.
Only makes sense if the randomized errors are carefully generated from the same seed, otherwise the model errors used to make the background ensemble will change when creating the blind forecast.
This feels dangerously close to philipp making a custom solution for everything himself again.
"""
"""
Loop over experiments
"""
for j in range(da_const["nexp"]):
#initializing new dictionary field, this is where things can easily go wrong, so pay attention
states[j]['bf'] = []
#add two first initial nan states, which we take from the initial forecast field
states[j]['bf'].append(states[j]['bg'][0])
states[j]['bf'].append(states[j]['bg'][0])
"""
START blind forecasting
"""
for t in range(2,da_const["ncyc"]+1):
"""
Predict
"""
#print('blind forecast time',t)
# Predict the state at the next assimilation step by running the analysis forward in time
blind_forecast, states[j] = predict_blind(states[j]['bg'][t-1],states[j],da_const,m_const,j)
"""
END blind forecasting
"""
"""
end loop over experiments
"""
return states
def sum_mid_tri(x):
"""
Default response function; a simple sum over the middle third of the state vector
"""
nx = len(x)
idx_str = int(nx/3.)
idx_end = int(2*nx/3.)
return np.sum(x[idx_str:idx_end])
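# e.g. for nx = 9 this sums x[3:6], i.e. the middle third of the state vector.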
def add_response(states,func_J=sum_mid_tri):
"""
Goes through the analysis, background, truth, and blind forecast if it exists and applies the given response function to the ensemble members.
Isn't pretty, could be made more elegant by looping over the given variables instead of hard-coding them.
Also poorly handles that there are differing numbers of bg, an, and bf fields.
Also so far doesn't allow computing the truth as a response function, as only the individual ensemble members are passed to func_J with no other information
ToDo:
- Allow using more diverse func_Js
- tidy up, is a mess
"""
nexp = len(states)
ncyc = len(states[0]["bg"])
nx = states[0]["bg"][0][:,0].shape[0]
nens = states[0]["bg"][0][0,:].shape[0]
#Poor way of checking if a blind forecast also exists
bf_flag = 0
if 'bf' in states[0]: bf_flag = 1
for e in range(nexp):
states[e]["response"]={}
#Initialize dictionary fields, not the cleanest solution, but fuck it
c = 0
an_response = np.zeros(nens)
bg_response = np.zeros(nens)
if bf_flag: bf_response = np.zeros(nens)
for n in range(nens):
an_response[n] = func_J(states[e]["an"][0][:,n])
bg_response[n] = func_J(states[e]["bg"][0][:,n])
if bf_flag: bf_response[n] = func_J(states[e]["bf"][0][:,n])
states[e]["response"]["an"] =[an_response]
states[e]["response"]["bg"] =[bg_response]
states[e]["response"]["truth"]=[func_J(states[e]["truth"][0])]
if bf_flag : states[e]["response"]["bf"] =[bf_response]
for c in range(1,ncyc):
an_response = np.zeros(nens)
bg_response = np.zeros(nens)
bf_response = np.zeros(nens)
for n in range(nens):
an_response[n] = func_J(states[e]["an"][c][:,n])
bg_response[n] = func_J(states[e]["bg"][c][:,n])
if bf_flag: bf_response[n] = func_J(states[e]["bf"][c][:,n])
states[e]["response"]["bg"] =states[e]["response"]["bg"]+[bg_response]
if bf_flag: states[e]["response"]["bf"] =states[e]["response"]["bf"]+[bf_response]
states[e]["response"]["an"] =states[e]["response"]["an"]+[an_response]
states[e]["response"]["truth"]=states[e]["response"]["truth"]+[func_J(states[e]["truth"][c])]
return states
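# Usage sketch (default response function): states = add_response(states)
# Afterwards states[j]["response"]["an"][c] holds one response value per ensemble member
# for cycle c, and states[j]["response"]["truth"][c] the corresponding truth value.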
def var_reduction_estimate(states,m_const,da_const,j=0,obs = []):
"""
just loops over a full experiment
Calculates the estimated variance reduction of the response function for all observations, as well as for each observation individually.
If a specific obs list is supplied, it will be used instead of all observations.
Following the naming of Hakim 2020.
"""
#For now I am not worried about being efficient
#j=0
#t=3
nobs = len(da_const["obs_loc"])
if obs == []:
obs = np.arange(nobs)
nens = da_const["nens"]
ncyc = len(states[j]['response']['bf'])
var_reduction_total = []
var_reduction_individual = []
real_reduction = []
for t in range(1,ncyc-1):
R = da_const["used_std_obs"]**2*np.identity(len(obs)) # Observation error corvariance matrix
H = np.identity(m_const["nx"])[da_const["obs_loc"][obs],:]
x = states[j]['bg'][t][:,:]
dx = x.T-np.mean(x,axis=1)
dx = dx.T
A = np.dot(dx,dx.T)/(dx.shape[1]-1)
J= states[j]["response"]['bf'][t+1][:]
dJ = J-np.mean(J)
HAHt = np.dot(H,np.dot(A,H.T))
HAHtRinv= np.linalg.inv(HAHt+R)
dJHdxt = np.dot(dJ,np.dot(H,dx).T)/(nens-1)
vr_total = -np.dot(dJHdxt,np.dot(HAHtRinv,dJHdxt))
var_reduction_total.append(vr_total)
#And now the loop for the individual reductions
vr_individual = np.zeros(nobs)
for o in range(nobs):
R = da_const["used_std_obs"]**2*np.identity(1) # Observation error corvariance matrix
H = np.identity(m_const["nx"])[da_const["obs_loc"][o],:]
HAHt = np.dot(H,np.dot(A,H.T))
HAHtRinv= np.linalg.inv(HAHt+R)
dJHdxt = np.dot(dJ,np.dot(H,dx).T)/(nens-1)
vr_individual[o] = - np.dot(dJHdxt,np.dot(HAHtRinv,dJHdxt))
var_reduction_individual.append(vr_individual)
J_f=states[j]["response"]['bg'][t+1][:]
dJ_f = J_f-np.mean(J_f)
real_reduction.append(np.var(dJ_f) - np.var(dJ))
return var_reduction_total,var_reduction_individual,real_reduction
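# Usage sketch: requires the blind forecast and response values to be present first,
# since the 'bf' response is used as J:
#   states = add_blind_forecast(states, m_const, da_const)
#   states = add_response(states)
#   vr_total, vr_individual, real_reduction = var_reduction_estimate(states, m_const, da_const)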
def var_reduction_estimate_iterative(states,m_const,da_const,j=0,dJ_update_flag=0):
#from scipy.linalg import sqrtm
"""
Iteratively goes through all observations, estimates the impact on the response function, updates dx and dJ, and continues on.
Currently starts with the location with the largest estimated impact, and then goes to the next highest, and so on. But the ordering should be unimportant.
It returns the total variance reduction estimate for each of the individual observations.
Following naming of Hakim 2020.
It was a bit unclear in the paper, but we decided that we would update dx and dJ simultaneously by bringing dJ into the state vector. This was also confirmed by Hakim via email.
This results in reduction estimates which are pretty much identical down to machine precision with those reached by including all observations at once.
Is coded very inefficiently; it does not take advantage of looking at the problem only in observation space ("reduced model space").
Currently is not tested for inputting multiple observations at once.
Output:
Individual variance reduction per each observation
Options, dJ_update_flag
0: Hakim version, updates dJ using "appended state" approach
1: Modified Hakim. Updates dJ, but then scales dJ to exactly match the estimated variance reduction. Should be a very small tweak
2: Scales dJ to match the reduced variance estimate. Should be extremely cheap and robust, but totally neglects covariance information.
3: Doesn't update dJ at all, only used to compare with what Tanya did
"""
#For now I am not worried about being efficient
nobs = len(da_const["obs_loc"])
#if obs == []:
obs = np.arange(nobs)
j=0
nens = da_const["nens"]
ncyc = len(states[j]['response']['bf'])
var_reduction_total = []
var_reduction_individual = []
observation_ranking = []
real_reduction = []
#R is always the same
R = da_const["used_std_obs"]**2*np.identity(1) # Observation error corvariance matrix
for t in range(1,ncyc-1):
#for t in [10]:
x = states[j]['bg'][t][:,:]
dx = x.T-np.mean(x,axis=1)
dx = dx.T
A = np.dot(dx,dx.T)/(dx.shape[1]-1)
J= states[j]["response"]['bf'][t+1][:]
dJ = J-np.mean(J)
obs_ordered = []
obs_remain = list(obs)
vr_individual = np.zeros(nobs)
for o in range(nobs): #loop over the number of observations
#for o in range(1): #loop over the number of observations
vr_ind_tmp = np.zeros(nobs)
for oo in obs_remain: #loop over all observation locations which have not been used yet
H = np.identity(m_const["nx"])[da_const["obs_loc"][oo],:]
HAHt = np.dot(H,np.dot(A,H.T))
HAHtRinv= np.linalg.inv(HAHt+R)
dJHdxt = np.dot(dJ,np.dot(H,dx).T)/(nens-1)
vr_ind_tmp[oo] = - np.dot(dJHdxt,np.dot(HAHtRinv,dJHdxt))
#print(vr_ind_tmp)
ind_min = np.where(vr_ind_tmp==np.min(vr_ind_tmp))[0][0]
#print(ind_min)
vr_individual[ind_min] = vr_ind_tmp[ind_min]
obs_remain.remove(ind_min)
H = np.identity(m_const["nx"])[da_const["obs_loc"][ind_min],:]
#Now we add in Tanya's reduction
E = np.matmul(A,H.T)
E = np.matmul(H,E)
E = E + R
alpha = 1./(1.+np.sqrt(R/E))
if dJ_update_flag==0 or dJ_update_flag==1:
#Modified Tanya's code because I had an error.
#Turned out it was a matrix multiplication issue I solved by switching the final matmul with an np.outer. It has the right dimension, but I fear my version will only work if it's single-point measurements.
#Now we include dJ into dx to include it in the update
dxJ = np.vstack([dx,dJ])
AxJ = np.dot(dxJ,dxJ.T)/(dxJ.shape[1]-1)
HxJ = np.hstack([H,np.array(0)])
HAHt = np.dot(HxJ,np.dot(AxJ,HxJ.T))
HAHtRinv= np.linalg.inv(HAHt+R)
K = np.dot(AxJ,HxJ.T)*HAHtRinv
# Update state vector
HdxJ = np.matmul(HxJ,dxJ)
dxJ = dxJ - alpha*np.outer(K,HdxJ)
dx = dxJ[:-1,:]
old_var_dJ = np.var(dJ)
dJ = dxJ[-1,:]
new_var_dJ = np.var(dJ)
if dJ_update_flag==1:
var_scaling = (old_var_dJ+ vr_ind_tmp[ind_min])/new_var_dJ
dJ=np.sqrt(var_scaling)*dJ
#print(old_var_dJ+vr_ind_tmp[ind_min],new_var_dJ,np.var(dJ))
if dJ_update_flag==2 or dJ_update_flag==3:
if dJ_update_flag==2:
var_scaling=(np.var(dJ)+vr_ind_tmp[ind_min])/np.var(dJ)
#New dJ
dJ=np.sqrt(var_scaling)*dJ
#Update dx
#Modified Tanya's code because I had an error.
#Turned out it was a matrix multiplication issue I solved by switching the final matmul with an np.outer. It has the right dimension, but I fear my version will only work if it's single-point measurements.
HAHt = np.dot(H,np.dot(A,H.T))
HAHtRinv= np.linalg.inv(HAHt+R)
K = np.dot(A,H.T)*HAHtRinv
# Update state vector
Hdx = np.matmul(H,dx)
dx = dx - alpha*np.outer(K,Hdx)
#Recalculating A makes things worse if you don't rescale dJ as well
A = np.dot(dx,dx.T)/(dx.shape[1]-1)
var_reduction_individual.append(vr_individual)
return var_reduction_individual
#!/usr/bin/env python
#collection of all functions which determine the model setup and time evolution
import numpy as np
def set_model_constants(nx=101,dx=100,u=2.33333333,u_std=0,dhdt=0,dhdt_std=0,h_init_std = 500):
"""
Sets all the constants used to describe the model.
Currently has only gaussian initial conditions.
ToDo: Add different initial conditions as an option.
"""
const = {}
#grid
const["nx"] = nx
const["dx"] = dx
const["x_grid"] = np.arange(nx)*dx
#properties of truth wave
const["h_init_std"] = h_init_std # width of the initial gaussian distribution
const["u_ref"] = u # reference u around which ensemble is generated, truth can
const["u_std_truth"] = u_std # standard deviation of u for truth
const["dhdt_ref"] = dhdt # reference dhdt around which ensemble is generated
const["dhdt_std_truth"] = dhdt_std # standard deviation of dhdt for truth
return const
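# Usage sketch (defaults give a 101-point grid with 100 m spacing):
#   m_const = set_model_constants()
#   m_const["x_grid"]   # -> array([0, 100, 200, ..., 10000])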
def gaussian_initial_condition(x,sig):
"""
Generates a gaussian which is commonly used for the initial conditions.
It always uses a mean of zero, and then mirrors values along the x axis to account for periodic boundary conditions.
This is not very flexible, and also not super clean because it just assumes that the number of x grid points is even
But since it works making it nice is very far down the priority list.
"""
mu = 0
y = np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.)))
half_idx = int(x.shape[0]/2)
y[-half_idx:] = y[half_idx-1::-1]
return y
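# Illustrative check (toy grid): the second half of the field mirrors the first half,
# so the field matches up across the periodic boundary.
def _demo_gaussian_initial_condition():
    x = np.arange(10) * 100.0
    y = gaussian_initial_condition(x, 200.0)
    return np.allclose(y[-5:], y[4::-1])   # True: mirrored halves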
def linear_advection_model(y,u,dydt,dx,dt,nt):
"""
This is a simple wrapper for the advection and growth routines which make up the time evolution of the model.
"""
for n in range(nt):
y = semi_lagrangian_advection(y,dx,u,dt)
y = linear_growth(y,dydt,dt)
return y
def semi_lagrangian_advection(y,dx,u,dt):
"""
Advects a 1D field for a given time difference and speed.
A linear interpolation is applied between the nearest grid points. Getting the periodic boundary right is a bit of annoying bookkeeping.
Works by calculating the fractional displacement and the integer displacement.
Only works with a constant dx.
"""
dx_frac = u*dt/dx-np.floor(u*dt/dx)
dx_int = int(np.floor(u*dt/dx))
##initial quality check, shouldn't be needed anymore
#if np.abs(u*dt - (dx_frac+dx_int)*dx)>0.00001:
# print(u,u*dt,(dx_frac+dx_int)*dx)
if abs(dx_int)>len(y):
#this is what to do if things move through the whole domain in one step
if dx_int >0 :
dx_int = dx_int - int(dx_int/float(len(y)))*len(y)
if dx_int<0:
dx_int = dx_int + int(-dx_int/float(len(y)))*len(y)
y = np.hstack([(1-dx_frac)*y[0]+(dx_frac)*y[-1],(1-dx_frac)*y[1:]+(dx_frac)*y[:-1]])
y = np.hstack([y[-dx_int:],y[:-dx_int]])
return y
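# Worked sketch (toy numbers): u*dt/dx = 0.25*500/100 = 1.25 splits into an integer shift
# of 1 grid point and a fractional shift of 0.25, handled by linear interpolation between
# each point and its upwind neighbour.
def _demo_semi_lagrangian_advection():
    y = np.array([0., 0., 1., 0., 0.])
    return semi_lagrangian_advection(y, 100., 0.25, 500.)   # -> [0., 0., 0., 0.75, 0.25]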
def linear_growth(y,dydt,dt):
"""
Applies linear growth rate. Very highly advanced math.
"""
return y + dydt*dt
#!/usr/bin/env python
# Collection of various plot scripts.
# All plot scripts return the figure and axes object. Final tweaks and saving are intended to happen in notebooks
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
def ensemble_plotter(states,m_const,da_const,j=0,t_start=0,t_end=0):
"""
Plots the full forecast/background ensemble with observations, as well as the resulting analysis.
Plots all timesteps from t_start till t_end.
Initial starting conditions have no background state.
ToDo:
initial state needs to be added as an exception with no background state.
Input:
j : experiment number
t_start and t_end: time frame to plot
Returns:
figure and axes
"""
sns.set()
sns.set_style('whitegrid')
nc = da_const['ncyc']
if t_end ==0: t_end = nc
na = t_end-t_start
alpha = np.sqrt(1/da_const['nens'])
fig, ax = plt.subplots(na,2,figsize=(7.5,na*1.5),sharex='all',sharey='all')
#left bg, right analysis
#for c in range(nc):
for c in range(na):
for i in range(da_const["nens"]):
ax[c,0].plot(m_const['x_grid'],states[j]['bg'][c+t_start][:,i],'r',alpha =alpha,zorder=1)
ax[c,1].plot(m_const['x_grid'],states[j]['an'][c+t_start][:,i],'magenta',alpha =alpha,zorder=1)
ax[c,0].plot(m_const['x_grid'],np.mean(states[j]['bg'][c+t_start][:,:],axis=1),'k--',alpha =1,zorder=2,label='ens mean')
ax[c,0].plot(m_const["x_grid"],states[j]['truth'][c+t_start],'k',zorder=10,label='truth')
ax[c,1].plot(m_const['x_grid'],np.mean(states[j]['an'][c+t_start][:,:],axis=1),'k--',alpha =1,zorder=2,label='ens mean')
ax[c,1].plot(m_const["x_grid"],states[j]['truth'][c+t_start],'k',zorder=10,label='truth')
ax[c,0].scatter(m_const["x_grid"][da_const["obs_loc"]],states[j]['obs'][c+t_start][da_const["obs_loc"]],zorder=3,label='observations')
ax[c,1].scatter(m_const["x_grid"][da_const["obs_loc"]],states[j]['obs'][c+t_start][da_const["obs_loc"]],zorder=3,label='observations')
ax[c,0].set_ylabel('assim step '+str(c+t_start) + '\n h [m]')
ax[0,0].set_title('background ensemble and observations')
ax[0,1].set_title('analysis ensemble after update')
ax[0,1].set_xlim([m_const["x_grid"][0],m_const["x_grid"][-1]])
plt.subplots_adjust(wspace=0.01,hspace=0.01)
# ax[0].set_xticklabels([])
ax[na-1,1].set_xlabel('x [m]')
ax[na-1,0].set_xlabel('x [m]')
plt.legend()
return fig,ax
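# Usage sketch: fig, ax = ensemble_plotter(states, m_const, da_const, j=0, t_start=1, t_end=4)
# Final tweaks and saving (fig.savefig) are left to the notebooks, as noted above.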
def B_plotter(states,j=0,ncyc=0,matrix='bg'):
"""
Plots the covariance matrix of either the forecast/background ensemble ('bg'), the analysis ('an'), or the free/blind forecast ('bf')
"""
fig, ax = plt.subplots(1,1,figsize=(5,5))
ax.set_ylabel('[h1,h2,h3,...]')
ax.set_xlabel('[h1,h2,h3,...]')
#B = np.cov(ensemble_diff.transpose())
B = np.cov(states[j][matrix][ncyc],ddof=1)
limit = np.max(np.abs(B))/4.*3.
pm = ax.pcolormesh(B,vmin=-limit,vmax=limit,cmap='RdBu_r')
plt.colorbar(pm,shrink = 0.8)
ax.set_aspect('equal')
return fig,ax
def plot_scores_spread(rmse, spread):
"""
Plots the evolution of the RMSE and spread over time and space.
Also plots the blind forecast if available.
Todo:
plot next to each other with a shared y axis
"""
sns.set_style('whitegrid')
figs = {}
for dim in ["space", "time"]:
figs[dim], ax = plt.subplots(1, sharex='col', figsize=(10, 4))
#for i, var in enumerate(['RMSE over ' + dim + ' h', 'RMSE over ' + dim + ' u']):
ax.set_title('RMSE over ' + dim, fontsize=24)
ax.tick_params(labelsize=18)
if 'bf' in rmse['time'].keys():
ax.plot(rmse[dim]['bf'], 'b', label="bf rmse="+str(round(np.nanmean(rmse[dim]['bf']),3)))
ax.plot(spread[dim]['bf'], 'b', label="bf spread="+str(round(np.nanmean(spread[dim]['bf']),3)), ls='--')
ax.plot(rmse[dim]['bg'] , 'r' , label="bg rmse=" +str(round(np.nanmean(rmse[dim]['bg']),3)),lw=3)
ax.plot(spread[dim]['bg'], 'r' , label="bg spread="+str(round(np.nanmean(spread[dim]['bg']),3)), ls='--',lw=3)
ax.plot(rmse[dim]['an'] , 'magenta', label="an rmse=" +str(round(np.mean(rmse[dim]['an']),3)))
ax.plot(spread[dim]['an'], 'magenta', label="an spread="+str(round(np.mean(spread[dim]['an']),3)), ls='--')
ax.set_xlim(left=0)
ax.legend()
if dim == "space":
ax.set_xlabel("space in grid points", fontsize=18)
if dim == "time":
ax.set_xlabel("time in assimilation cycles", fontsize=18)
return figs
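# Usage sketch:
#   rmse, spread = get_spread_and_rmse(states, da_const, m_const)
#   figs = plot_scores_spread(rmse, spread)
#   figs['time'] and figs['space'] hold the two resulting figures.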
def plot_response_function_pdf_split(states,j=0,t_start=0,t_end=0,left_var='bg',right_var='an'):
"""
Makes a split Seaborn violin plot of the response function values. By default plots the analysis and forecast, but can also plot the free/blind forecast.
Is not super efficient because it first transforms the response values into a pandas dataframe to get the seaborn plot to work.
"""
import seaborn as sns
import pandas as pd
sns.set()
fig,ax = plt.subplots(1,1,figsize=(6,4))
ncyc,nens = np.array(states[j]['response']['bg']).shape
if t_end==0: t_end=ncyc
#Got to figure out how to make sure the times line up, since the time integers of an, bg, and bf are all 1 step delayed
dict_label ={'an':'analysis','bg':'forecast-t1','bf':'forecast-t2'}
dict_color ={'an':'magenta','bg':'r','bf':'b'}
#t_diff = t_adjust[right_var]-t_adjust[left_var]
nt = t_end-t_start
plot_data = {
'response' : np.hstack([np.array(states[j]['response'][left_var ][t_start:t_end]).ravel(),
np.array(states[j]['response'][right_var][t_start:t_end]).ravel()]),
'cyc' : np.hstack([np.repeat(np.arange(nt),nens),np.repeat(np.arange(nt),nens)])+t_start,
'type' : [dict_label[left_var]]*nt*nens+[dict_label[right_var]]*nt*nens}
sns.violinplot(data=plot_data,y='response',x='cyc',hue='type', inner=None, orient="v",split=True,palette={dict_label[left_var]:dict_color[left_var],dict_label[right_var]:dict_color[right_var]},cut=0,bw=0.25)
plt.plot(states[j]['response']['truth'][t_start:t_end],'k--',marker='o',label='truth')
plt.legend(loc='lower center')
ax.set_xlabel('assimilation step')
ax.set_ylabel('PDF of response function J')
#Moves the left and right pdf a bit apart so you can see where they end. Thank you stackoverflow
inner=None
width = 0.75
delta = 0.05
final_width = width - delta
offset_violinplot_halves(ax, delta, final_width, inner, 'vertical')
return fig,ax
def offset_violinplot_halves(ax, delta, width, inner, direction):
"""
From Stackoverflow!
This function offsets the halves of a violinplot to compare tails
or to plot something else in between them. This is specifically designed
for violinplots by Seaborn that use the option `split=True`.
For lines, this works on the assumption that Seaborn plots everything with
integers as the center.
Args:
<ax> The axis that contains the violinplots.
<delta> The amount of space to put between the two halves of the violinplot
<width> The total width of the violinplot, as passed to sns.violinplot()
<inner> The type of inner in the seaborn
<direction> Orientation of violinplot. 'horizontal' or 'vertical'.
Returns:
- NA, modifies the <ax> directly
"""
import matplotlib.collections
# offset stuff
if inner == 'sticks':
lines = ax.get_lines()
for line in lines:
if direction == 'horizontal':
data = line.get_ydata()
print(data)
if int(data[0] + 1)/int(data[1] + 1) < 1:
# type is top, move neg, direction backwards for horizontal
data -= delta
else:
# type is bottom, move pos, direction backward for hori
data += delta
line.set_ydata(data)
elif direction == 'vertical':
data = line.get_xdata()
print(data)
if int(data[0] + 1)/int(data[1] + 1) < 1:
# type is left, move neg
data -= delta
else:
# type is right, move pos
data += delta
line.set_xdata(data)
for ii, item in enumerate(ax.collections):
# axis contains PolyCollections and PathCollections
if isinstance(item, matplotlib.collections.PolyCollection):
# get path
path, = item.get_paths()
vertices = path.vertices
half_type = _wedge_dir(vertices, direction)
# shift x-coordinates of path
if half_type in ['top','bottom']:
if inner in ["sticks", None]:
if half_type == 'top': # -> up
vertices[:,1] -= delta
elif half_type == 'bottom': # -> down
vertices[:,1] += delta
elif half_type in ['left', 'right']:
if inner in ["sticks", None]:
if half_type == 'left': # -> left
vertices[:,0] -= delta
elif half_type == 'right': # -> right
vertices[:,0] += delta
def _wedge_dir(vertices, direction):
"""
Args:
<vertices> The vertices from matplotlib.collections.PolyCollection
<direction> Direction must be 'horizontal' or 'vertical' according to how
your plot is laid out.
Returns:
- a string in ['top', 'bottom', 'left', 'right'] that determines where the
half of the violinplot is relative to the center.
"""
if direction == 'horizontal':
result = (direction, len(set(vertices[1:5,1])) == 1)
elif direction == 'vertical':
result = (direction, len(set(vertices[-3:-1,0])) == 1)
outcome_key = {('horizontal', True): 'bottom',
('horizontal', False): 'top',
('vertical', True): 'left',
('vertical', False): 'right'}
# if the first couple x/y values after the start are the same, it
# is the input direction. If not, it is the opposite
return outcome_key[result]