From e6a1e4addce16635f58dbc47d64aa66c28d1f9ec Mon Sep 17 00:00:00 2001
From: lkugler <lukas.kugler@gmail.com>
Date: Wed, 3 May 2023 17:32:07 +0200
Subject: [PATCH] config: rename exp_example.py to cfg.py, rename cluster.dartrundir to cluster.dart_rundir, load exp config via WorkFlows

---
 config/{exp_example.py => cfg.py} |   0
 cycled_exp.py                     |   2 +-
 dartwrf/assim_synth_obs.py        | 118 +++++++++++++++---------------
 dartwrf/create_obs_upfront.py     |  10 +--
 dartwrf/create_obsseq.py          |   4 +-
 dartwrf/dart_nml.py               |  28 +++----
 dartwrf/evaluate_posterior.py     |   2 +-
 dartwrf/evaluate_prior.py         |   2 +-
 dartwrf/link_dart_rttov.py        |  16 ++--
 dartwrf/obsseq.py                 |   2 +-
 dartwrf/utils.py                  |   4 +-
 dartwrf/workflows.py              | 100 +++++++++++++------------
 12 files changed, 150 insertions(+), 138 deletions(-)
 rename config/{exp_example.py => cfg.py} (100%)

diff --git a/config/exp_example.py b/config/cfg.py
similarity index 100%
rename from config/exp_example.py
rename to config/cfg.py
diff --git a/cycled_exp.py b/cycled_exp.py
index 8e28725..ff30d15 100755
--- a/cycled_exp.py
+++ b/cycled_exp.py
@@ -7,7 +7,7 @@ if __name__ == "__main__":
     """
     Run a cycled OSSE with WRF and DART.
     """
-    w = WorkFlows(exp_config='exp_example.py', server_config='jet.py')
+    w = WorkFlows(exp_config='cfg.py', server_config='jet.py')
 
     timedelta_integrate = dt.timedelta(minutes=15)
     timedelta_btw_assim = dt.timedelta(minutes=15)
diff --git a/dartwrf/assim_synth_obs.py b/dartwrf/assim_synth_obs.py
index ddc39b3..7e5b40f 100755
--- a/dartwrf/assim_synth_obs.py
+++ b/dartwrf/assim_synth_obs.py
@@ -24,19 +24,19 @@ def link_nature_to_dart_truth(time):
 
     # get wrfout_d01 from nature run
     shutil.copy(time.strftime(exp.nature+'/'+wrfout_format),
-                cluster.dartrundir + "/wrfout_d01")
+                cluster.dart_rundir + "/wrfout_d01")
 
     # DART may need a wrfinput file as well, which serves as a template for dimension sizes
-    symlink(cluster.dartrundir + "/wrfout_d01",
-            cluster.dartrundir + "/wrfinput_d01")
+    symlink(cluster.dart_rundir + "/wrfout_d01",
+            cluster.dart_rundir + "/wrfinput_d01")
 
     print("linked", time.strftime(exp.nature+'/'+wrfout_format),
-          "to", cluster.dartrundir + "/wrfout_d01")
+          "to", cluster.dart_rundir + "/wrfout_d01")
 
 
 def prepare_nature_dart(time):
     print("linking nature to DART & georeferencing")
     link_nature_to_dart_truth(time)
-    wrfout_add_geo.run(cluster.geo_em, cluster.dartrundir + "/wrfout_d01")
+    wrfout_add_geo.run(cluster.geo_em, cluster.dart_rundir + "/wrfout_d01")
 
 
 def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp):
@@ -57,7 +57,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
             + str(iens)
             + prior_valid_time.strftime("/"+wrfout_format)
         )
-        dart_ensdir = cluster.dartrundir + "/prior_ens" + str(iens)
+        dart_ensdir = cluster.dart_rundir + "/prior_ens" + str(iens)
         wrfout_dart = dart_ensdir + "/wrfout_d01"
 
         os.makedirs(dart_ensdir, exist_ok=True)
@@ -69,7 +69,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
         if assim_time != prior_valid_time:
             print("overwriting time in prior from nature wrfout")
             shell(cluster.ncks+ " -A -v XTIME,Times "+
-                        cluster.dartrundir+"/wrfout_d01 "+ wrfout_dart)
+                        cluster.dart_rundir+"/wrfout_d01 "+ wrfout_dart)
 
         # this seems to be necessary (else wrong level selection)
         wrfout_add_geo.run(cluster.geo_em, wrfout_dart)
@@ -78,18 +78,18 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
     write_list_of_outputfiles()
 
     print("removing preassim and filter_restart")
-    os.system("rm -rf " + cluster.dartrundir + "/preassim_*")
-    os.system("rm -rf " + cluster.dartrundir + "/filter_restart*")
-    os.system("rm -rf " + cluster.dartrundir + "/output_mean*")
-    os.system("rm -rf " + cluster.dartrundir + "/output_sd*")
-    os.system("rm -rf " + cluster.dartrundir + "/perfect_output_*")
-    os.system("rm -rf " + cluster.dartrundir + "/obs_seq.fina*")
+    os.system("rm -rf " + cluster.dart_rundir + "/preassim_*")
+    os.system("rm -rf " + cluster.dart_rundir + "/filter_restart*")
+    os.system("rm -rf " + cluster.dart_rundir + "/output_mean*")
+    os.system("rm -rf " + cluster.dart_rundir + "/output_sd*")
+    os.system("rm -rf " + cluster.dart_rundir + "/perfect_output_*")
+    os.system("rm -rf " + cluster.dart_rundir + "/obs_seq.fina*")
 
 def write_list_of_inputfiles_prior():
     files = []
     for iens in range(1, exp.n_ens+1):
         files.append("./prior_ens" + str(iens) + "/wrfout_d01")
-    write_txt(files, cluster.dartrundir+'/input_list.txt')
+    write_txt(files, cluster.dart_rundir+'/input_list.txt')
 
 def write_list_of_inputfiles_posterior(assim_time):
     filedir = cluster.archivedir+assim_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
@@ -97,43 +97,43 @@ def write_list_of_inputfiles_posterior(assim_time):
     files = []
     for iens in range(1, exp.n_ens+1):
         files.append(filedir+'filter_restart_d01.'+str(iens).zfill(4))
-    write_txt(files, cluster.dartrundir+'/input_list.txt')
+    write_txt(files, cluster.dart_rundir+'/input_list.txt')
 
 def write_list_of_outputfiles():
     files = []
     for iens in range(1, exp.n_ens+1):
         files.append("./filter_restart_d01." + str(iens).zfill(4))
-    write_txt(files, cluster.dartrundir+'/output_list.txt')
+    write_txt(files, cluster.dart_rundir+'/output_list.txt')
 
 def run_perfect_model_obs(nproc=12, verbose=True):
     if verbose:
         print("generating observations - running ./perfect_model_obs")
-    os.chdir(cluster.dartrundir)
+    os.chdir(cluster.dart_rundir)
 
-    try_remove(cluster.dartrundir + "/obs_seq.out")
-    if not os.path.exists(cluster.dartrundir + "/obs_seq.in"):
-        raise RuntimeError("obs_seq.in does not exist in " + cluster.dartrundir)
+    try_remove(cluster.dart_rundir + "/obs_seq.out")
+    if not os.path.exists(cluster.dart_rundir + "/obs_seq.in"):
+        raise RuntimeError("obs_seq.in does not exist in " + cluster.dart_rundir)
     shell(cluster.dart_modules+' mpirun -np '+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs")
-    if not os.path.exists(cluster.dartrundir + "/obs_seq.out"):
+    if not os.path.exists(cluster.dart_rundir + "/obs_seq.out"):
         raise RuntimeError(
-            "obs_seq.out does not exist in " + cluster.dartrundir,
-            "\n look for " + cluster.dartrundir + "/log.perfect_model_obs")
+            "obs_seq.out does not exist in " + cluster.dart_rundir,
+            "\n look for " + cluster.dart_rundir + "/log.perfect_model_obs")
 
 def filter(nproc=12):
     print("time now", dt.datetime.now())
     print("running filter")
-    os.chdir(cluster.dartrundir)
-    try_remove(cluster.dartrundir + "/obs_seq.final")
+    os.chdir(cluster.dart_rundir)
+    try_remove(cluster.dart_rundir + "/obs_seq.final")
     t = time_module.time()
     if nproc < 12:
         shell(cluster.dart_modules+' mpirun -np 12 ./filter &> log.filter')
     else:  # -genv I_MPI_PIN_PROCESSOR_LIST=0-"+str(int(nproc) - 1)
         shell(cluster.dart_modules+" mpirun -np "+str(int(nproc))+" ./filter > log.filter")
     print("./filter took", int(time_module.time() - t), "seconds")
-    if not os.path.isfile(cluster.dartrundir + "/obs_seq.final"):
+    if not os.path.isfile(cluster.dart_rundir + "/obs_seq.final"):
         raise RuntimeError(
-            "obs_seq.final does not exist in " + cluster.dartrundir,
-            "\n look for " + cluster.dartrundir + "/log.filter")
+            "obs_seq.final does not exist in " + cluster.dart_rundir,
+            "\n look for " + cluster.dart_rundir + "/log.filter")
 
 ############### archiving
 
@@ -144,28 +144,28 @@ def archive_filteroutput(time):
     archive_dir = cluster.archivedir + "/obs_seq_final/"
     mkdir(archive_dir)
     fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final")
-    copy(cluster.dartrundir + "/obs_seq.final", fout)
+    copy(cluster.dart_rundir + "/obs_seq.final", fout)
     print(fout, "saved.")
 
     archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
     mkdir(archive_assim)
-    copy(cluster.dartrundir + "/input.nml", archive_assim + "/input.nml")
+    copy(cluster.dart_rundir + "/input.nml", archive_assim + "/input.nml")
 
     for iens in range(1, exp.n_ens + 1):  # single members
         copy(
-            cluster.dartrundir + "/filter_restart_d01." + str(iens).zfill(4),
+            cluster.dart_rundir + "/filter_restart_d01." + str(iens).zfill(4),
             archive_assim + "/filter_restart_d01." + str(iens).zfill(4),
         )
 
     try:  # not necessary for next forecast run
         for iens in range(1, exp.n_ens + 1):
             copy(
-                cluster.dartrundir + "/postassim_member_" + str(iens).zfill(4) + ".nc",
+                cluster.dart_rundir + "/postassim_member_" + str(iens).zfill(4) + ".nc",
                 archive_assim + "/postassim_member_" + str(iens).zfill(4) + ".nc",
             )
 
         for f in ["output_mean.nc", "output_sd.nc"]:  # copy mean and sd to archive
-            copy(cluster.dartrundir + "/" + f, archive_assim + "/" + f)
+            copy(cluster.dart_rundir + "/" + f, archive_assim + "/" + f)
 
     except Exception as e:
         warnings.warn(str(e))
@@ -273,10 +273,10 @@ def qc_obs(time, oso, osf_prior):
     # for archiving
     f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
     os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True)
-    copy(cluster.dartrundir + "/obs_seq.out", f_out_archive)
+    copy(cluster.dart_rundir + "/obs_seq.out", f_out_archive)
 
     # for assimilation later
-    f_out_dart = cluster.dartrundir+'/obs_seq.out'
+    f_out_dart = cluster.dart_rundir+'/obs_seq.out'
     oso.to_dart(f_out_dart)
     print('saved', f_out_dart)
 
@@ -286,8 +286,8 @@ def evaluate(assim_time,
     """Depending on input_list.txt, this function calculates either prior or posterior obs space values.
""" - os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in - os.chdir(cluster.dartrundir) + os.makedirs(cluster.dart_rundir, exist_ok=True) # create directory to run DART in + os.chdir(cluster.dart_rundir) # link DART binaries to run_DART os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py") @@ -298,8 +298,8 @@ def evaluate(assim_time, print("prepare nature") prepare_nature_dart(assim_time) # link WRF files to DART directory - if not os.path.isfile(cluster.dartrundir+'/obs_seq.out'): - raise RuntimeError(cluster.dartrundir+'/obs_seq.out does not exist') + if not os.path.isfile(cluster.dart_rundir+'/obs_seq.out'): + raise RuntimeError(cluster.dart_rundir+'/obs_seq.out does not exist') dart_nml.write_namelist(just_prior_values=True) filter(nproc=6) @@ -307,10 +307,10 @@ def evaluate(assim_time, # archiving fout = cluster.archivedir + "/obs_seq_final/" + assim_time.strftime(output_format) os.makedirs(cluster.archivedir + "/obs_seq_final/", exist_ok=True) - copy(cluster.dartrundir + "/obs_seq.final", fout) + copy(cluster.dart_rundir + "/obs_seq.final", fout) print(fout, "saved.") - osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final") + osf = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.final") return osf @@ -324,21 +324,21 @@ def generate_obsseq_out(time): if_vis_obs = oso.df['kind'].values == 262 if_obs_below_surface_albedo = oso.df['observations'].values < clearsky_albedo oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = clearsky_albedo - oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") + oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out") return oso def apply_superobbing(oso): try: f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-before_superob") - shutil.copy(cluster.dartrundir + "/obs_seq.out-before_superob", f_oso) + shutil.copy(cluster.dart_rundir + "/obs_seq.out-before_superob", f_oso) print('saved', f_oso) except Exception as e: warnings.warn(str(e)) print(" 2.3) superobbing to", exp.superob_km, "km") oso.df = oso.df.superob(window_km=exp.superob_km) - oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") + oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out") ############################## @@ -350,7 +350,7 @@ def generate_obsseq_out(time): run_perfect_model_obs() # generate observation, draws from gaussian print(" 2.1) obs preprocessing") - oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out") + oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out") oso = ensure_physical_vis(oso) @@ -359,7 +359,7 @@ def generate_obsseq_out(time): # archive complete obsseqout f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out") - shutil.copy(cluster.dartrundir + "/obs_seq.out", f_oso) + shutil.copy(cluster.dart_rundir + "/obs_seq.out", f_oso) print('saved', f_oso) return oso @@ -369,9 +369,9 @@ def get_obsseq_out(time): # did we specify an obsseqout inputfile? 
     if exp.use_existing_obsseq != False:
         f_obsseq = time.strftime(exp.use_existing_obsseq)
-        copy(f_obsseq, cluster.dartrundir+'/obs_seq.out')
-        print(f_obsseq, 'copied to', cluster.dartrundir+'/obs_seq.out')
-        oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
+        copy(f_obsseq, cluster.dart_rundir+'/obs_seq.out')
+        print(f_obsseq, 'copied to', cluster.dart_rundir+'/obs_seq.out')
+        oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")
     else:
         # decision to NOT use existing obs_seq.out file
 
@@ -379,9 +379,9 @@ def get_obsseq_out(time):
         # f_oso_thisexp = cluster.archivedir+'/obs_seq_out/'+time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
         # if os.path.isfile(f_oso_thisexp):
         #     # oso exists
-        #     copy(f_oso_thisexp, cluster.dartrundir+'/obs_seq.out')
+        #     copy(f_oso_thisexp, cluster.dart_rundir+'/obs_seq.out')
         #     print('copied existing obsseqout from', f_oso_thisexp)
-        #     oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
+        #     oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")
         # else:
 
         # generate observations with new observation noise
@@ -403,7 +403,7 @@ def prepare_inflation_2(time, prior_init_time):
 
     f_default = cluster.archive_base + "/input_priorinf_mean.nc"
     f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
-    f_new = cluster.dartrundir + '/input_priorinf_mean.nc'
+    f_new = cluster.dart_rundir + '/input_priorinf_mean.nc'
 
     if os.path.isfile(f_prior):
         copy(f_prior, f_new)
@@ -414,7 +414,7 @@ def prepare_inflation_2(time, prior_init_time):
 
     f_default = cluster.archive_base + "/input_priorinf_sd.nc"
     f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_sd.nc")
-    f_new = cluster.dartrundir + '/input_priorinf_sd.nc'
+    f_new = cluster.dart_rundir + '/input_priorinf_sd.nc'
 
     if os.path.isfile(f_prior):
         copy(f_prior, f_new)
@@ -427,12 +427,12 @@ def archive_inflation_2(time):
     dir_output = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
     os.makedirs(dir_output, exist_ok=True)
 
-    f_output = cluster.dartrundir + '/output_priorinf_sd.nc'
+    f_output = cluster.dart_rundir + '/output_priorinf_sd.nc'
     f_archive = dir_output + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_sd.nc")
     copy(f_output, f_archive)
     print(f_archive, 'saved')
 
-    f_output = cluster.dartrundir + '/output_priorinf_mean.nc'
+    f_output = cluster.dart_rundir + '/output_priorinf_mean.nc'
     f_archive = dir_output + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
     copy(f_output, f_archive)
     print(f_archive, 'saved')
@@ -461,8 +461,8 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
     nproc = cluster.max_nproc
 
     archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/")
-    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
-    os.chdir(cluster.dartrundir)
+    os.makedirs(cluster.dart_rundir, exist_ok=True)  # create directory to run DART in
+    os.chdir(cluster.dart_rundir)
 
     # link DART binaries to run_DART
     os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")
@@ -484,7 +484,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
     osf_prior = evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_prior_allobs")
 
     print(" 2.2) assign observation-errors for assimilation ")
-    set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile=cluster.dartrundir + "/obs_seq.out")
+    set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile=cluster.dart_rundir + "/obs_seq.out")
 
     if getattr(exp, "reject_smallFGD", False):
         print(" 2.3) reject observations? ")
@@ -505,7 +505,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
         write_list_of_inputfiles_posterior(time)
         if getattr(exp, "reject_smallFGD", False):
             copy(cluster.archivedir+'/obs_seq_out/'+time.strftime('%Y-%m-%d_%H:%M_obs_seq.out-beforeQC'),
-                 cluster.dartrundir+'/obs_seq.out')
+                 cluster.dart_rundir+'/obs_seq.out')
         evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs")
 
diff --git a/dartwrf/create_obs_upfront.py b/dartwrf/create_obs_upfront.py
index 3ccd5a2..510261c 100755
--- a/dartwrf/create_obs_upfront.py
+++ b/dartwrf/create_obs_upfront.py
@@ -31,7 +31,7 @@ if __name__ == "__main__":
     print('will save obsseq to', dir_for_obsseqout)
     os.makedirs(dir_for_obsseqout, exist_ok=True)
 
-    os.chdir(cluster.dartrundir)
+    os.chdir(cluster.dart_rundir)
 
     # link DART binaries to run_DART
     os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")
@@ -49,7 +49,7 @@ if __name__ == "__main__":
 
     aso.run_perfect_model_obs(nproc=6)  # create observations (obs_seq.out)
 
-    oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
+    oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")
 
     if True:  # set reflectance < surface albedo to surface albedo
         print(" 2.2) removing obs below surface albedo ")
@@ -57,13 +57,13 @@ if __name__ == "__main__":
         if_obs_below_surface_albedo = oso.df['observations'].values < 0.2928
 
         oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = 0.2928
-        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+        oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out")
 
     if getattr(exp, "superob_km", False):
         print(" 2.3) superobbing to", exp.superob_km, "km")
         oso.df = oso.df.superob(window_km=exp.superob_km)
-        copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig")
-        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+        copy(cluster.dart_rundir + "/obs_seq.out", cluster.dart_rundir + "/obs_seq.out-orig")
+        oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out")
 
     aso.archive_osq_out(time, dir_obsseq=dir_for_obsseqout)
 
diff --git a/dartwrf/create_obsseq.py b/dartwrf/create_obsseq.py
index 12aeab6..5348d19 100755
--- a/dartwrf/create_obsseq.py
+++ b/dartwrf/create_obsseq.py
@@ -173,7 +173,7 @@ kind
 
 
 def create_obs_seq_in(time_dt, list_obscfg,
-                      output_path=cluster.dartrundir+'/obs_seq.in'):
+                      output_path=cluster.dart_rundir+'/obs_seq.in'):
     """Create obs_seq.in with multiple obs types in one file
 
     Args:
@@ -314,6 +314,6 @@ if __name__ == '__main__':
     if False:
         error_assimilate = 5.*np.ones(n_obs*len(radar['heights']))
         import assim_synth_obs as aso
-        aso.replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
+        aso.replace_errors_obsseqout(cluster.dart_rundir+'/obs_seq.out', error_assimilate)
 
diff --git a/dartwrf/dart_nml.py b/dartwrf/dart_nml.py
index 2271506..7299304 100644
--- a/dartwrf/dart_nml.py
+++ b/dartwrf/dart_nml.py
@@ -32,23 +32,26 @@ def read_namelist(filepath):
 
             # namelist section
             if line.startswith('&'):
-                section = line.lower()
+                section = line
                 d[section] = dict()
+                continue
+
+            if line == '/':
+                continue  # skip end of namelist section
 
-            # namelist variable
-            else:
+            try:
                 # split line into variable name and value
                 var, val = line.split('=')
-                var = var.strip().lower()
-                val = val.strip()
+                var = var.strip()
+                val = [v.strip() for v in val.strip().strip(',').split(',')]
 
-                # split value into list if possible
-                if ',' in val:
-                    val = val.split(',')
-                    val = [v.strip() for v in val]
+            except ValueError:
+                # the split failed, so this line continues the previous variable's value
+                nextline_values = [v.strip() for v in line.strip().strip(',').split(',')]
+                val = val + nextline_values
 
-                # add variable to dictionary
-                d[section][var] = val
+            # add variable to dictionary
+            d[section][var] = val
 
     return d
 
@@ -209,8 +212,8 @@ def write_namelist(just_prior_values=False):
             raise ValueError("Selected vertical localization, but observations contain satellite obs -> Not possible.")
 
     # write to file
-    write_namelist_from_dict(nml, cluster.dartrundir + "/input.nml")
+    write_namelist_from_dict(nml, cluster.dart_rundir + "/input.nml")
 
     # append section for RTTOV
     rttov_nml = cluster.scriptsdir + "/../templates/obs_def_rttov.VIS.nml"
-    append_file(cluster.dartrundir + "/input.nml", rttov_nml)
\ No newline at end of file
+    append_file(cluster.dart_rundir + "/input.nml", rttov_nml)
\ No newline at end of file
diff --git a/dartwrf/evaluate_posterior.py b/dartwrf/evaluate_posterior.py
index 59c5c02..0f97783 100755
--- a/dartwrf/evaluate_posterior.py
+++ b/dartwrf/evaluate_posterior.py
@@ -22,6 +22,6 @@ if __name__ == "__main__":
     oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
     if not os.path.isfile(oso_input):
         oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
-    shutil.copy(oso_input, cluster.dartrundir+'/obs_seq.out')
+    shutil.copy(oso_input, cluster.dart_rundir+'/obs_seq.out')
 
     aso.evaluate(assim_time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs")
\ No newline at end of file
diff --git a/dartwrf/evaluate_prior.py b/dartwrf/evaluate_prior.py
index 21751ac..5340e5c 100755
--- a/dartwrf/evaluate_prior.py
+++ b/dartwrf/evaluate_prior.py
@@ -27,7 +27,7 @@ if __name__ == "__main__":
     oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
     if not os.path.isfile(oso_input):
         oso_input = cluster.archivedir+'/obs_seq_out' + assim_time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
-    shutil.copy(oso_input, cluster.dartrundir+'/obs_seq.out')
+    shutil.copy(oso_input, cluster.dart_rundir+'/obs_seq.out')
 
     aso.evaluate(assim_time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_prior_allobs")
\ No newline at end of file
diff --git a/dartwrf/link_dart_rttov.py b/dartwrf/link_dart_rttov.py
index daf923b..4b1e0cf 100644
--- a/dartwrf/link_dart_rttov.py
+++ b/dartwrf/link_dart_rttov.py
@@ -11,8 +11,8 @@ if __name__ == "__main__":
     bins = ['perfect_model_obs', 'filter', 'obs_diag', 'obs_seq_to_netcdf']
     for b in bins:
         symlink(joinp(cluster.dart_srcdir, b),
-                joinp(cluster.dartrundir, b))
-        print(joinp(cluster.dartrundir, b), 'created')
+                joinp(cluster.dart_rundir, b))
+        print(joinp(cluster.dart_rundir, b), 'created')
 
     rttov_files = ['rttov13pred54L/rtcoef_msg_4_seviri_o3.dat',
                    #'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.dat',
@@ -25,17 +25,17 @@ if __name__ == "__main__":
             destname = 'rtcoef_msg_4_seviri.dat'
 
         symlink(cluster.rttov_srcdir + f_src,
-                cluster.dartrundir+'/'+destname)
+                cluster.dart_rundir+'/'+destname)
 
     ##################
-    symlink(cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri_deff.H5',
-            cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri.H5')
+    symlink(cluster.dart_rundir+'/rttov_mfasis_cld_msg_4_seviri_deff.H5',
+            cluster.dart_rundir+'/rttov_mfasis_cld_msg_4_seviri.H5')
 
     symlink(cluster.dart_srcdir+'/../../../observations/forward_operators/rttov_sensor_db.csv',
-            cluster.dartrundir+'/rttov_sensor_db.csv')
+            cluster.dart_rundir+'/rttov_sensor_db.csv')
 
     symlink(cluster.dart_srcdir+'/../../../assimilation_code/programs/gen_sampling_err_table/'
             +'work/sampling_error_correction_table.nc',
-            cluster.dartrundir+'/sampling_error_correction_table.nc')
+            cluster.dart_rundir+'/sampling_error_correction_table.nc')
 
-    print('prepared DART & RTTOV links in', cluster.dartrundir)
+    print('prepared DART & RTTOV links in', cluster.dart_rundir)
 
diff --git a/dartwrf/obsseq.py b/dartwrf/obsseq.py
index 9f7dca5..439d09f 100755
--- a/dartwrf/obsseq.py
+++ b/dartwrf/obsseq.py
@@ -717,4 +717,4 @@ if __name__ == "__main__":
     obs.plot(f_out="./map_obs_superobs.png")
 
     # write to obs_seq.out in DART format
-    # obs.to_dart(f=cluster.dartrundir + "/obs_seq.out")
+    # obs.to_dart(f=cluster.dart_rundir + "/obs_seq.out")
diff --git a/dartwrf/utils.py b/dartwrf/utils.py
index 9844bc3..54dcf78 100755
--- a/dartwrf/utils.py
+++ b/dartwrf/utils.py
@@ -13,7 +13,7 @@ class Experiment(object):
 class ClusterConfig(object):
     """Collection of variables regarding the cluster configuration"""
     def __init__(self, exp):
-        self.exp = exp
+        self.exp = exp  # makes derived properties available
         self.dart_modules = ''  # default value
 
     @property
@@ -35,7 +35,7 @@ class ClusterConfig(object):
         return self.archivedir+'/DART-WRF/'
 
     @property
-    def dartrundir(self):
+    def dart_rundir(self):
         """Path to the directory where DART programs will run
         Includes the experiment name
         """
diff --git a/dartwrf/workflows.py b/dartwrf/workflows.py
index 3b2dc38..630334d 100644
--- a/dartwrf/workflows.py
+++ b/dartwrf/workflows.py
@@ -10,7 +10,6 @@ import datetime as dt
 import importlib
 
 from dartwrf.utils import script_to_str
-from config.cfg import exp
 
 class WorkFlows(object):
     def __init__(self, exp_config='cfg.py', server_config='server.py'):
@@ -23,14 +22,40 @@ class WorkFlows(object):
         Attributes:
             cluster (obj): cluster configuration as defined in server_config file
 
-        Note:
-            in WorkFlows, we load the config from the git cloned folder
-            in all other dartwrf scripts, load the config from cluster.scripts_rundir
+            we load the config from cluster.scripts_rundir/config/cfg.py
         """
         print('------ start exp from ', exp_config, ' and ', server_config, ' ------')
 
-        # exp = __import__('config/'+exp_config)
-        # load python config file
+
+        def copy_dartwrf_to_archive():
+            # Copy scripts to self.cluster.archivedir folder
+            os.makedirs(self.cluster.archivedir, exist_ok=True)
+            try:
+                shutil.copytree(self.cluster.scriptsdir, self.cluster.scripts_rundir)
+                print('scripts have been copied to', self.cluster.archivedir)
+            except FileExistsError as e:
+                warnings.warn(str(e))
+            except:
+                raise
+
+        def copy_config_to_archive():
+            # later, we can load the exp cfg with `from config.cfg import exp`
+            shutil.copyfile('config/'+exp_config, self.cluster.scripts_rundir+'/config/cfg.py')
+
+            # later, we can load the cluster cfg with `from config.cluster import cluster`
+            shutil.copyfile('config/'+server_config, self.cluster.scripts_rundir+'/config/cluster.py')  # whatever server, the config name is always the same!
+
+        # copy config file to current config folder
+        try:
+            shutil.copyfile('config/'+exp_config, '/'.join(__file__.split('/')[:-2])+'/config/cfg.py')
+        except shutil.SameFileError:
+            pass
+
+        # load python config files
         self.cluster = importlib.import_module('config.'+server_config.strip('.py')).cluster
+        self.exp = importlib.import_module('config.'+exp_config.strip('.py')).exp
+
+        copy_dartwrf_to_archive()
+        copy_config_to_archive()
 
         # Set paths and backup scripts
         self.cluster.log_dir = self.cluster.archivedir+'/logs/'
@@ -88,21 +113,7 @@ class WorkFlows(object):
 
         _dict_to_py(_obskind_read(), self.cluster.scriptsdir+'/../config/obskind.py')
 
-        # Copy scripts to self.cluster.archivedir folder
-        os.makedirs(self.cluster.archivedir, exist_ok=True)
-        try:
-            shutil.copytree(self.cluster.scriptsdir, self.cluster.scripts_rundir)
-            print('scripts have been copied to', self.cluster.archivedir)
-        except FileExistsError as e:
-            warnings.warn(str(e))
-        except:
-            raise
-
-        # later, we can load the exp cfg with `from config.cfg import exp`
-        shutil.copy('config/'+exp_config, self.cluster.scripts_rundir+'/config/cfg.py')
-
-        # later, we can load the cluster cfg with `from config.cluster import cluster`
-        shutil.copy('config/'+server_config, self.cluster.scripts_rundir+'/config/cluster.py')  # whatever server, the config name is always the same!
+        # probably not needed
+        # shutil.copy('config/'+server_config, 'config/cluster.py')  # whatever server, the config name is always the same!
 
@@ -137,21 +148,21 @@ class WorkFlows(object):
         """
         cmd = """# run ideal.exe in parallel
 export SLURM_STEP_GRES=none
-for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
+for ((n=1; n<="""+str(self.exp.n_ens)+"""; n++))
 do
-   rundir="""+self.cluster.wrf_rundir_base+'/'+exp.expname+"""/$n
+   rundir="""+self.cluster.wrf_rundir_base+'/'+self.exp.expname+"""/$n
    echo $rundir
    cd $rundir
    mpirun -np 1 ./ideal.exe &
 done
 wait
-for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
+for ((n=1; n<="""+str(self.exp.n_ens)+"""; n++))
 do
-   rundir="""+self.cluster.wrf_rundir_base+'/'+exp.expname+"""/$n
+   rundir="""+self.cluster.wrf_rundir_base+'/'+self.exp.expname+"""/$n
    mv $rundir/rsl.out.0000 $rundir/rsl.out.input
 done
 """
-        id = self.cluster.run_job(cmd, "ideal-"+exp.expname, cfg_update={"ntasks": str(exp.n_ens),
+        id = self.cluster.run_job(cmd, "ideal-"+self.exp.expname, cfg_update={"ntasks": str(self.exp.n_ens),
                                   "time": "10", "mem": "100G"}, depends_on=[depends_on])
         return id
 
@@ -159,13 +170,12 @@ class WorkFlows(object):
         """Given that directories with wrfinput files exist,
         update these wrfinput files with warm bubbles
         """
-        pstr = ' '
         if perturb:
             pstr = ' perturb'
         cmd = self.cluster.python+' '+self.cluster.scripts_rundir+'/create_wbubble_wrfinput.py'+pstr
 
-        id = self.cluster.run_job(cmd, "ins_wbub-"+exp.expname, cfg_update={"time": "5"}, depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "ins_wbub-"+self.exp.expname, cfg_update={"time": "5"}, depends_on=[depends_on])
         return id
 
     def run_ENS(self, begin, end, depends_on=None, first_minute=False,
@@ -198,28 +208,28 @@ class WorkFlows(object):
         id = depends_on
         restart_flag = '.false.' if not input_is_restart else '.true.'
 
         wrf_cmd = script_to_str(self.cluster.run_WRF
-                    ).replace('<exp.expname>', exp.expname
+                    ).replace('<exp.expname>', self.exp.expname
                     ).replace('<cluster.wrf_rundir_base>', self.cluster.wrf_rundir_base
                     ).replace('<cluster.wrf_modules>', self.cluster.wrf_modules)
 
         # first minute forecast (needed for validating a radiance assimilation)
         if first_minute:
-            hist_interval = 1  # to get an output after 1 minute
-            radt = 1  # to get a cloud fraction CFRAC after 1 minute
             id = prepare_WRF_inputfiles(begin, begin+dt.timedelta(minutes=1),
-                                        hist_interval=hist_interval, radt=radt, output_restart_interval=output_restart_interval, depends_on=id)
+                                        hist_interval=1,  # to get an output after 1 minute
+                                        radt=1,  # to get a cloud fraction CFRAC after 1 minute
+                                        output_restart_interval=output_restart_interval, depends_on=id)
 
-            id = self.cluster.run_job(wrf_cmd, "WRF-"+exp.expname,
+            id = self.cluster.run_job(wrf_cmd, "WRF-"+self.exp.expname,
                                       cfg_update={"array": "1-"+str(self.cluster.size_jobarray), "ntasks": "10", "nodes": "1",
                                                   "time": "10", "mem": "40G"}, depends_on=[id])
 
         # forecast for the whole forecast duration
-        id = prepare_WRF_inputfiles(depends_on=id)
+        id = prepare_WRF_inputfiles(begin, end, output_restart_interval=output_restart_interval, depends_on=id)
 
         time_in_simulation_hours = (end-begin).total_seconds()/3600
         runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9)  # usually below 9 min/hour
-        id = self.cluster.run_job(wrf_cmd, "WRF-"+exp.expname,
+        id = self.cluster.run_job(wrf_cmd, "WRF-"+self.exp.expname,
                                   cfg_update={"array": "1-"+str(self.cluster.size_jobarray), "ntasks": "10", "nodes": "1",
                                               "time": str(runtime_wallclock_mins_expected), "mem": "40G"}, depends_on=[id])
@@ -247,7 +257,7 @@ class WorkFlows(object):
                +prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
                +prior_path_exp)
 
-        id = self.cluster.run_job(cmd, "Assim-"+exp.expname, cfg_update={"ntasks": "12", "time": "60",
+        id = self.cluster.run_job(cmd, "Assim-"+self.exp.expname, cfg_update={"ntasks": "12", "time": "60",
                                   "mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"}, depends_on=[depends_on])
         return id
 
@@ -264,19 +274,19 @@ class WorkFlows(object):
                +prior_init_time.strftime(' %Y-%m-%d_%H:%M')
                +prior_valid_time.strftime(' %Y-%m-%d_%H:%M')
                +tnew)
-        id = self.cluster.run_job(cmd, "IC-prior-"+exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "IC-prior-"+self.exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
         return id
 
     def update_IC_from_DA(self, assim_time, depends_on=None):
         cmd = self.cluster.python+' '+self.cluster.scripts_rundir+'/update_IC.py '+assim_time.strftime('%Y-%m-%d_%H:%M')
-        id = self.cluster.run_job(cmd, "IC-update-"+exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "IC-update-"+self.exp.expname, cfg_update=dict(time="8"), depends_on=[depends_on])
         return id
 
     def create_satimages(self, init_time, depends_on=None):
         cmd = 'module purge; module load netcdf-fortran/4.5.3-gcc-8.5.0-qsqbozc; python ~/RTTOV-WRF/run_init.py '+self.cluster.archivedir+init_time.strftime('/%Y-%m-%d_%H:%M/')
-        id = self.cluster.run_job(cmd, "RTTOV-"+exp.expname, cfg_update={"ntasks": "12", "time": "120", "mem": "200G"}, depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "RTTOV-"+self.exp.expname, cfg_update={"ntasks": "12", "time": "120", "mem": "200G"}, depends_on=[depends_on])
         return id
 
@@ -288,22 +298,22 @@ class WorkFlows(object):
 
     def verify_sat(self, depends_on=None):
-        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node sat verif1d FSS BS'
+        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node sat verif1d FSS BS'
 
-        self.cluster.run_job(cmd, "verif-SAT-"+exp.expname,
+        self.cluster.run_job(cmd, "verif-SAT-"+self.exp.expname,
                              cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "15",
                                          "ntasks-per-node": "15", "ntasks-per-core": "1", "mem": "100G",}, depends_on=[depends_on])
 
     def verify_wrf(self, depends_on=None):
-        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d FSS BS'
+        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node wrf verif1d FSS BS'
 
-        self.cluster.run_job(cmd, "verif-WRF-"+exp.expname,
+        self.cluster.run_job(cmd, "verif-WRF-"+self.exp.expname,
                              cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "15",
                                          "ntasks-per-node": "15", "ntasks-per-core": "1", "mem": "180G"}, depends_on=[depends_on])
 
     def verify_fast(self, depends_on=None):
-        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+exp.expname
+        cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+self.exp.expname
 
-        self.cluster.run_job(cmd, "verif-fast-"+exp.expname,
+        self.cluster.run_job(cmd, "verif-fast-"+self.exp.expname,
                              cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1",
                                          "ntasks-per-node": "1", "ntasks-per-core": "1"}, depends_on=[depends_on])
-- 
GitLab
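
After this patch, `exp` is no longer a module-level import in dartwrf/workflows.py: the WorkFlows constructor imports both config files itself and exposes them as attributes. A minimal usage sketch, assuming config/cfg.py and config/jet.py exist as in the repository:

    from dartwrf.workflows import WorkFlows

    # the constructor copies the configs into the archive and imports them
    w = WorkFlows(exp_config='cfg.py', server_config='jet.py')

    print(w.exp.expname)          # experiment settings, previously the global `exp`
    print(w.cluster.dart_rundir)  # property renamed from cluster.dartrundir

Workflow methods reference the experiment through the instance, e.g. job names are built as "Assim-"+self.exp.expname, so two WorkFlows objects with different exp configs no longer share global state.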
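The reworked read_namelist() in dartwrf/dart_nml.py treats lines without an '=' as continuations of the previous variable's value. A minimal, self-contained sketch of that parsing logic; the function name parse_namelist_text, the toy namelist, and the normalization of every value to a list of strings are assumptions of this sketch, not part of the commit:

    def parse_namelist_text(text):
        """Parse namelist text into {section: {variable: [values]}}."""
        d = {}
        section = None
        var = None
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith('#') or line.startswith('!'):
                continue  # skip empty lines and comments
            if line.startswith('&'):
                section = line      # e.g. '&filter_nml'
                d[section] = {}
                continue
            if line == '/':
                continue            # end of a namelist section
            try:
                var, val = line.split('=')
                var = var.strip()
                d[section][var] = [v.strip() for v in val.strip().strip(',').split(',')]
            except ValueError:
                # the split did not yield var=value: the line continues the previous variable
                d[section][var] += [v.strip() for v in line.strip().strip(',').split(',')]
        return d

    nml = parse_namelist_text("""
    &filter_nml
       ens_size = 40,
       stages_to_write = 'preassim',
                         'output'
    /
    """)
    assert nml['&filter_nml']['ens_size'] == ['40']
    assert nml['&filter_nml']['stages_to_write'] == ["'preassim'", "'output'"]

Keeping both branches list-valued is what makes the continuation concatenation type-safe; the function in the patch reads the file from disk and keys sections with their leading '&' in the same way.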