Skip to content
Snippets Groups Projects
Commit ec82840e authored by lkugler's avatar lkugler
Browse files

update, includes preproc

parent 5a314861
No related branches found
No related tags found
No related merge requests found
from multiprocessing.sharedctypes import Value
import os, sys, shutil, warnings
import time as time_module
import datetime as dt
......@@ -5,7 +6,7 @@ import numpy as np
from scipy.interpolate import interp1d
from config.cfg import exp, cluster
from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print
from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print, shell
import dartwrf.create_obsseq as osq
from dartwrf import wrfout_add_geo
from dartwrf import obsseq
......@@ -141,6 +142,8 @@ def set_DART_nml(just_prior_values=False):
# options keys are replaced in input.nml with values
options = {
"<sampling_error_correction>": '.true.' if exp.sec else '.false.',
"<post_inflation>": '4' if exp.inflation else '0',
"<n_ens>": str(int(exp.n_ens)),
"<cov_loc_radian>": "0.00000001", # dummy value, used for types not mentioned below
"<list_obstypes>": "'" + "','".join(list_obstypes) + "'",
......@@ -173,7 +176,7 @@ def run_Hx(time, obscfg):
"""
# existing obs_seq.out is a precondition for filter
# needs to know where to take observations
osq.create_obsseqin_alltypes(time, [obscfg, ], 0)
osq.create_obsseqin_alltypes(time, [obscfg, ])
run_perfect_model_obs()
print("running H(x) : obs operator on ensemble prior")
......@@ -182,7 +185,16 @@ def run_Hx(time, obscfg):
set_DART_nml(just_prior_values=True)
# run filter to calculate prior Hx
os.system("mpirun -np 12 ./filter &> log.filter.preassim")
shell("mpirun -np 12 ./filter &> log.filter.preassim")
osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final")
if hasattr(exp, "superob_km"):
df = osf.df
print("superobbing to", exp.superob_km, "km")
osf.df = osf.df.superob(window_km=exp.superob_km)
osf.to_dart(cluster.dartrundir + "/obs_seq.final")
return osf
def obs_operator_nature(time):
......@@ -243,13 +255,8 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
# ensure prior time matches assim time (can be off intentionally)
if assim_time != prior_valid_time:
print("overwriting time in prior from nature wrfout")
os.system(
cluster.ncks
+ " -A -v XTIME,Times "
+ cluster.dartrundir
+ "/wrfout_d01 "
+ wrfout_dart
)
shell(cluster.ncks+ " -A -v XTIME,Times "+
cluster.dartrundir+"/wrfout_d01 "+ wrfout_dart)
# this seems to be necessary (else wrong level selection)
wrfout_add_geo.run(cluster.dartrundir + "/../geo_em.d01.nc", wrfout_dart)
......@@ -304,29 +311,28 @@ def calc_obserr_WV73(Hx_nature, Hx_prior):
OEs[iobs] = oe_model
return OEs
def run_perfect_model_obs(nproc=12):
def run_perfect_model_obs(nproc=6):
print("generating observations - running ./perfect_model_obs")
os.chdir(cluster.dartrundir)
try_remove(cluster.dartrundir + "/obs_seq.out")
if not os.path.exists(cluster.dartrundir + "/obs_seq.in"):
raise RuntimeError("obs_seq.in does not exist in " + cluster.dartrundir)
os.system("mpirun -np "+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs")
print(shell("mpirun -np "+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs"))
if not os.path.exists(cluster.dartrundir + "/obs_seq.out"):
raise RuntimeError(
"obs_seq.out does not exist in " + cluster.dartrundir,
"\n look for " + cluster.dartrundir + "/log.perfect_model_obs")
def assimilate(nproc=96):
def assimilate(nproc=12):
print("time now", dt.datetime.now())
print("running filter")
os.chdir(cluster.dartrundir)
try_remove(cluster.dartrundir + "/obs_seq.final")
t = time_module.time()
#os.system('mpirun -np 12 ./filter &> log.filter')
os.system("".join([
#shell('mpirun -np 12 ./filter &> log.filter')
shell("".join([
"mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-",
str(int(nproc) - 1)," -np ",str(int(nproc))," ./filter > log.filter"]))
print("./filter took", int(time_module.time() - t), "seconds")
......@@ -443,22 +449,21 @@ def get_parametrized_error(obscfg):
# run obs operator (through filter program)
# creates obs_seq.final containing truth & prior Hx
run_Hx(time, obscfg)
osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final")
osf = run_Hx(time, obscfg)
df_obs = osf.df
if hasattr(exp, "superob_km"):
print("superobbing to", exp.superob_km, "km")
t = time_module.time()
df_obs = df_obs.superob(window_km=exp.superob_km)
print("superob took", int(time_module.time() - t), "seconds")
Hx_prior = df_obs.get_prior_Hx().T
Hx_truth = df_obs.get_truth_Hx()
# compute the obs error for assimilation on the averaged grid
# since the assimilation is done on the averaged grid
if obscfg.get("sat_channel") == 6:
return calc_obserr_WV73(Hx_truth, Hx_prior)
elif obscfg.get("sat_channel") == 1:
return calc_obserr_VIS06(Hx_truth, Hx_prior)
else:
NotImplementedError('error_assimilate if False and sat_channel', obscfg.get("sat_channel"))
def set_obserr_assimilate_in_obsseqout(obsseqout, outfile="./obs_seq.out"):
for obscfg in exp.observations:
......@@ -468,39 +473,56 @@ def set_obserr_assimilate_in_obsseqout(obsseqout, outfile="./obs_seq.out"):
# modify each kind separately, one after each other
mask_kind = obsseqout.df.kind == kind
if is_assim_error_parametrized(obscfg):
if obscfg["error_assimilate"] == False:
assim_err = get_parametrized_error(obscfg)
obsseqout.df.loc[mask_kind, 'variance'] = assim_err**2
#assert np.allclose(assim_err, obsseqout.df['variance']**2)
#assert np.allclose(assim_err, obsseqout.df['variance']**2) # check
else:
# overwrite with user-defined values
obsseqout.df.loc[mask_kind, 'variance'] = obscfg["error_assimilate"]**2
obsseqout.to_dart(outfile)
def qc_obs(oso, outfile='./obs_seq.out'):
# obs should be superobbed already!
for i, obscfg in enumerate(exp.observations):
if i > 0:
raise NotImplementedError()
# run obs operator (through filter program)
# creates obs_seq.final containing truth & prior Hx
osf = run_Hx(time, obscfg)
df_final = osf.df
#from IPython import embed; embed()
Hx_prior = df_final.get_prior_Hx().T
Hx_prior_mean = np.mean(Hx_prior, axis=0)
#Hx_prior_spread = df_final['prior ensemble spread'].values
#Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9
obs = oso.df.observations.values
n_obs_orig = len(obs)
obs_dist_to_priormean = abs(obs - Hx_prior_mean) # /Hx_prior_spread
oso.df = oso.df[obs_dist_to_priormean > 0.1]
print('removed', n_obs_orig-len(oso.df), 'observations with fgd smaller than .1')
# obs_dist_to_priormean = abs(obs - Hx_prior_mean)
# oso.df = oso.df[obs_dist_to_priormean > 5]
# print('removed', n_obs_orig-len(oso.df), 'observations with abs(FGD) smaller than 5')
oso.to_dart(outfile)
print('saved', outfile)
if __name__ == "__main__":
"""Assimilate observations (different obs types)
"""Assimilate observations
as defined in config/cfg.py
for a certain timestamp (argument) of the nature run (defined in config/clusters.py)
Workflow:
for each assimilation stage (one obs_seq.in and e.g. one observation type):
1) create obs_seq.in with obs-errors
2) prepare nature run for DART
3) create obs from nature (obs_seq.out) using pre-defined obs-errors
4) Assimilate
- adapt obs errors for assimilation
- calculate assim obs error from parametrization
1) create obs_seq.in with obs error=0
2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
3) calculate obs error as function of y_nat, y_ensmean
- or get assim obs error from config
- edit obs error in obs_seq.out
- assimilate
- write state to archive
1) prepare nature run & prior ensemble for DART
2) create obs_seq.in
3) create obs from nature (obs_seq.out) with user-defined obs-errors
4) Assimilate with assigned errors
Note:
assim_time (dt.datetime): time of output
......@@ -533,30 +555,22 @@ if __name__ == "__main__":
prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp)
################################################
print(" 1) generate observations with specified obs-error")
# the obs-error we use for generating obs is user-defined
error_generate = []
for i, obscfg in enumerate(exp.observations):
levels = obscfg.get("heights", [1,])
n_obs_z = len(levels)
n_obs_3d = obscfg["n_obs"] * n_obs_z
error_generate.extend(np.zeros(n_obs_3d) + obscfg["error_generate"])
osq.create_obsseqin_alltypes(time, exp.observations, error_generate)
print(" 1) create observations with specified obs-error")
osq.create_obsseqin_alltypes(time, exp.observations)
set_DART_nml()
run_perfect_model_obs() # actually create observations that are used to assimilate
run_perfect_model_obs() # create observations
print(" 2) obs preprocessing")
# read in the actual observations
oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
if False: # only refl < 6 dBz
# oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
oso.df = oso.df[oso.df['truth'].values < 6]
oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
if True:
oso.df.loc[oso.df['observations'].values < 0.293, ('observations')] = 0.293 # set reflectance to sky clear
if False: # set reflectance < surface albedo to surface albedo
oso.df.loc[oso.df['observations'].values < 0.293, ('observations')] = 0.293
oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
if hasattr(exp, "superob_km"):
......@@ -565,11 +579,13 @@ if __name__ == "__main__":
copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig")
oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
# qc_obs(oso, outfile=cluster.dartrundir + "/obs_seq.out")
################################################
print(" 2) assign observation-errors for assimilation ")
print(" 3) assign observation-errors for assimilation ")
set_obserr_assimilate_in_obsseqout(oso, outfile=cluster.dartrundir + "/obs_seq.out")
print(" 3) assimilate ")
print(" 4) assimilate ")
archive_osq_out(time)
set_DART_nml()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment