Skip to content
Snippets Groups Projects
Commit eed51634 authored by lkugler's avatar lkugler
Browse files

history_interval_s, prep_nature, docs

parent b5d21cd5
No related branches found
No related tags found
No related merge requests found
...@@ -12,7 +12,12 @@ from dartwrf import dart_nml ...@@ -12,7 +12,12 @@ from dartwrf import dart_nml
from dartwrf.exp_config import exp from dartwrf.exp_config import exp
from dartwrf.server_config import cluster from dartwrf.server_config import cluster
wrfout_format = 'wrfout_d01_%Y-%m-%d_%H:%M:%S'
wrfout_format = '/wrfout_d01_%Y-%m-%d_%H:%M:%S' # WRF file format, will only change if WRF changes
pattern_init_time = "/%Y-%m-%d_%H:%M/" # pattern for the init_timme folder in sim_archive
pattern_obs_seq_final = "/%Y-%m-%d_%H:%M:%S_obs_seq.final" # how an obs_seq.final file is archived
pattern_obs_seq_out = "/%Y-%m-%d_%H:%M:%S_obs_seq.out" # how an obs_seq.out file is archived
def _prepare_DART_grid_template(): def _prepare_DART_grid_template():
...@@ -21,8 +26,8 @@ def _prepare_DART_grid_template(): ...@@ -21,8 +26,8 @@ def _prepare_DART_grid_template():
symlink(cluster.dart_rundir + "/prior_ens1/wrfout_d01", symlink(cluster.dart_rundir + "/prior_ens1/wrfout_d01",
cluster.dart_rundir + "/wrfinput_d01") cluster.dart_rundir + "/wrfinput_d01")
def _copy_nature_to_dart(time): def _find_nature(time):
"""Copies wrfout_d01 from nature run to DART directory """Find the path to the nature file for the given time
""" """
glob_pattern = time.strftime(exp.nature_wrfout_pattern) # replace time in pattern glob_pattern = time.strftime(exp.nature_wrfout_pattern) # replace time in pattern
print('searching for nature in pattern:', glob_pattern) print('searching for nature in pattern:', glob_pattern)
...@@ -30,14 +35,9 @@ def _copy_nature_to_dart(time): ...@@ -30,14 +35,9 @@ def _copy_nature_to_dart(time):
# check user input # check user input
if not 'wrfout' in f_nat.split('/')[-1]: if not 'wrfout' in f_nat.split('/')[-1]:
warnings.warn(f+" does not contain 'wrfout' in filename, are you sure this is a valid nature file?") warnings.warn(f_nat+" does not contain 'wrfout' in filename, are you sure this is a valid nature file?")
assert os.path.exists(f_nat), f_nat+" does not exist"
# copy nature wrfout to DART directory return f_nat
shutil.copy(f_nat, cluster.dart_rundir + "/wrfout_d01")
# add coordinates if necessary
if cluster.geo_em_for_WRF_ideal:
wrfout_add_geo.run(cluster.geo_em_for_WRF_ideal, cluster.dart_rundir + "/wrfout_d01")
def prepare_nature_dart(time): def prepare_nature_dart(time):
"""Prepares DART nature (wrfout_d01) if available """Prepares DART nature (wrfout_d01) if available
...@@ -46,16 +46,22 @@ def prepare_nature_dart(time): ...@@ -46,16 +46,22 @@ def prepare_nature_dart(time):
time (dt.datetime): Time at which observations will be made time (dt.datetime): Time at which observations will be made
""" """
try: try:
_copy_nature_to_dart(time) f_nat = _find_nature(time)
except (FileNotFoundError, AttributeError) as e: # if nature is not available due to any reason except:
print('-> has no nature, not copying nature') print('-> no nature available')
return
shutil.copy(f_nat, cluster.dart_rundir + "/wrfout_d01") # copy nature wrfout to DART directory
# add coordinates if necessary
if cluster.geo_em_for_WRF_ideal:
wrfout_add_geo.run(cluster.geo_em_for_WRF_ideal, cluster.dart_rundir + "/wrfout_d01")
def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp): def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp):
"""Prepares DART files for running filter """Prepares DART files for running filter
i.e. i.e.
- links first guess state to DART first guess filenames - links first guess state to DART first guess filenames
- creates wrfinput_d01 files - creates wrfinput_d01 files (grid file, not a real wrfinput file)
- adds geo-reference (xlat,xlon) coords so that DART can deal with the files - adds geo-reference (xlat,xlon) coords so that DART can deal with the files
- writes txt files so DART knows what input and output is - writes txt files so DART knows what input and output is
- removes probably pre-existing files which could lead to problems - removes probably pre-existing files which could lead to problems
...@@ -66,19 +72,23 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_ ...@@ -66,19 +72,23 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
print("link wrfout file to DART background file") print("link wrfout file to DART background file")
wrfout_run = ( wrfout_run = (
prior_path_exp prior_path_exp
+ prior_init_time.strftime("/%Y-%m-%d_%H:%M/") + prior_init_time.strftime(pattern_init_time)
+ str(iens) + str(iens)
+ prior_valid_time.strftime("/"+wrfout_format) + prior_valid_time.strftime(wrfout_format)
) )
dart_ensdir = cluster.dart_rundir + "/prior_ens" + str(iens) dart_ensdir = cluster.dart_rundir + "/prior_ens" + str(iens)
wrfout_dart = dart_ensdir + "/wrfout_d01" wrfout_dart = dart_ensdir + "/wrfout_d01"
# copy prior ens file from archived wrfout files (prior_path_exp)
os.makedirs(dart_ensdir, exist_ok=True) os.makedirs(dart_ensdir, exist_ok=True)
print("copy", wrfout_run, "to", wrfout_dart) print("copy", wrfout_run, "to", wrfout_dart)
copy(wrfout_run, wrfout_dart) copy(wrfout_run, wrfout_dart)
# DART needs a grid file for each ensemble member (this is no real wrfinput file)
symlink(wrfout_dart, dart_ensdir + "/wrfinput_d01") symlink(wrfout_dart, dart_ensdir + "/wrfinput_d01")
# ensure prior time matches assim time (can be intentionally different) # ensure prior time matches assim time
# can be intentionally different, e.g. by using a prior for a different time
if assim_time != prior_valid_time: if assim_time != prior_valid_time:
print("overwriting time in prior from nature wrfout") print("overwriting time in prior from nature wrfout")
shell(cluster.ncks+ " -A -v XTIME,Times "+ shell(cluster.ncks+ " -A -v XTIME,Times "+
...@@ -88,7 +98,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_ ...@@ -88,7 +98,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
if cluster.geo_em_for_WRF_ideal: if cluster.geo_em_for_WRF_ideal:
wrfout_add_geo.run(cluster.geo_em_for_WRF_ideal, wrfout_dart) wrfout_add_geo.run(cluster.geo_em_for_WRF_ideal, wrfout_dart)
write_list_of_inputfiles_prior() use_linked_files_as_prior()
write_list_of_outputfiles() write_list_of_outputfiles()
print("removing preassim and filter_restart") print("removing preassim and filter_restart")
...@@ -99,7 +109,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_ ...@@ -99,7 +109,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
os.system("rm -rf " + cluster.dart_rundir + "/perfect_output_*") os.system("rm -rf " + cluster.dart_rundir + "/perfect_output_*")
os.system("rm -rf " + cluster.dart_rundir + "/obs_seq.fina*") os.system("rm -rf " + cluster.dart_rundir + "/obs_seq.fina*")
def write_list_of_inputfiles_prior(): def use_linked_files_as_prior():
"""Instruct DART to use the prior ensemble as input """Instruct DART to use the prior ensemble as input
""" """
files = [] files = []
...@@ -107,10 +117,10 @@ def write_list_of_inputfiles_prior(): ...@@ -107,10 +117,10 @@ def write_list_of_inputfiles_prior():
files.append("./prior_ens" + str(iens) + "/wrfout_d01") files.append("./prior_ens" + str(iens) + "/wrfout_d01")
write_txt(files, cluster.dart_rundir+'/input_list.txt') write_txt(files, cluster.dart_rundir+'/input_list.txt')
def write_list_of_inputfiles_posterior(assim_time): def use_filter_output_as_prior(assim_time):
"""Use posterior as input for DART, e.g. to evaluate the analysis in observation space """Use posterior as input for DART, e.g. to evaluate the analysis in observation space
""" """
filedir = cluster.archivedir+assim_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") filedir = cluster.archivedir+assim_time.strftime(pattern_init_time+"/assim_stage0/")
files = [] files = []
for iens in range(1, exp.n_ens+1): for iens in range(1, exp.n_ens+1):
...@@ -150,11 +160,11 @@ def archive_filteroutput(time): ...@@ -150,11 +160,11 @@ def archive_filteroutput(time):
archive_dir = cluster.archivedir + "/obs_seq_final/" archive_dir = cluster.archivedir + "/obs_seq_final/"
mkdir(archive_dir) mkdir(archive_dir)
fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final") fout = archive_dir + time.strftime(pattern_obs_seq_final)
copy(cluster.dart_rundir + "/obs_seq.final", fout) copy(cluster.dart_rundir + "/obs_seq.final", fout)
print(fout, "saved.") print(fout, "saved.")
archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") archive_assim = cluster.archivedir + time.strftime(pattern_init_time+"/assim_stage0/")
mkdir(archive_assim) mkdir(archive_assim)
copy(cluster.dart_rundir + "/input.nml", archive_assim + "/input.nml") copy(cluster.dart_rundir + "/input.nml", archive_assim + "/input.nml")
...@@ -240,12 +250,24 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"): ...@@ -240,12 +250,24 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"):
oso.to_dart(outfile) oso.to_dart(outfile)
def qc_obs(time, oso): def qc_obs(time, oso):
"""Quality control of observations
We assume that the prior values have been evaluated and are in `run_DART/obs_seq.final`
Args:
time (datetime): time of the assimilation
oso (ObsSeq): python representation of obs_seq.out file, will be modified and written to file
Returns:
None (writes to file)
The pre-existing obs_seq.out will be archived.
The new obs_seq.out will be written to the DART run directory.
"""
osf_prior = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.final") osf_prior = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.final")
# obs should be superobbed already! # obs should be superobbed already!
for i, obscfg in enumerate(exp.observations): for i, obscfg in enumerate(exp.observations):
if i > 0: if i > 0:
raise NotImplementedError() raise NotImplementedError('Multiple observation types -> might not work')
obs = oso.df.observations.values obs = oso.df.observations.values
Hx_prior_mean = osf_prior.df['prior ensemble mean'] Hx_prior_mean = osf_prior.df['prior ensemble mean']
...@@ -253,15 +275,15 @@ def qc_obs(time, oso): ...@@ -253,15 +275,15 @@ def qc_obs(time, oso):
if obscfg.get("sat_channel") == 1: if obscfg.get("sat_channel") == 1:
if False: if True:
print('removing obs with abs(FGD) < 0.05') print('removing obs with abs(FGD) < 0.03')
Hx_prior = osf_prior.df.get_prior_Hx().T Hx_prior = osf_prior.df.get_prior_Hx().T
# Hx_prior_mean = np.mean(Hx_prior, axis=0) Hx_prior_mean = np.mean(Hx_prior, axis=0)
#Hx_prior_spread = osf_prior.df['prior ensemble spread'].values #Hx_prior_spread = osf_prior.df['prior ensemble spread'].values
#Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9 #Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9
abs_FGD = abs(obs - Hx_prior_mean) # /Hx_prior_spread abs_FGD = abs(obs - Hx_prior_mean) # /Hx_prior_spread
oso.df = oso.df[abs_FGD > 0.05] # Hx_prior_spread] oso.df = oso.df[abs_FGD > 0.03] # Hx_prior_spread]
# obs_dist_to_priormean = abs(obs - Hx_prior_mean) # obs_dist_to_priormean = abs(obs - Hx_prior_mean)
# oso.df = oso.df[obs_dist_to_priormean > 5] # oso.df = oso.df[obs_dist_to_priormean > 5]
...@@ -286,8 +308,8 @@ def qc_obs(time, oso): ...@@ -286,8 +308,8 @@ def qc_obs(time, oso):
print('QC removed', n_obs_orig-len(oso.df), 'observations') print('QC removed', n_obs_orig-len(oso.df), 'observations')
# for archiving # archive obs_seq.out before QC (contains all observations, including later removed ones)
f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC") f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime(pattern_obs_seq_out+"-beforeQC")
os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True) os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True)
copy(cluster.dart_rundir + "/obs_seq.out", f_out_archive) copy(cluster.dart_rundir + "/obs_seq.out", f_out_archive)
...@@ -299,9 +321,8 @@ def qc_obs(time, oso): ...@@ -299,9 +321,8 @@ def qc_obs(time, oso):
def evaluate(assim_time, def evaluate(assim_time,
obs_seq_out=False, obs_seq_out=False,
prior=True, prior_is_filter_output=False,
posterior=False, output_format=pattern_obs_seq_final+"-evaluate"):
output_format="%Y-%m-%d_%H:%M_obs_seq.final-evaluate"):
"""Calculates either prior or posterior obs space values. """Calculates either prior or posterior obs space values.
Note: Depends on a prepared input_list.txt, which defines the ensemble (prior or posterior). Note: Depends on a prepared input_list.txt, which defines the ensemble (prior or posterior).
...@@ -316,9 +337,6 @@ def evaluate(assim_time, ...@@ -316,9 +337,6 @@ def evaluate(assim_time,
Returns: Returns:
obsseq.ObsSeq obsseq.ObsSeq
""" """
if prior == posterior:
raise ValueError('either prior or posterior must be True, the other must be False')
os.makedirs(cluster.dart_rundir, exist_ok=True) # create directory to run DART in os.makedirs(cluster.dart_rundir, exist_ok=True) # create directory to run DART in
os.chdir(cluster.dart_rundir) os.chdir(cluster.dart_rundir)
...@@ -330,10 +348,12 @@ def evaluate(assim_time, ...@@ -330,10 +348,12 @@ def evaluate(assim_time,
print("prepare nature") print("prepare nature")
prepare_nature_dart(assim_time) # link WRF files to DART directory prepare_nature_dart(assim_time) # link WRF files to DART directory
if posterior: if prior_is_filter_output:
write_list_of_inputfiles_posterior(time) print('using filter_restart files from last assimilation as prior')
if prior: use_filter_output_as_prior(time)
write_list_of_inputfiles_prior() else:
print('using files linked to `run_DART/<exp>/prior_ens*/wrfout_d01` as prior')
use_linked_files_as_prior()
if obs_seq_out: if obs_seq_out:
copy(obs_seq_out, cluster.dart_rundir+'/obs_seq.out') copy(obs_seq_out, cluster.dart_rundir+'/obs_seq.out')
...@@ -368,7 +388,7 @@ def get_obsseq_out(time): ...@@ -368,7 +388,7 @@ def get_obsseq_out(time):
# copy to sim_archive # copy to sim_archive
os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True) os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True)
copy(f_obsseq, time.strftime(cluster.archivedir+'/obs_seq_out/%Y-%m-%d_%H:%M_obs_seq.out')) copy(f_obsseq, time.strftime(cluster.archivedir+'/obs_seq_out/'+pattern_obs_seq_out))
oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out") # read the obs_seq.out file oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out") # read the obs_seq.out file
else: else:
...@@ -387,7 +407,7 @@ def prepare_inflation_2(time, prior_init_time): ...@@ -387,7 +407,7 @@ def prepare_inflation_2(time, prior_init_time):
time (datetime): time of assimilation time (datetime): time of assimilation
prior_init_time (datetime): time of prior assimilation prior_init_time (datetime): time of prior assimilation
""" """
dir_priorinf = cluster.archivedir + prior_init_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") dir_priorinf = cluster.archivedir + prior_init_time.strftime(pattern_init_time+"/assim_stage0/")
f_default = cluster.archive_base + "/input_priorinf_mean.nc" f_default = cluster.archive_base + "/input_priorinf_mean.nc"
f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc") f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
...@@ -412,7 +432,7 @@ def prepare_inflation_2(time, prior_init_time): ...@@ -412,7 +432,7 @@ def prepare_inflation_2(time, prior_init_time):
copy(f_default, f_new) copy(f_default, f_new)
def archive_inflation_2(time): def archive_inflation_2(time):
dir_output = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") dir_output = cluster.archivedir + time.strftime(pattern_init_time+"/assim_stage0/")
os.makedirs(dir_output, exist_ok=True) os.makedirs(dir_output, exist_ok=True)
f_output = cluster.dart_rundir + '/output_priorinf_sd.nc' f_output = cluster.dart_rundir + '/output_priorinf_sd.nc'
...@@ -496,7 +516,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp): ...@@ -496,7 +516,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
None None
""" """
nproc = cluster.max_nproc nproc = cluster.max_nproc
do_QC = getattr(exp, "reject_smallFGD", False) # True: triggers additional evaluations of prior & posterior do_QC = getattr(exp, "do_quality_control", False) # True: triggers additional evaluations of prior & posterior
# for which observation type do we have a parametrized observation error? # for which observation type do we have a parametrized observation error?
error_is_parametrized = [obscfg["error_assimilate"] == False for obscfg in exp.observations] error_is_parametrized = [obscfg["error_assimilate"] == False for obscfg in exp.observations]
...@@ -517,8 +537,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp): ...@@ -517,8 +537,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
# is any observation error parametrized? # is any observation error parametrized?
if any(error_is_parametrized) or do_QC: if any(error_is_parametrized) or do_QC:
print(" (optional) evaluate prior for all observations (incl rejected) ") print(" (optional) evaluate prior for all observations (incl rejected) ")
evaluate(time, prior=True, evaluate(time, output_format=pattern_obs_seq_final+"-evaluate_prior")
output_format="%Y-%m-%d_%H:%M_obs_seq.final-evaluate_prior")
print(" assign observation-errors for assimilation ") print(" assign observation-errors for assimilation ")
set_obserr_assimilate_in_obsseqout(oso, outfile=cluster.dart_rundir + "/obs_seq.out") set_obserr_assimilate_in_obsseqout(oso, outfile=cluster.dart_rundir + "/obs_seq.out")
...@@ -539,15 +558,15 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp): ...@@ -539,15 +558,15 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
archive_inflation_2(time) archive_inflation_2(time)
print(" evaluate posterior in observation-space") print(" evaluate posterior in observation-space")
f_oso = pattern_obs_seq_out
if do_QC: if do_QC:
f_oso = '%Y-%m-%d_%H:%M_obs_seq.out-beforeQC' # includes all observations (including rejected ones in qc_obs()) f_oso += '-beforeQC' # includes all observations (including rejected ones in qc_obs())
else:
f_oso = '%Y-%m-%d_%H:%M_obs_seq.out'
# evaluate() separately after ./filter is crucial when assimilating cloud variables # evaluate() separately after ./filter is crucial when assimilating cloud variables
# as the obs_seq.final returned by ./filter was produced using un-clamped cloud variables # as the obs_seq.final returned by ./filter was produced using un-clamped cloud variables
evaluate(time, obs_seq_out=cluster.archivedir+'/obs_seq_out/'+time.strftime(f_oso), evaluate(time, obs_seq_out=cluster.archivedir+'/obs_seq_out/'+time.strftime(f_oso),
prior=False, posterior=True, prior_is_filter_output=True,
output_format="%Y-%m-%d_%H:%M_obs_seq.final-evaluate_posterior") output_format=pattern_obs_seq_final+"-evaluate")
if __name__ == "__main__": if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment