Commit e6a1e4ad authored by lkugler

config change

parent 7bfd329f
File moved
@@ -7,7 +7,7 @@ if __name__ == "__main__":
     """
    Run a cycled OSSE with WRF and DART.
    """
-    w = WorkFlows(exp_config='exp_example.py', server_config='jet.py')
+    w = WorkFlows(exp_config='cfg.py', server_config='jet.py')

     timedelta_integrate = dt.timedelta(minutes=15)
     timedelta_btw_assim = dt.timedelta(minutes=15)
...
@@ -24,19 +24,19 @@ def link_nature_to_dart_truth(time):
     # get wrfout_d01 from nature run
     shutil.copy(time.strftime(exp.nature+'/'+wrfout_format),
-                cluster.dartrundir + "/wrfout_d01")
+                cluster.dart_rundir + "/wrfout_d01")
     # DART may need a wrfinput file as well, which serves as a template for dimension sizes
-    symlink(cluster.dartrundir + "/wrfout_d01",
-            cluster.dartrundir + "/wrfinput_d01")
+    symlink(cluster.dart_rundir + "/wrfout_d01",
+            cluster.dart_rundir + "/wrfinput_d01")
     print("linked", time.strftime(exp.nature+'/'+wrfout_format),
-          "to", cluster.dartrundir + "/wrfout_d01")
+          "to", cluster.dart_rundir + "/wrfout_d01")

 def prepare_nature_dart(time):
     print("linking nature to DART & georeferencing")
     link_nature_to_dart_truth(time)
-    wrfout_add_geo.run(cluster.geo_em, cluster.dartrundir + "/wrfout_d01")
+    wrfout_add_geo.run(cluster.geo_em, cluster.dart_rundir + "/wrfout_d01")

 def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp):
@@ -57,7 +57,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
                      + str(iens)
                      + prior_valid_time.strftime("/"+wrfout_format)
                      )
-        dart_ensdir = cluster.dartrundir + "/prior_ens" + str(iens)
+        dart_ensdir = cluster.dart_rundir + "/prior_ens" + str(iens)
         wrfout_dart = dart_ensdir + "/wrfout_d01"

         os.makedirs(dart_ensdir, exist_ok=True)
@@ -69,7 +69,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
         if assim_time != prior_valid_time:
             print("overwriting time in prior from nature wrfout")
             shell(cluster.ncks+ " -A -v XTIME,Times "+
-                  cluster.dartrundir+"/wrfout_d01 "+ wrfout_dart)
+                  cluster.dart_rundir+"/wrfout_d01 "+ wrfout_dart)

         # this seems to be necessary (else wrong level selection)
         wrfout_add_geo.run(cluster.geo_em, wrfout_dart)
@@ -78,18 +78,18 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
     write_list_of_outputfiles()

     print("removing preassim and filter_restart")
-    os.system("rm -rf " + cluster.dartrundir + "/preassim_*")
-    os.system("rm -rf " + cluster.dartrundir + "/filter_restart*")
-    os.system("rm -rf " + cluster.dartrundir + "/output_mean*")
-    os.system("rm -rf " + cluster.dartrundir + "/output_sd*")
-    os.system("rm -rf " + cluster.dartrundir + "/perfect_output_*")
-    os.system("rm -rf " + cluster.dartrundir + "/obs_seq.fina*")
+    os.system("rm -rf " + cluster.dart_rundir + "/preassim_*")
+    os.system("rm -rf " + cluster.dart_rundir + "/filter_restart*")
+    os.system("rm -rf " + cluster.dart_rundir + "/output_mean*")
+    os.system("rm -rf " + cluster.dart_rundir + "/output_sd*")
+    os.system("rm -rf " + cluster.dart_rundir + "/perfect_output_*")
+    os.system("rm -rf " + cluster.dart_rundir + "/obs_seq.fina*")

 def write_list_of_inputfiles_prior():
     files = []
     for iens in range(1, exp.n_ens+1):
         files.append("./prior_ens" + str(iens) + "/wrfout_d01")
-    write_txt(files, cluster.dartrundir+'/input_list.txt')
+    write_txt(files, cluster.dart_rundir+'/input_list.txt')

 def write_list_of_inputfiles_posterior(assim_time):
     filedir = cluster.archivedir+assim_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
@@ -97,43 +97,43 @@ def write_list_of_inputfiles_posterior(assim_time):
     files = []
     for iens in range(1, exp.n_ens+1):
         files.append(filedir+'filter_restart_d01.'+str(iens).zfill(4))
-    write_txt(files, cluster.dartrundir+'/input_list.txt')
+    write_txt(files, cluster.dart_rundir+'/input_list.txt')

 def write_list_of_outputfiles():
     files = []
     for iens in range(1, exp.n_ens+1):
         files.append("./filter_restart_d01." + str(iens).zfill(4))
-    write_txt(files, cluster.dartrundir+'/output_list.txt')
+    write_txt(files, cluster.dart_rundir+'/output_list.txt')

 def run_perfect_model_obs(nproc=12, verbose=True):
     if verbose:
         print("generating observations - running ./perfect_model_obs")
-    os.chdir(cluster.dartrundir)
+    os.chdir(cluster.dart_rundir)

-    try_remove(cluster.dartrundir + "/obs_seq.out")
-    if not os.path.exists(cluster.dartrundir + "/obs_seq.in"):
-        raise RuntimeError("obs_seq.in does not exist in " + cluster.dartrundir)
+    try_remove(cluster.dart_rundir + "/obs_seq.out")
+    if not os.path.exists(cluster.dart_rundir + "/obs_seq.in"):
+        raise RuntimeError("obs_seq.in does not exist in " + cluster.dart_rundir)
     shell(cluster.dart_modules+' mpirun -np '+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs")
-    if not os.path.exists(cluster.dartrundir + "/obs_seq.out"):
+    if not os.path.exists(cluster.dart_rundir + "/obs_seq.out"):
         raise RuntimeError(
-            "obs_seq.out does not exist in " + cluster.dartrundir,
-            "\n look for " + cluster.dartrundir + "/log.perfect_model_obs")
+            "obs_seq.out does not exist in " + cluster.dart_rundir,
+            "\n look for " + cluster.dart_rundir + "/log.perfect_model_obs")

 def filter(nproc=12):
     print("time now", dt.datetime.now())
     print("running filter")
-    os.chdir(cluster.dartrundir)
-    try_remove(cluster.dartrundir + "/obs_seq.final")
+    os.chdir(cluster.dart_rundir)
+    try_remove(cluster.dart_rundir + "/obs_seq.final")
     t = time_module.time()
     if nproc < 12:
         shell(cluster.dart_modules+' mpirun -np 12 ./filter &> log.filter')
     else:  # -genv I_MPI_PIN_PROCESSOR_LIST=0-"+str(int(nproc) - 1)
         shell(cluster.dart_modules+" mpirun -np "+str(int(nproc))+" ./filter > log.filter")
     print("./filter took", int(time_module.time() - t), "seconds")
-    if not os.path.isfile(cluster.dartrundir + "/obs_seq.final"):
+    if not os.path.isfile(cluster.dart_rundir + "/obs_seq.final"):
         raise RuntimeError(
-            "obs_seq.final does not exist in " + cluster.dartrundir,
-            "\n look for " + cluster.dartrundir + "/log.filter")
+            "obs_seq.final does not exist in " + cluster.dart_rundir,
+            "\n look for " + cluster.dart_rundir + "/log.filter")

 ############### archiving
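run_perfect_model_obs and filter share a run-and-verify pattern: change into the DART run directory, launch the MPI binary with output redirected to a log, and treat a missing output file as failure, pointing the user to that log. A minimal sketch of the pattern (run_and_verify and the paths are hypothetical; the real code goes through the cluster's shell helper and module environment):

    import os
    import subprocess

    def run_and_verify(workdir, cmd, expected, logfile):
        """Run a shell command in workdir; raise if the expected file was not produced."""
        os.chdir(workdir)
        subprocess.run(cmd, shell=True, check=True)  # e.g. "mpirun -np 12 ./filter > log.filter"
        if not os.path.isfile(os.path.join(workdir, expected)):
            raise RuntimeError(expected + " does not exist in " + workdir
                               + "\n look for " + os.path.join(workdir, logfile))

    # usage mirroring filter() above (directory is illustrative):
    # run_and_verify("/path/to/run_DART", "mpirun -np 12 ./filter > log.filter",
    #                "obs_seq.final", "log.filter")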
@@ -144,28 +144,28 @@ def archive_filteroutput(time):
     archive_dir = cluster.archivedir + "/obs_seq_final/"
     mkdir(archive_dir)
     fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final")
-    copy(cluster.dartrundir + "/obs_seq.final", fout)
+    copy(cluster.dart_rundir + "/obs_seq.final", fout)
     print(fout, "saved.")

     archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
     mkdir(archive_assim)
-    copy(cluster.dartrundir + "/input.nml", archive_assim + "/input.nml")
+    copy(cluster.dart_rundir + "/input.nml", archive_assim + "/input.nml")

     for iens in range(1, exp.n_ens + 1):  # single members
         copy(
-            cluster.dartrundir + "/filter_restart_d01." + str(iens).zfill(4),
+            cluster.dart_rundir + "/filter_restart_d01." + str(iens).zfill(4),
             archive_assim + "/filter_restart_d01." + str(iens).zfill(4),
         )

     try:  # not necessary for next forecast run
         for iens in range(1, exp.n_ens + 1):
             copy(
-                cluster.dartrundir + "/postassim_member_" + str(iens).zfill(4) + ".nc",
+                cluster.dart_rundir + "/postassim_member_" + str(iens).zfill(4) + ".nc",
                 archive_assim + "/postassim_member_" + str(iens).zfill(4) + ".nc",
             )

         for f in ["output_mean.nc", "output_sd.nc"]:  # copy mean and sd to archive
-            copy(cluster.dartrundir + "/" + f, archive_assim + "/" + f)
+            copy(cluster.dart_rundir + "/" + f, archive_assim + "/" + f)

     except Exception as e:
         warnings.warn(str(e))
@@ -273,10 +273,10 @@ def qc_obs(time, oso, osf_prior):
         # for archiving
         f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
         os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True)
-        copy(cluster.dartrundir + "/obs_seq.out", f_out_archive)
+        copy(cluster.dart_rundir + "/obs_seq.out", f_out_archive)

         # for assimilation later
-        f_out_dart = cluster.dartrundir+'/obs_seq.out'
+        f_out_dart = cluster.dart_rundir+'/obs_seq.out'
         oso.to_dart(f_out_dart)
         print('saved', f_out_dart)
@@ -286,8 +286,8 @@ def evaluate(assim_time,
     """Depending on input_list.txt, this function calculates either prior or posterior obs space values.
     """
-    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
-    os.chdir(cluster.dartrundir)
+    os.makedirs(cluster.dart_rundir, exist_ok=True)  # create directory to run DART in
+    os.chdir(cluster.dart_rundir)

     # link DART binaries to run_DART
     os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")
@@ -298,8 +298,8 @@ def evaluate(assim_time,
     print("prepare nature")
     prepare_nature_dart(assim_time)  # link WRF files to DART directory

-    if not os.path.isfile(cluster.dartrundir+'/obs_seq.out'):
-        raise RuntimeError(cluster.dartrundir+'/obs_seq.out does not exist')
+    if not os.path.isfile(cluster.dart_rundir+'/obs_seq.out'):
+        raise RuntimeError(cluster.dart_rundir+'/obs_seq.out does not exist')

     dart_nml.write_namelist(just_prior_values=True)
     filter(nproc=6)
@@ -307,10 +307,10 @@ def evaluate(assim_time,
     # archiving
     fout = cluster.archivedir + "/obs_seq_final/" + assim_time.strftime(output_format)
     os.makedirs(cluster.archivedir + "/obs_seq_final/", exist_ok=True)
-    copy(cluster.dartrundir + "/obs_seq.final", fout)
+    copy(cluster.dart_rundir + "/obs_seq.final", fout)
     print(fout, "saved.")

-    osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final")
+    osf = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.final")
     return osf
@@ -324,21 +324,21 @@ def generate_obsseq_out(time):
         if_vis_obs = oso.df['kind'].values == 262
         if_obs_below_surface_albedo = oso.df['observations'].values < clearsky_albedo
         oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = clearsky_albedo
-        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+        oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out")
         return oso

     def apply_superobbing(oso):
         try:
             f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-before_superob")
-            shutil.copy(cluster.dartrundir + "/obs_seq.out-before_superob", f_oso)
+            shutil.copy(cluster.dart_rundir + "/obs_seq.out-before_superob", f_oso)
             print('saved', f_oso)
         except Exception as e:
             warnings.warn(str(e))

         print(" 2.3) superobbing to", exp.superob_km, "km")
         oso.df = oso.df.superob(window_km=exp.superob_km)
-        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+        oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out")

     ##############################
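The clamping above relies on pandas boolean masking over the observation-sequence DataFrame. A toy, self-contained illustration of the same idiom (the DataFrame contents are invented; kind 262 and the albedo floor 0.2928 are the values used in this repository):

    import pandas as pd

    # invented toy data; kind 262 marks VIS reflectance observations
    df = pd.DataFrame({'kind': [262, 262, 118],
                       'observations': [0.10, 0.55, 3000.0]})
    clearsky_albedo = 0.2928

    is_vis = df['kind'].values == 262
    below_albedo = df['observations'].values < clearsky_albedo
    df.loc[is_vis & below_albedo, 'observations'] = clearsky_albedo
    # only the first row is raised to 0.2928; the others are untouched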
@@ -350,7 +350,7 @@ def generate_obsseq_out(time):
     run_perfect_model_obs()  # generate observation, draws from gaussian

     print(" 2.1) obs preprocessing")
-    oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
+    oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")
     oso = ensure_physical_vis(oso)
@@ -359,7 +359,7 @@ def generate_obsseq_out(time):
     # archive complete obsseqout
     f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
-    shutil.copy(cluster.dartrundir + "/obs_seq.out", f_oso)
+    shutil.copy(cluster.dart_rundir + "/obs_seq.out", f_oso)
     print('saved', f_oso)

     return oso
@@ -369,9 +369,9 @@ def get_obsseq_out(time):
     # did we specify an obsseqout inputfile?
     if exp.use_existing_obsseq != False:
         f_obsseq = time.strftime(exp.use_existing_obsseq)
-        copy(f_obsseq, cluster.dartrundir+'/obs_seq.out')
-        print(f_obsseq, 'copied to', cluster.dartrundir+'/obs_seq.out')
+        copy(f_obsseq, cluster.dart_rundir+'/obs_seq.out')
+        print(f_obsseq, 'copied to', cluster.dart_rundir+'/obs_seq.out')

-        oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
+        oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")
     else:
         # decision to NOT use existing obs_seq.out file
@@ -379,9 +379,9 @@ def get_obsseq_out(time):
         # f_oso_thisexp = cluster.archivedir+'/obs_seq_out/'+time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
         # if os.path.isfile(f_oso_thisexp):
         #     # oso exists
-        #     copy(f_oso_thisexp, cluster.dartrundir+'/obs_seq.out')
+        #     copy(f_oso_thisexp, cluster.dart_rundir+'/obs_seq.out')
         #     print('copied existing obsseqout from', f_oso_thisexp)
-        #     oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
+        #     oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")
         # else:
         #     generate observations with new observation noise
@@ -403,7 +403,7 @@ def prepare_inflation_2(time, prior_init_time):
     f_default = cluster.archive_base + "/input_priorinf_mean.nc"
     f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
-    f_new = cluster.dartrundir + '/input_priorinf_mean.nc'
+    f_new = cluster.dart_rundir + '/input_priorinf_mean.nc'

     if os.path.isfile(f_prior):
         copy(f_prior, f_new)
@@ -414,7 +414,7 @@ def prepare_inflation_2(time, prior_init_time):
     f_default = cluster.archive_base + "/input_priorinf_sd.nc"
     f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_sd.nc")
-    f_new = cluster.dartrundir + '/input_priorinf_sd.nc'
+    f_new = cluster.dart_rundir + '/input_priorinf_sd.nc'

     if os.path.isfile(f_prior):
         copy(f_prior, f_new)
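prepare_inflation_2 stages the adaptive-inflation files with a warm-start-and-fallback rule: reuse the previous cycle's output_priorinf_* fields when they exist, otherwise seed from a default file. A generic sketch of that rule (stage_inflation_file is a hypothetical helper; the real code uses the copy wrapper shown above):

    import os
    import shutil

    def stage_inflation_file(f_prior, f_default, f_new):
        """Warm-start from the previous cycle's inflation field if it exists."""
        if os.path.isfile(f_prior):
            shutil.copy(f_prior, f_new)    # reuse output_priorinf_* from the last cycle
        else:
            shutil.copy(f_default, f_new)  # first cycle: fall back to the default field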
@@ -427,12 +427,12 @@ def archive_inflation_2(time):
     dir_output = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
     os.makedirs(dir_output, exist_ok=True)

-    f_output = cluster.dartrundir + '/output_priorinf_sd.nc'
+    f_output = cluster.dart_rundir + '/output_priorinf_sd.nc'
     f_archive = dir_output + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_sd.nc")
     copy(f_output, f_archive)
     print(f_archive, 'saved')

-    f_output = cluster.dartrundir + '/output_priorinf_mean.nc'
+    f_output = cluster.dart_rundir + '/output_priorinf_mean.nc'
     f_archive = dir_output + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
     copy(f_output, f_archive)
     print(f_archive, 'saved')
@@ -461,8 +461,8 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
     nproc = cluster.max_nproc

     archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/")
-    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
-    os.chdir(cluster.dartrundir)
+    os.makedirs(cluster.dart_rundir, exist_ok=True)  # create directory to run DART in
+    os.chdir(cluster.dart_rundir)

     # link DART binaries to run_DART
     os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")
@@ -484,7 +484,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
     osf_prior = evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_prior_allobs")

     print(" 2.2) assign observation-errors for assimilation ")
-    set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile=cluster.dartrundir + "/obs_seq.out")
+    set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile=cluster.dart_rundir + "/obs_seq.out")

     if getattr(exp, "reject_smallFGD", False):
         print(" 2.3) reject observations? ")
@@ -505,7 +505,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
         write_list_of_inputfiles_posterior(time)
         if getattr(exp, "reject_smallFGD", False):
             copy(cluster.archivedir+'/obs_seq_out/'+time.strftime('%Y-%m-%d_%H:%M_obs_seq.out-beforeQC'),
-                 cluster.dartrundir+'/obs_seq.out')
+                 cluster.dart_rundir+'/obs_seq.out')
         evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs")
...
@@ -31,7 +31,7 @@ if __name__ == "__main__":
     print('will save obsseq to', dir_for_obsseqout)
     os.makedirs(dir_for_obsseqout, exist_ok=True)

-    os.chdir(cluster.dartrundir)
+    os.chdir(cluster.dart_rundir)

     # link DART binaries to run_DART
     os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")
@@ -49,7 +49,7 @@ if __name__ == "__main__":
     aso.run_perfect_model_obs(nproc=6)  # create observations (obs_seq.out)

-    oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
+    oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")

     if True:  # set reflectance < surface albedo to surface albedo
         print(" 2.2) removing obs below surface albedo ")
@@ -57,13 +57,13 @@ if __name__ == "__main__":
         if_obs_below_surface_albedo = oso.df['observations'].values < 0.2928
         oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = 0.2928
-        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+        oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out")

     if getattr(exp, "superob_km", False):
         print(" 2.3) superobbing to", exp.superob_km, "km")
         oso.df = oso.df.superob(window_km=exp.superob_km)
-        copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig")
-        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+        copy(cluster.dart_rundir + "/obs_seq.out", cluster.dart_rundir + "/obs_seq.out-orig")
+        oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out")

     aso.archive_osq_out(time, dir_obsseq=dir_for_obsseqout)
@@ -173,7 +173,7 @@ kind
 def create_obs_seq_in(time_dt, list_obscfg,
-                      output_path=cluster.dartrundir+'/obs_seq.in'):
+                      output_path=cluster.dart_rundir+'/obs_seq.in'):
     """Create obs_seq.in with multiple obs types in one file

     Args:
@@ -314,6 +314,6 @@ if __name__ == '__main__':
     if False:
         error_assimilate = 5.*np.ones(n_obs*len(radar['heights']))
         import assim_synth_obs as aso
-        aso.replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
+        aso.replace_errors_obsseqout(cluster.dart_rundir+'/obs_seq.out', error_assimilate)
@@ -32,20 +32,22 @@ def read_namelist(filepath):
         # namelist section
         if line.startswith('&'):
-            section = line.lower()
+            section = line
             d[section] = dict()
             continue

-        # namelist variable
-        else:
-            # split line into variable name and value
-            var, val = line.split('=')
-            var = var.strip().lower()
-            val = val.strip()
-            # split value into list if possible
-            if ',' in val:
-                val = val.split(',')
-                val = [v.strip() for v in val]
+        if '/' in line:
+            continue  # skip end of namelist section
+
+        try:
+            # split line into variable name and value
+            var, val = line.split('=')
+            var = var.strip()
+            val = [v.strip() for v in val.strip().strip(',').split(',')]
+        except ValueError:
+            # the split failed, so this line continues the previous variable's value
+            val += [v.strip() for v in line.strip().strip(',').split(',')]

         # add variable to dictionary
         d[section][var] = val
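To see what the continuation-line handling buys, here is a self-contained sketch of the same parsing logic applied to a small namelist fragment (parse_nml and the '!' comment rule are illustrative assumptions, not DART-WRF code):

    def parse_nml(text):
        d, section, var, val = {}, None, None, None
        for line in text.splitlines():
            line = line.split('!')[0].strip()  # assumption: '!' starts a Fortran comment
            if not line:
                continue
            if line.startswith('&'):
                section = line
                d[section] = {}
                continue
            if '/' in line:
                continue  # end of namelist section
            try:
                var, val = line.split('=')
                val = [v.strip() for v in val.strip().strip(',').split(',')]
            except ValueError:
                # continuation line: extend the previous variable's value list
                val += [v.strip() for v in line.strip().strip(',').split(',')]
            d[section][var.strip()] = val
        return d

    nml = """&filter_nml
       num_output_state_members = 1,
       inf_flavor               = 2,
                                  0,
    /"""
    print(parse_nml(nml))
    # {'&filter_nml': {'num_output_state_members': ['1'], 'inf_flavor': ['2', '0']}}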
@@ -209,8 +211,8 @@ def write_namelist(just_prior_values=False):
             raise ValueError("Selected vertical localization, but observations contain satellite obs -> Not possible.")

     # write to file
-    write_namelist_from_dict(nml, cluster.dartrundir + "/input.nml")
+    write_namelist_from_dict(nml, cluster.dart_rundir + "/input.nml")

     # append section for RTTOV
     rttov_nml = cluster.scriptsdir + "/../templates/obs_def_rttov.VIS.nml"
-    append_file(cluster.dartrundir + "/input.nml", rttov_nml)
+    append_file(cluster.dart_rundir + "/input.nml", rttov_nml)
\ No newline at end of file
@@ -22,6 +22,6 @@ if __name__ == "__main__":
     oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
     if not os.path.isfile(oso_input):
         oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
-    shutil.copy(oso_input, cluster.dartrundir+'/obs_seq.out')
+    shutil.copy(oso_input, cluster.dart_rundir+'/obs_seq.out')
     aso.evaluate(assim_time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs")
\ No newline at end of file
@@ -27,7 +27,7 @@ if __name__ == "__main__":
     oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
     if not os.path.isfile(oso_input):
         oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
-    shutil.copy(oso_input, cluster.dartrundir+'/obs_seq.out')
+    shutil.copy(oso_input, cluster.dart_rundir+'/obs_seq.out')
     aso.evaluate(assim_time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_prior_allobs")
\ No newline at end of file
@@ -11,8 +11,8 @@ if __name__ == "__main__":
     bins = ['perfect_model_obs', 'filter', 'obs_diag', 'obs_seq_to_netcdf']
     for b in bins:
         symlink(joinp(cluster.dart_srcdir, b),
-                joinp(cluster.dartrundir, b))
-        print(joinp(cluster.dartrundir, b), 'created')
+                joinp(cluster.dart_rundir, b))
+        print(joinp(cluster.dart_rundir, b), 'created')

     rttov_files = ['rttov13pred54L/rtcoef_msg_4_seviri_o3.dat',
                    #'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.dat',
@@ -25,17 +25,17 @@ if __name__ == "__main__":
             destname = 'rtcoef_msg_4_seviri.dat'

         symlink(cluster.rttov_srcdir + f_src,
-                cluster.dartrundir+'/'+destname)
+                cluster.dart_rundir+'/'+destname)

     ##################
-    symlink(cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri_deff.H5',
-            cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri.H5')
+    symlink(cluster.dart_rundir+'/rttov_mfasis_cld_msg_4_seviri_deff.H5',
+            cluster.dart_rundir+'/rttov_mfasis_cld_msg_4_seviri.H5')

     symlink(cluster.dart_srcdir+'/../../../observations/forward_operators/rttov_sensor_db.csv',
-            cluster.dartrundir+'/rttov_sensor_db.csv')
+            cluster.dart_rundir+'/rttov_sensor_db.csv')

     symlink(cluster.dart_srcdir+'/../../../assimilation_code/programs/gen_sampling_err_table/'
             +'work/sampling_error_correction_table.nc',
-            cluster.dartrundir+'/sampling_error_correction_table.nc')
+            cluster.dart_rundir+'/sampling_error_correction_table.nc')

-    print('prepared DART & RTTOV links in', cluster.dartrundir)
+    print('prepared DART & RTTOV links in', cluster.dart_rundir)
@@ -717,4 +717,4 @@ if __name__ == "__main__":
     obs.plot(f_out="./map_obs_superobs.png")

     # write to obs_seq.out in DART format
-    # obs.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+    # obs.to_dart(f=cluster.dart_rundir + "/obs_seq.out")
@@ -13,7 +13,7 @@ class Experiment(object):
 class ClusterConfig(object):
     """Collection of variables regarding the cluster configuration"""
     def __init__(self, exp):
-        self.exp = exp
+        self.exp = exp  # makes derived properties available
         self.dart_modules = ''  # default value

     @property
@@ -35,7 +35,7 @@ class ClusterConfig(object):
         return self.archivedir+'/DART-WRF/'

     @property
-    def dartrundir(self):
+    def dart_rundir(self):
         """Path to the directory where DART programs will run

         Includes the experiment name
         """
...
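Since dart_rundir is a property, every caller picks up a path change from this single definition. The property body is collapsed in this diff, so the following is only a plausible sketch of the pattern (attribute names other than exp and dart_rundir are assumptions):

    class ClusterConfig:
        """Illustrative sketch: derive run directories from one base path."""
        def __init__(self, exp):
            self.exp = exp                       # experiment config, provides expname
            self.dart_rundir_base = '/tmp/run'   # hypothetical; set per cluster, e.g. in jet.py

        @property
        def dart_rundir(self):
            # per the docstring above, the path includes the experiment name
            return self.dart_rundir_base + '/' + self.exp.expname + '/run_DART'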
@@ -10,7 +10,6 @@ import datetime as dt
 import importlib

 from dartwrf.utils import script_to_str
-from config.cfg import exp

 class WorkFlows(object):
     def __init__(self, exp_config='cfg.py', server_config='server.py'):
@@ -23,14 +22,40 @@ class WorkFlows(object):
         Attributes:
             cluster (obj): cluster configuration as defined in server_config file

-        Note: we load the config from load the config from cluster.scripts_rundir/config/cfg.py
+        Note: in WorkFlows, we load the config from the git cloned folder;
+              in all other dartwrf scripts, load the config from cluster.scripts_rundir
         """
         print('------ start exp from ', exp_config, ' and ', server_config, ' ------')

-        # exp = __import__('config/'+exp_config)
-        # load python config file
+        def copy_dartwrf_to_archive():
+            # Copy scripts to self.cluster.archivedir folder
+            os.makedirs(self.cluster.archivedir, exist_ok=True)
+            try:
+                shutil.copytree(self.cluster.scriptsdir, self.cluster.scripts_rundir)
+                print('scripts have been copied to', self.cluster.archivedir)
+            except FileExistsError as e:
+                warnings.warn(str(e))
+            except:
+                raise
+
+        def copy_config_to_archive():
+            # later, we can load the exp cfg with `from config.cfg import exp`
+            shutil.copyfile('config/'+exp_config, self.cluster.scripts_rundir+'/config/cfg.py')
+
+            # later, we can load the cluster cfg with `from config.cluster import cluster`
+            shutil.copyfile('config/'+server_config, self.cluster.scripts_rundir+'/config/cluster.py')  # whatever server, the config name is always the same!
+
+            # copy config file to current config folder
+            try:
+                shutil.copyfile('config/'+exp_config, '/'.join(__file__.split('/')[:-2])+'/config/cfg.py')
+            except shutil.SameFileError:
+                pass
+
+        # load python config files
         self.cluster = importlib.import_module('config.'+server_config.strip('.py')).cluster
+        self.exp = importlib.import_module('config.'+exp_config.strip('.py')).exp
+
+        copy_dartwrf_to_archive()
+        copy_config_to_archive()

         # Set paths and backup scripts
         self.cluster.log_dir = self.cluster.archivedir+'/logs/'
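One caveat on the importlib lines above: str.strip('.py') removes leading and trailing characters from the set {'.', 'p', 'y'}, not the literal suffix. It happens to work for 'cfg.py' and 'jet.py', but would mangle a name like 'py_cfg.py'. A safer variant using only the standard library:

    import importlib
    import os

    def load_config_module(filename):
        """Import config/<filename> as a module, stripping the extension safely."""
        modname, _ = os.path.splitext(filename)   # 'cfg.py' -> 'cfg'
        return importlib.import_module('config.' + modname)

    # hypothetical usage mirroring WorkFlows.__init__:
    # cluster = load_config_module('jet.py').cluster
    # exp = load_config_module('cfg.py').exp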
@@ -88,21 +113,7 @@ class WorkFlows(object):
         _dict_to_py(_obskind_read(), self.cluster.scriptsdir+'/../config/obskind.py')

-        # Copy scripts to self.cluster.archivedir folder
-        os.makedirs(self.cluster.archivedir, exist_ok=True)
-        try:
-            shutil.copytree(self.cluster.scriptsdir, self.cluster.scripts_rundir)
-            print('scripts have been copied to', self.cluster.archivedir)
-        except FileExistsError as e:
-            warnings.warn(str(e))
-        except:
-            raise
-
-        # later, we can load the exp cfg with `from config.cfg import exp`
-        shutil.copy('config/'+exp_config, self.cluster.scripts_rundir+'/config/cfg.py')
-        # later, we can load the cluster cfg with `from config.cluster import cluster`
-        shutil.copy('config/'+server_config, self.cluster.scripts_rundir+'/config/cluster.py')  # whatever server, the config name is always the same!

         # probably not needed
         # shutil.copy('config/'+server_config, 'config/cluster.py')  # whatever server, the config name is always the same!
@@ -137,21 +148,21 @@ class WorkFlows(object):
         """
         cmd = """# run ideal.exe in parallel
 export SLURM_STEP_GRES=none
-for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
+for ((n=1; n<="""+str(self.exp.n_ens)+"""; n++))
 do
-   rundir="""+self.cluster.wrf_rundir_base+'/'+exp.expname+"""/$n
+   rundir="""+self.cluster.wrf_rundir_base+'/'+self.exp.expname+"""/$n
    echo $rundir
    cd $rundir
    mpirun -np 1 ./ideal.exe &
 done
 wait
-for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
+for ((n=1; n<="""+str(self.exp.n_ens)+"""; n++))
 do
-   rundir="""+self.cluster.wrf_rundir_base+'/'+exp.expname+"""/$n
+   rundir="""+self.cluster.wrf_rundir_base+'/'+self.exp.expname+"""/$n
    mv $rundir/rsl.out.0000 $rundir/rsl.out.input
 done
 """
-        id = self.cluster.run_job(cmd, "ideal-"+exp.expname, cfg_update={"ntasks": str(exp.n_ens),
+        id = self.cluster.run_job(cmd, "ideal-"+self.exp.expname, cfg_update={"ntasks": str(self.exp.n_ens),
                                   "time": "10", "mem": "100G"}, depends_on=[depends_on])
         return id
@@ -159,13 +170,12 @@ class WorkFlows(object):
         """Given that directories with wrfinput files exist,
        update these wrfinput files with warm bubbles
        """
-
         pstr = ' '
         if perturb:
             pstr = ' perturb'
         cmd = self.cluster.python+' '+self.cluster.scripts_rundir+'/create_wbubble_wrfinput.py'+pstr

-        id = self.cluster.run_job(cmd, "ins_wbub-"+exp.expname, cfg_update={"time": "5"}, depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "ins_wbub-"+self.exp.expname, cfg_update={"time": "5"}, depends_on=[depends_on])
         return id

     def run_ENS(self, begin, end, depends_on=None, first_minute=False,
@@ -198,28 +208,28 @@ class WorkFlows(object):
         id = depends_on
         restart_flag = '.false.' if not input_is_restart else '.true.'

         wrf_cmd = script_to_str(self.cluster.run_WRF
-                ).replace('<exp.expname>', exp.expname
+                ).replace('<exp.expname>', self.exp.expname
                 ).replace('<cluster.wrf_rundir_base>', self.cluster.wrf_rundir_base
                 ).replace('<cluster.wrf_modules>', self.cluster.wrf_modules)

         # first minute forecast (needed for validating a radiance assimilation)
         if first_minute:
-            hist_interval = 1  # to get an output after 1 minute
-            radt = 1  # to get a cloud fraction CFRAC after 1 minute
             id = prepare_WRF_inputfiles(begin, begin+dt.timedelta(minutes=1),
-                    hist_interval=hist_interval, radt=radt, output_restart_interval=output_restart_interval, depends_on=id)
+                    hist_interval=1,  # to get an output after 1 minute
+                    radt=1,  # to get a cloud fraction CFRAC after 1 minute
+                    output_restart_interval=output_restart_interval, depends_on=id)

-            id = self.cluster.run_job(wrf_cmd, "WRF-"+exp.expname,
+            id = self.cluster.run_job(wrf_cmd, "WRF-"+self.exp.expname,
                     cfg_update={"array": "1-"+str(self.cluster.size_jobarray), "ntasks": "10", "nodes": "1",
                                 "time": "10", "mem": "40G"},
                     depends_on=[id])

         # forecast for the whole forecast duration
-        id = prepare_WRF_inputfiles(depends_on=id)
+        id = prepare_WRF_inputfiles(begin, end, output_restart_interval=output_restart_interval, depends_on=id)

         time_in_simulation_hours = (end-begin).total_seconds()/3600
         runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9)  # usually below 9 min/hour
-        id = self.cluster.run_job(wrf_cmd, "WRF-"+exp.expname,
+        id = self.cluster.run_job(wrf_cmd, "WRF-"+self.exp.expname,
                     cfg_update={"array": "1-"+str(self.cluster.size_jobarray), "ntasks": "10", "nodes": "1",
                                 "time": str(runtime_wallclock_mins_expected), "mem": "40G"},
                     depends_on=[id])
@@ -247,7 +257,7 @@ class WorkFlows(object):
                +prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
                +prior_path_exp)

-        id = self.cluster.run_job(cmd, "Assim-"+exp.expname, cfg_update={"ntasks": "12", "time": "60",
+        id = self.cluster.run_job(cmd, "Assim-"+self.exp.expname, cfg_update={"ntasks": "12", "time": "60",
                 "mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"}, depends_on=[depends_on])
         return id
@@ -264,19 +274,19 @@ class WorkFlows(object):
                +prior_init_time.strftime(' %Y-%m-%d_%H:%M')
                +prior_valid_time.strftime(' %Y-%m-%d_%H:%M')
                +tnew)
-        id = self.cluster.run_job(cmd, "IC-prior-"+exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "IC-prior-"+self.exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
         return id

     def update_IC_from_DA(self, assim_time, depends_on=None):
         cmd = self.cluster.python+' '+self.cluster.scripts_rundir+'/update_IC.py '+assim_time.strftime('%Y-%m-%d_%H:%M')
-        id = self.cluster.run_job(cmd, "IC-update-"+exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "IC-update-"+self.exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
         return id

     def create_satimages(self, init_time, depends_on=None):
         cmd = 'module purge; module load netcdf-fortran/4.5.3-gcc-8.5.0-qsqbozc; python ~/RTTOV-WRF/run_init.py '+self.cluster.archivedir+init_time.strftime('/%Y-%m-%d_%H:%M/')
-        id = self.cluster.run_job(cmd, "RTTOV-"+exp.expname, cfg_update={"ntasks": "12", "time": "120", "mem": "200G"}, depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "RTTOV-"+self.exp.expname, cfg_update={"ntasks": "12", "time": "120", "mem": "200G"}, depends_on=[depends_on])
         return id
@@ -288,22 +298,22 @@ class WorkFlows(object):
     def verify_sat(self, depends_on=None):
-        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node sat verif1d FSS BS'
+        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node sat verif1d FSS BS'

-        self.cluster.run_job(cmd, "verif-SAT-"+exp.expname,
+        self.cluster.run_job(cmd, "verif-SAT-"+self.exp.expname,
                 cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "15",
                             "ntasks-per-node": "15", "ntasks-per-core": "1", "mem": "100G",}, depends_on=[depends_on])

     def verify_wrf(self, depends_on=None):
-        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d FSS BS'
+        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node wrf verif1d FSS BS'

-        self.cluster.run_job(cmd, "verif-WRF-"+exp.expname,
+        self.cluster.run_job(cmd, "verif-WRF-"+self.exp.expname,
                 cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "15",
                             "ntasks-per-node": "15", "ntasks-per-core": "1", "mem": "180G"}, depends_on=[depends_on])

     def verify_fast(self, depends_on=None):
-        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+exp.expname
+        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+self.exp.expname

-        self.cluster.run_job(cmd, "verif-fast-"+exp.expname,
+        self.cluster.run_job(cmd, "verif-fast-"+self.exp.expname,
                 cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1",
                             "ntasks-per-node": "1", "ntasks-per-core": "1"}, depends_on=[depends_on])