Skip to content
Snippets Groups Projects
Commit 1d1a4b2b authored by lkugler's avatar lkugler
Browse files

superob

parent bf5467b1
No related branches found
No related tags found
No related merge requests found
*.pyc
slurm-scripts/
logs/
dask-worker-space/
DART_WRF.egg*
build*
\ No newline at end of file
...@@ -7,7 +7,7 @@ class ExperimentConfiguration(object): ...@@ -7,7 +7,7 @@ class ExperimentConfiguration(object):
pass pass
exp = ExperimentConfiguration() exp = ExperimentConfiguration()
exp.expname = "exp_v1.19_wb-random_Radar" exp.expname = "exp_v1.19_wb-random_WV_superob2-12"
exp.model_dx = 2000 exp.model_dx = 2000
exp.n_ens = 40 exp.n_ens = 40
exp.n_nodes = 10 exp.n_nodes = 10
...@@ -23,8 +23,9 @@ exp.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/ ...@@ -23,8 +23,9 @@ exp.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/
# needs a horizontal scale too, to calculate the vertical normalization # needs a horizontal scale too, to calculate the vertical normalization
# since you can not specify different vertical localizations for diff. variables # since you can not specify different vertical localizations for diff. variables
exp.cov_loc_vert_km_horiz_km = (1, 30) exp.cov_loc_vert_km_horiz_km = (1, 30)
exp.superob_km = 12
n_obs = 961 #121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level n_obs = 22801 #22801: 2km, 121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level
vis = dict(plotname='VIS 0.6µm', plotunits='[1]', vis = dict(plotname='VIS 0.6µm', plotunits='[1]',
kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs, kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs,
...@@ -64,7 +65,7 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[dBz]', ...@@ -64,7 +65,7 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[dBz]',
cov_loc_radius_km=32) cov_loc_radius_km=32)
exp.observations = [radar] # 108, wv73, vis] exp.observations = [wv73] # 108, wv73, vis]
#exp.update_vars = ['T', 'QVAPOR', 'QCLOUD', 'QICE','CLDFRA'] #exp.update_vars = ['T', 'QVAPOR', 'QCLOUD', 'QICE','CLDFRA']
exp.update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'CLDFRA'] exp.update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'CLDFRA']
......
...@@ -8,6 +8,7 @@ from config.cfg import exp, cluster ...@@ -8,6 +8,7 @@ from config.cfg import exp, cluster
from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print
import create_obsseq as osq import create_obsseq as osq
import wrfout_add_geo import wrfout_add_geo
import obsseq
earth_radius_km = 6370 earth_radius_km = 6370
x_ci = [0, 5, 10.5, 13, 16]  # average cloud impact
y_oe = [1, 4.5, 10, 12, 13]  # adjusted observation error
oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True)


def oe_73(ci):
    """Return the adjusted 7.3 µm observation error for cloud impact `ci`.

    Piecewise-linear between the calibration points above; saturates at 13
    for ci >= 13.
    """
    return oe_73_linear(ci) if ci < 13 else 13.0
def cloudimpact_73(bt_mod, bt_obs): def cloudimpact_73(bt_mod, bt_obs):
""" """
...@@ -34,15 +37,16 @@ def cloudimpact_73(bt_mod, bt_obs): ...@@ -34,15 +37,16 @@ def cloudimpact_73(bt_mod, bt_obs):
ci = (ci_obs + ci_mod) / 2 ci = (ci_obs + ci_mod) / 2
return ci return ci
def read_prior_obs(f_obs_prior): def read_prior_obs(f_obs_prior):
""" """
docstring docstring
""" """
obsseq = open(f_obs_prior, 'r').readlines() obsseq = open(f_obs_prior, "r").readlines()
OBSs = [] OBSs = []
# read observations from obs_seq.final # read observations from obs_seq.final
for i, line in enumerate(obsseq): for i, line in enumerate(obsseq):
if ' OBS ' in line: if " OBS " in line:
observed = float(obsseq[i + 1].strip()) observed = float(obsseq[i + 1].strip())
truth = float(obsseq[i + 2].strip()) truth = float(obsseq[i + 2].strip())
prior_ensmean = float(obsseq[i + 3].strip()) prior_ensmean = float(obsseq[i + 3].strip())
...@@ -51,23 +55,27 @@ def read_prior_obs(f_obs_prior): ...@@ -51,23 +55,27 @@ def read_prior_obs(f_obs_prior):
for j in range(5, 5 + exp.n_ens): for j in range(5, 5 + exp.n_ens):
prior_ens.append(float(obsseq[i + j].strip())) prior_ens.append(float(obsseq[i + j].strip()))
OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))) OBSs.append(
dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))
)
return OBSs return OBSs
def read_truth_obs_obsseq(f):
    """Read observed and true values from a DART obs_seq.out/obs_seq.final file.

    Args:
        f (str): path to the obs_seq file.

    Returns:
        (list, list): true values and observed values, in file order.
    """
    # use a context manager so the file handle is closed even on error
    # (the previous open(...).readlines() leaked the handle)
    with open(f, "r") as fobj:
        obsseq = fobj.readlines()

    true = []
    obs = []
    # each ' OBS ' marker is followed by the observed value (next line)
    # and the truth value (line after that)
    for i, line in enumerate(obsseq):
        if " OBS " in line:
            observed = float(obsseq[i + 1].strip())
            truth = float(obsseq[i + 2].strip())
            true.append(truth)
            obs.append(observed)
    return true, obs
def replace_errors_obsseqout(f, new_errors):
    """Replace the observation-error variances in an obs_seq file, in place.

    Args:
        f (str): path to the obs_seq file; it is overwritten.
        new_errors (sequence of float): new observation-error standard
            deviations, one per observation in file order; they are squared
            to variances before writing.

    Note: requires at least two observations in the file (the error-line
    offset is derived from the spacing of the first two ' OBS ' markers);
    with fewer, `second_obs` is unbound and a NameError is raised.
    """
    debug = True
    # context manager instead of a leaked open(...).readlines() handle
    with open(f, "r") as fobj:
        obsseq = fobj.readlines()

    # find number of lines between two ' OBS ' lines:
    first_obs = None
    for i, line in enumerate(obsseq):
        if " OBS " in line:
            if first_obs is None:
                first_obs = i
            else:
                second_obs = i
                break
    lines_between = second_obs - first_obs
    lines_obserr_after_obsnr = lines_between - 1  # obserr line is one before ' OBS ' line

    # replace values in list obsseq
    i_obs = 0
    for i, line in enumerate(obsseq):
        if " OBS " in line:
            line_error_obs_i = i + lines_obserr_after_obsnr
            previous_err_var = obsseq[line_error_obs_i]
            new_err_obs_i = new_errors[i_obs] ** 2  # variance in obs_seq.out
            if debug:
                print(
                    line.strip(),
                    "previous err var ",
                    float(previous_err_var.strip()),
                    "new error",
                    new_err_obs_i,
                )
            obsseq[line_error_obs_i] = " " + str(new_err_obs_i) + " \n"
            i_obs += 1  # next iteration

    # rewrite the whole file; writelines batches what was a per-line loop
    with open(f, "w") as fobj:
        fobj.writelines(obsseq)
    print("replaced obs errors in", f)
def set_DART_nml(just_prior_values=False): def set_DART_nml(just_prior_values=False):
def to_radian_horizontal(cov_loc_horiz_km): def to_radian_horizontal(cov_loc_horiz_km):
cov_loc_radian = cov_loc_horiz_km / earth_radius_km cov_loc_radian = cov_loc_horiz_km / earth_radius_km
return cov_loc_radian return cov_loc_radian
...@@ -119,96 +134,111 @@ def set_DART_nml(just_prior_values=False): ...@@ -119,96 +134,111 @@ def set_DART_nml(just_prior_values=False):
vert_norm_rad = earth_radius_km * cov_loc_vert_km / cov_loc_horiz_km * 1000 vert_norm_rad = earth_radius_km * cov_loc_vert_km / cov_loc_horiz_km * 1000
return vert_norm_rad return vert_norm_rad
list_obstypes = [obscfg['kind'] for obscfg in exp.observations] list_obstypes = [obscfg["kind"] for obscfg in exp.observations]
list_cov_loc_radius_km = [obscfg['cov_loc_radius_km'] for obscfg in exp.observations] list_cov_loc_radius_km = [
obscfg["cov_loc_radius_km"] for obscfg in exp.observations
]
list_cov_loc_radian = [str(to_radian_horizontal(a)) for a in list_cov_loc_radius_km] list_cov_loc_radian = [str(to_radian_horizontal(a)) for a in list_cov_loc_radius_km]
if just_prior_values: if just_prior_values:
template = cluster.scriptsdir+'/../templates/input.eval.nml' template = cluster.scriptsdir + "/../templates/input.eval.nml"
else: else:
template = cluster.scriptsdir+'/../templates/input.nml' template = cluster.scriptsdir + "/../templates/input.nml"
copy(template, cluster.dartrundir+'/input.nml') copy(template, cluster.dartrundir + "/input.nml")
# options keys are replaced in input.nml with values # options keys are replaced in input.nml with values
options = {'<n_ens>': str(int(exp.n_ens)), options = {
'<cov_loc_radian>': '0.00000001', # dummy value, used for types not mentioned below "<n_ens>": str(int(exp.n_ens)),
'<list_obstypes>': "'" + "','".join(list_obstypes) + "'", "<cov_loc_radian>": "0.00000001", # dummy value, used for types not mentioned below
'<list_cutoffs>': ", ".join(list_cov_loc_radian), "<list_obstypes>": "'" + "','".join(list_obstypes) + "'",
"<list_cutoffs>": ", ".join(list_cov_loc_radian),
} }
# if cov_loc_vert_km: # if cov_loc_vert_km:
options['<horiz_dist_only>'] = '.true.' options["<horiz_dist_only>"] = ".true."
cov_loc_vert_km, cov_loc_horiz_km = exp.cov_loc_vert_km_horiz_km cov_loc_vert_km, cov_loc_horiz_km = exp.cov_loc_vert_km_horiz_km
vert_norm_hgt = to_vertical_normalization(cov_loc_vert_km, cov_loc_horiz_km) vert_norm_hgt = to_vertical_normalization(cov_loc_vert_km, cov_loc_horiz_km)
options['<vert_norm_hgt>'] = str(vert_norm_hgt) options["<vert_norm_hgt>"] = str(vert_norm_hgt)
# else: # else:
# options['<horiz_dist_only>'] = '.true.' # options['<horiz_dist_only>'] = '.true.'
# options['<vert_norm_hgt>'] = '50000.0' # dummy value # options['<vert_norm_hgt>'] = '50000.0' # dummy value
for key, value in options.items(): for key, value in options.items():
sed_inplace(cluster.dartrundir+'/input.nml', key, value) sed_inplace(cluster.dartrundir + "/input.nml", key, value)
# input.nml for RTTOV # input.nml for RTTOV
rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml' rttov_nml = cluster.scriptsdir + "/../templates/obs_def_rttov.VIS.nml"
append_file(cluster.dartrundir+'/input.nml', rttov_nml) append_file(cluster.dartrundir + "/input.nml", rttov_nml)
def obs_operator_ensemble():
    """Run the forward operator on every prior ensemble member.

    Assumes the prior members are already linked to advance_temp<i>/wrfout_d01.
    Returns an (n_ens, n_obs) float array of H(x) values.
    """
    print("running obs operator on ensemble forecast")
    os.chdir(cluster.dartrundir)

    member_truths = []
    t0 = time_module.time()
    for iens in range(1, exp.n_ens + 1):
        print("observation operator for ens #" + str(iens))
        # ens members are already linked to advance_temp<i>/wrfout_d01
        member_wrfout = cluster.dartrundir + "/advance_temp" + str(iens) + "/wrfout_d01"
        copy(member_wrfout, cluster.dartrundir + "/wrfout_d01")
        # DART may need a wrfinput file as well, which serves as a template for dimension sizes
        symlink(cluster.dartrundir + "/wrfout_d01", cluster.dartrundir + "/wrfinput_d01")
        # run perfect_model obs (forward operator)
        os.system("mpirun -np 12 ./perfect_model_obs > /dev/null")
        # truth values in obs_seq.out are H(x) values
        hx, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out")
        member_truths.append(hx)
    print("obs operator ensemble took", int(time_module.time() - t0), "seconds")

    out = np.full((exp.n_ens, len(member_truths[0])), np.nan)
    for row, hx in enumerate(member_truths):
        out[row, :] = hx
    return out
def obs_operator_nature(time):
    """Return observation-space truth values H(x_nature) for the given time."""
    print("getting true values in obs space from nature run")
    prepare_nature_dart(time)
    run_perfect_model_obs()
    truth_values, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out")
    return truth_values
def link_nature_to_dart_truth(time):
    """Copy the nature-run wrfout valid at `time` into the DART run directory."""
    nature_file = time.strftime(exp.nature_wrfout)
    target = cluster.dartrundir + "/wrfout_d01"
    # get wrfout_d01 from nature run
    shutil.copy(nature_file, target)
    # DART may need a wrfinput file as well, which serves as a template for dimension sizes
    symlink(target, cluster.dartrundir + "/wrfinput_d01")
    print("linked", nature_file, "to", target)
def prepare_nature_dart(time):
    """Link the nature run into DART and add georeference fields to it."""
    print("linking nature to DART & georeferencing")
    link_nature_to_dart_truth(time)
    geo_file = cluster.dartrundir + "/../geo_em.d01.nc"
    wrfout_add_geo.run(geo_file, cluster.dartrundir + "/wrfout_d01")
def prepare_prior_ensemble(
    assim_time, prior_init_time, prior_valid_time, prior_path_exp
):
    """Prepares DART files for running filter.

    For every ensemble member this
    - copies the first-guess wrfout into advance_temp<i>/ and links wrfinput_d01,
    - if the prior valid time differs from the assimilation time, overwrites
      XTIME/Times from the nature wrfout (the offset can be intentional),
    - georeferences the member file,
    then writes input_list.txt / output_list.txt so DART knows what input and
    output is, and removes pre-existing output files which could lead to
    problems.

    Args:
        assim_time (datetime): time of the assimilation.
        prior_init_time (datetime): initialization time of the prior forecast.
        prior_valid_time (datetime): valid time of the prior wrfout files.
        prior_path_exp (str): archive path of the prior experiment.
    """
    print("prepare prior state estimate")
    for iens in range(1, exp.n_ens + 1):
        print("link wrfout file to DART background file")
        # path layout: <exp>/<init time>/<member>/wrfout_d01_<valid time>
        wrfout_run = (
            prior_path_exp
            + prior_init_time.strftime("/%Y-%m-%d_%H:%M/")
            + str(iens)
            + prior_valid_time.strftime("/wrfout_d01_%Y-%m-%d_%H:%M:%S")
        )
        dart_ensdir = cluster.dartrundir + "/advance_temp" + str(iens)
        wrfout_dart = dart_ensdir + "/wrfout_d01"

        os.makedirs(dart_ensdir, exist_ok=True)
        print("copy", wrfout_run, "to", wrfout_dart)
        copy(wrfout_run, wrfout_dart)
        symlink(wrfout_dart, dart_ensdir + "/wrfinput_d01")

        # ensure prior time matches assim time (can be off intentionally)
        if assim_time != prior_valid_time:
            print("overwriting time in prior from nature wrfout")
            # ncks -A appends/overwrites only the time variables in place
            os.system(
                cluster.ncks
                + " -A -v XTIME,Times "
                + cluster.dartrundir
                + "/wrfout_d01 "
                + wrfout_dart
            )

        # this seems to be necessary (else wrong level selection)
        wrfout_add_geo.run(cluster.dartrundir + "/../geo_em.d01.nc", wrfout_dart)

    # tell DART which member files to read (one path per line)
    fpath = cluster.dartrundir + "/input_list.txt"
    print("writing", fpath)
    try_remove(fpath)
    with open(fpath, "w") as f:
        for iens in range(1, exp.n_ens + 1):
            f.write("./advance_temp" + str(iens) + "/wrfout_d01")
            f.write("\n")

    # ... and which analysis files to write
    fpath = cluster.dartrundir + "/output_list.txt"
    print("writing", fpath)
    try_remove(fpath)
    with open(fpath, "w") as f:
        for iens in range(1, exp.n_ens + 1):
            f.write("./filter_restart_d01." + str(iens).zfill(4))
            f.write("\n")

    # remove stale output from a previous filter run so it cannot be
    # mistaken for this cycle's results
    print("removing preassim and filter_restart")
    os.system("rm -rf " + cluster.dartrundir + "/preassim_*")
    os.system("rm -rf " + cluster.dartrundir + "/filter_restart*")
    os.system("rm -rf " + cluster.dartrundir + "/output_mean*")
    os.system("rm -rf " + cluster.dartrundir + "/output_sd*")
    os.system("rm -rf " + cluster.dartrundir + "/perfect_output_*")
    os.system("rm -rf " + cluster.dartrundir + "/obs_seq.fina*")
def calc_obserr_WV73(Hx_nature, Hx_prior): def calc_obserr_WV73(Hx_nature, Hx_prior):
...@@ -279,30 +315,41 @@ def calc_obserr_WV73(Hx_nature, Hx_prior): ...@@ -279,30 +315,41 @@ def calc_obserr_WV73(Hx_nature, Hx_prior):
mean_CI = np.mean(CIs) mean_CI = np.mean(CIs)
oe_nature = oe_73(mean_CI) oe_nature = oe_73(mean_CI)
print('oe_nature:', oe_nature, ', bt_y:', bt_y, ', mean_CI:', mean_CI) print("oe_nature:", oe_nature, ", bt_y:", bt_y, ", mean_CI:", mean_CI)
OEs[iobs] = oe_nature OEs[iobs] = oe_nature
return OEs return OEs
def run_perfect_model_obs():
    """Generate synthetic observations by running DART's ./perfect_model_obs.

    Raises:
        RuntimeError: if obs_seq.in is missing beforehand, or obs_seq.out
            was not produced afterwards.
    """
    print("generating observations - running ./perfect_model_obs")
    rundir = cluster.dartrundir
    os.chdir(rundir)
    try_remove(rundir + "/obs_seq.out")
    if not os.path.exists(rundir + "/obs_seq.in"):
        raise RuntimeError("obs_seq.in does not exist in " + rundir)
    os.system("mpirun -np 12 ./perfect_model_obs > log.perfect_model_obs")
    if not os.path.exists(rundir + "/obs_seq.out"):
        raise RuntimeError(
            "obs_seq.out does not exist in " + rundir,
            "\n look for " + rundir + "/log.perfect_model_obs",
        )
def assimilate(nproc=96):
    """Run DART ./filter with `nproc` MPI ranks pinned to cores 0..nproc-1."""
    print("time now", dt.datetime.now())
    print("running filter")
    os.chdir(cluster.dartrundir)
    try_remove(cluster.dartrundir + "/obs_seq.final")
    t0 = time_module.time()
    ranks = int(nproc)
    cmd = (
        "mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-"
        + str(ranks - 1)
        + " -np "
        + str(ranks)
        + " ./filter > log.filter"
    )
    os.system(cmd)
    print("./filter took", int(time_module.time() - t0), "seconds")
# currently unused # currently unused
# def recycle_output(): # def recycle_output():
...@@ -333,6 +380,7 @@ def assimilate(nproc=96): ...@@ -333,6 +380,7 @@ def assimilate(nproc=96):
############### archiving ############### archiving
def archive_osq_final(time, posterior_1min=False):
    """Save obs_seq.final to the archive for later analysis.

    Args:
        time (dt.datetime): time of sampling values from files.
        posterior_1min (bool): if True, archive under obs_seq_final_1min/
            instead of obs_seq_final/.
    """
    subdir = "/obs_seq_final_1min/" if posterior_1min else "/obs_seq_final/"
    archive_dir = cluster.archivedir + subdir
    mkdir(archive_dir)
    fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final")
    copy(cluster.dartrundir + "/obs_seq.final", fout)
    print(fout, "saved.")
# try: # try:
# print('archive regression diagnostics') # what are regression diagnostics?! # print('archive regression diagnostics') # what are regression diagnostics?!
...@@ -354,32 +402,50 @@ def archive_osq_final(time, posterior_1min=False): ...@@ -354,32 +402,50 @@ def archive_osq_final(time, posterior_1min=False):
# except Exception as e: # except Exception as e:
# warnings.warn(str(e)) # warnings.warn(str(e))
def archive_filteroutput(time):
    """Archive input.nml, filter restarts, posterior members and mean/sd.

    Missing posterior/mean/sd files only raise a warning: they are not
    needed for the next forecast run.
    """
    print("archiving output")
    archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
    mkdir(archive_assim)
    copy(cluster.dartrundir + "/input.nml", archive_assim + "/input.nml")

    for iens in range(1, exp.n_ens + 1):  # single members
        restart = "/filter_restart_d01." + str(iens).zfill(4)
        copy(cluster.dartrundir + restart, archive_assim + restart)

    try:  # not necessary for next forecast run
        for iens in range(1, exp.n_ens + 1):
            member = "/postassim_member_" + str(iens).zfill(4) + ".nc"
            copy(cluster.dartrundir + member, archive_assim + member)
        for fname in ["output_mean.nc", "output_sd.nc"]:  # copy mean and sd to archive
            copy(cluster.dartrundir + "/" + fname, archive_assim + "/" + fname)
    except Exception as e:
        warnings.warn(str(e))
def archive_osq_out(time):
    """Store obs_seq.out in the archive, stamped with the assimilation time."""
    dir_obsseq = cluster.archivedir + "/obs_seq_out/"
    os.makedirs(dir_obsseq, exist_ok=True)
    fout = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
    copy(cluster.dartrundir + "/obs_seq.out", fout)
def superob():
    """Thin obs_seq.out by superobbing, if exp.superob_km is configured.

    No-op when the experiment has no `superob_km` attribute; otherwise
    averages observations within windows of that size and rewrites
    obs_seq.out in place.
    """
    if hasattr(exp, "superob_km"):
        t0 = time_module.time()
        f_obsseq = cluster.dartrundir + "/obs_seq.out"
        obs = obsseq.ObsSeqOut(f_obsseq)
        print("superobbing to", exp.superob_km, "km")
        obs.superob(window_km=exp.superob_km)
        obs.to_dart(f=f_obsseq)
        print("superob took", int(time_module.time() - t0), "seconds")
if __name__ == "__main__":
    """Assimilate observations at one assimilation time.

    Command-line arguments:
        1: assimilation time                (%Y-%m-%d_%H:%M)
        2: initialization time of the prior (%Y-%m-%d_%H:%M)
        3: valid time of the prior          (%Y-%m-%d_%H:%M)
        4: archive path of the prior experiment

    Example:
        python assim.py 2008-08-07_12:00 2008-08-06:00 2008-08-07_13:00 /home/fs71386/lkugler/data/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem
    """
    time = dt.datetime.strptime(sys.argv[1], "%Y-%m-%d_%H:%M")
    prior_init_time = dt.datetime.strptime(sys.argv[2], "%Y-%m-%d_%H:%M")
    prior_valid_time = dt.datetime.strptime(sys.argv[3], "%Y-%m-%d_%H:%M")
    prior_path_exp = str(sys.argv[4])

    archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/")
    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
    os.chdir(cluster.dartrundir)
    os.system(
        cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py"
    )  # link DART binaries to run_DART
    os.system(
        "rm -f input.nml obs_seq.in obs_seq.out obs_seq.final"
    )  # remove any existing observation files
    set_DART_nml()

    print("prepare nature")
    prepare_nature_dart(time)  # link WRF files to DART directory

    print("prepare prior ensemble")
    prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp)

    ################################################
    print(" 1) get the assimilation errors in a single vector ")
    error_assimilate = []
    # to get the obs-error for assimilation,
    # we need to get the true obs-space values for the parametrized variable
    # and collect the obs-error for assimilation in a single vector/list
    for i, obscfg in enumerate(exp.observations):
        n_obs = obscfg["n_obs"]
        n_obs_z = len(obscfg.get("heights", [1]))
        n_obs_3d = n_obs * n_obs_z

        do_error_parametrization = (obscfg["error_assimilate"] == False) and (
            obscfg.get("sat_channel") == 6
        )

        if do_error_parametrization:
            # get observations for sat 6
            osq.create_obsseqin_alltypes(time, [obscfg], np.zeros(n_obs_3d))
            run_perfect_model_obs()
            superob()

            # depends on obs_seq.out produced before by run_perfect_model_obs()
            Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out")
            Hx_prior = obs_operator_ensemble()  # files are already linked to DART directory
            err_assim = calc_obserr_WV73(Hx_nat, Hx_prior)
        else:
            err_assim = np.zeros(n_obs_3d) + obscfg["error_assimilate"]

        error_assimilate.extend(
            err_assim
        )  # the obs-error we assume for assimilating observations

    ################################################
    print(" 2) generate observations ")
    # the obs-error we use for generating obs is user-defined
    error_generate = []
    for i, obscfg in enumerate(exp.observations):
        # BUGFIX: recompute the per-type observation count here; previously
        # n_obs_3d leaked from the last iteration of the loop above, which is
        # wrong as soon as observation types differ in n_obs or heights
        n_obs_3d = obscfg["n_obs"] * len(obscfg.get("heights", [1]))
        error_generate.extend(np.zeros(n_obs_3d) + obscfg["error_generate"])

    osq.create_obsseqin_alltypes(time, exp.observations, error_generate)
    set_DART_nml()
    run_perfect_model_obs()  # actually create observations that are used to assimilate
    superob()
    archive_osq_out(time)

    ################################################
    print(" 3) assimilate with observation-errors for assimilation")
    replace_errors_obsseqout(cluster.dartrundir + "/obs_seq.out", error_assimilate)
    t = time_module.time()
    assimilate()
    print("filter took", time_module.time() - t, "seconds")

    archive_filteroutput(time)
    archive_osq_final(time)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment