diff --git a/dartwrf/assim_synth_obs.py b/dartwrf/assimilate.py similarity index 82% rename from dartwrf/assim_synth_obs.py rename to dartwrf/assimilate.py index f2b7a2ea846258e9ba25a190cc151e4072059a0f..33b3264fb7693089d2be9cd894fe14080f398946 100755 --- a/dartwrf/assim_synth_obs.py +++ b/dartwrf/assimilate.py @@ -15,15 +15,14 @@ from dartwrf.server_config import cluster wrfout_format = '/wrfout_d01_%Y-%m-%d_%H:%M:%S' # WRF file format, will only change if WRF changes -pattern_init_time = "/%Y-%m-%d_%H:%M/" # pattern for the init_timme folder in sim_archive -pattern_obs_seq_final = "/%Y-%m-%d_%H:%M:%S_obs_seq.final" # how an obs_seq.final file is archived -pattern_obs_seq_out = "/%Y-%m-%d_%H:%M:%S_obs_seq.out" # how an obs_seq.out file is archived - +pattern_init_time = "/%Y-%m-%d_%H:%M/" # pattern for the init_time folder in sim_archive +pattern_obs_seq_out = cluster.archivedir + "/diagnostics/%Y-%m-%d_%H:%M_obs_seq.out" # how an obs_seq.out file is archived +pattern_obs_seq_final = cluster.archivedir + "/diagnostics/%Y-%m-%d_%H:%M_obs_seq.final" # how an obs_seq.final file is archived def _prepare_DART_grid_template(): # DART needs a wrfinput file as a template for the grid # No data will be read from this file, but the grid information must match exactly. - symlink(cluster.dart_rundir + "/prior_ens1/wrfout_d01", + symlink(cluster.dart_rundir + "/wrfout_d01", cluster.dart_rundir + "/wrfinput_d01") def _find_nature(time): @@ -45,10 +44,11 @@ def prepare_nature_dart(time): Args: time (dt.datetime): Time at which observations will be made """ + print("prepare nature") try: f_nat = _find_nature(time) except: - print('-> no nature available') + print('-> no nature available') # this is OK if it is a real data case or observations already exist return shutil.copy(f_nat, cluster.dart_rundir + "/wrfout_d01") # copy nature wrfout to DART directory @@ -66,7 +66,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_ - writes txt files so DART knows what input and output is - removes probably pre-existing files which could lead to problems """ - print("prepare prior state estimate") + print("prepare prior ensemble") for iens in range(1, exp.n_ens + 1): print("link wrfout file to DART background file") @@ -117,14 +117,19 @@ def use_linked_files_as_prior(): files.append("./prior_ens" + str(iens) + "/wrfout_d01") write_txt(files, cluster.dart_rundir+'/input_list.txt') -def use_filter_output_as_prior(assim_time): - """Use posterior as input for DART, e.g. to evaluate the analysis in observation space +def use_filter_output_as_prior(): + """Use the last posterior as input for DART, e.g. to evaluate the analysis in observation space """ - filedir = cluster.archivedir+assim_time.strftime(pattern_init_time+"/assim_stage0/") - files = [] for iens in range(1, exp.n_ens+1): - files.append(filedir+'filter_restart_d01.'+str(iens).zfill(4)) + f_new = cluster.dart_rundir+'/prior_ens'+str(iens)+'/wrfout_d01' + try: + os.remove(f_new) + except: + pass + os.rename(cluster.dart_rundir+'/filter_restart_d01.'+str(iens).zfill(4), f_new) + files.append(f_new) + write_txt(files, cluster.dart_rundir+'/input_list.txt') def write_list_of_outputfiles(): @@ -135,6 +140,15 @@ def write_list_of_outputfiles(): def filter(nproc=12): + """Calls DART ./filter program + + Args: + nproc (int): number of cores for use in ./filter call + + Returns: + None (writes to file) + """ + nproc = min(nproc, cluster.max_nproc) _prepare_DART_grid_template() print("time now", dt.datetime.now()) @@ -161,38 +175,31 @@ def filter(nproc=12): def archive_filteroutput(time): """Archive filter output files (filter_restart, preassim, postassim, output_mean, output_sd) """ - archive_dir = cluster.archivedir + "/obs_seq_final/" - mkdir(archive_dir) - - # copy obs_seq.final to archive - fout = archive_dir + time.strftime(pattern_obs_seq_final) - copy(cluster.dart_rundir + "/obs_seq.final", fout) - print(fout, "saved.") - - archive_assim = cluster.archivedir + time.strftime(pattern_init_time+"/assim_stage0/") - mkdir(archive_assim) + # archive diagnostics + dir_out = cluster.archivedir + time.strftime(pattern_init_time) + os.makedirs(dir_out, exist_ok=True) # copy input.nml to archive - copy(cluster.dart_rundir + "/input.nml", archive_assim + "/input.nml") + copy(cluster.dart_rundir + "/input.nml", dir_out + "/input.nml") # copy filter_restart files to archive (initial condition for next run) for iens in range(1, exp.n_ens + 1): # single members copy( cluster.dart_rundir + "/filter_restart_d01." + str(iens).zfill(4), - archive_assim + "/filter_restart_d01." + str(iens).zfill(4), + dir_out + "/filter_restart_d01." + str(iens).zfill(4), ) # copy preassim/postassim files to archive (not necessary for next forecast run) try: + for f in ["output_mean.nc", "output_sd.nc"]: # copy mean and sd to archive + copy(cluster.dart_rundir + "/" + f, dir_out + "/" + f) + ftypes = ['preassim', 'postassim'] for ftype in ftypes: for iens in range(1, exp.n_ens + 1): fname = "/"+ftype+"_member_" + str(iens).zfill(4) + ".nc" - copy(cluster.dart_rundir + fname, archive_assim + fname) - - for f in ["output_mean.nc", "output_sd.nc"]: # copy mean and sd to archive - copy(cluster.dart_rundir + "/" + f, archive_assim + "/" + f) - + copy(cluster.dart_rundir + fname, dir_out + fname) + except Exception as e: warnings.warn(str(e)) @@ -245,7 +252,11 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"): where_oso_iskind = oso.df.kind == kind if obscfg["error_assimilate"] == False: - osf_prior = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.final") # this file will be generated by `evaluate()` + f_osf = cluster.dart_rundir + "/obs_seq.final" + if not os.path.isfile(f_osf): + evaluate(time, f_out_pattern=f_osf) + + osf_prior = obsseq.ObsSeq(f_osf) # this file was generated by `evaluate()` where_osf_iskind = osf_prior.df.kind == kind @@ -318,8 +329,8 @@ def qc_obs(time, oso): print('QC removed', n_obs_orig-len(oso.df), 'observations') # archive obs_seq.out before QC (contains all observations, including later removed ones) - f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime(pattern_obs_seq_out+"-beforeQC") - os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True) + f_out_archive = time.strftime(pattern_obs_seq_out)+"-beforeQC" + os.makedirs(os.path.dirname(f_out_archive), exist_ok=True) copy(cluster.dart_rundir + "/obs_seq.out", f_out_archive) # for assimilation later @@ -328,10 +339,10 @@ def qc_obs(time, oso): print('saved', f_out_dart) -def evaluate(assim_time, +def evaluate(assim_time, obs_seq_out=False, prior_is_filter_output=False, - output_format=pattern_obs_seq_final+"-evaluate", + f_out_pattern=pattern_obs_seq_final+"-evaluate", nproc=12): """Calculates either prior or posterior obs space values. @@ -342,10 +353,12 @@ def evaluate(assim_time, Args: assim_time (datetime): time of assimilation obs_seq_out (str, optional): use the argument as obs_seq.out file, defaults to use the existing obs_seq.out file - output_format (str, optional): format string for output filename, default is `"%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs"` + at these observations, the posterior will be evaluated + f_out_pattern (str, mandatory): output filename + prior_is_filter_output (bool, optional): if True, use the filter output as prior, else use already linked ensemble files - Returns: - obsseq.ObsSeq + Returns + None (writes file) """ os.makedirs(cluster.dart_rundir, exist_ok=True) # create directory to run DART in os.chdir(cluster.dart_rundir) @@ -355,34 +368,42 @@ def evaluate(assim_time, # remove any existing observation files os.system("rm -f input.nml obs_seq.final") - print("prepare nature") + # TODO: Maybe only necessary if there is no obs_seq.out file yet? prepare_nature_dart(assim_time) # link WRF files to DART directory if prior_is_filter_output: - print('using filter_restart files from last assimilation as prior') - use_filter_output_as_prior(assim_time) + print('using filter_restart files in run_DART as prior') + use_filter_output_as_prior() else: print('using files linked to `run_DART/<exp>/prior_ens*/wrfout_d01` as prior') use_linked_files_as_prior() - if obs_seq_out: - copy(obs_seq_out, cluster.dart_rundir+'/obs_seq.out') + # the observations at which to evaluate the prior at + if obs_seq_out: + copy(obs_seq_out, cluster.dart_rundir+'/obs_seq.out') # user defined file else: - # use existing obs_seq.out file + # use existing obs_seq.out file currently present in the run_DART directory if not os.path.isfile(cluster.dart_rundir+'/obs_seq.out'): raise RuntimeError(cluster.dart_rundir+'/obs_seq.out does not exist') dart_nml.write_namelist(just_prior_values=True) filter(nproc=nproc) + archive_filter_diagnostics(assim_time, f_out_pattern) + - # archiving - fout = cluster.archivedir + "/obs_seq_final/" + assim_time.strftime(output_format) - os.makedirs(cluster.archivedir + "/obs_seq_final/", exist_ok=True) - copy(cluster.dart_rundir + "/obs_seq.final", fout) - print(fout, "saved.") +def archive_filter_diagnostics(time, f_out_pattern): + f_archive = time.strftime(f_out_pattern) + + dir_out = os.path.dirname(f_archive) + os.makedirs(dir_out, exist_ok=True) + copy(cluster.dart_rundir + "/obs_seq.final", f_archive) + print(f_archive, "saved.") def get_obsseq_out(time): """Prepares an obs_seq.out file in the run_DART folder + If `exp.use_existing_obsseq` points to an existing file, then this is used. + Otherwise, the observations are generated and an `obs_seq.out` file is created. + An `obs_seq.out` file is always archived. Args: time (datetime): time of assimilation @@ -393,18 +414,19 @@ def get_obsseq_out(time): if exp.use_existing_obsseq != False: # use an existing obs_seq.out file f_obsseq = time.strftime(exp.use_existing_obsseq) - copy(f_obsseq, cluster.dart_rundir+'/obs_seq.out') # copy to run_DART folder + copy(f_obsseq, cluster.dart_rundir+'/obs_seq.out') # copy to run_DART folder print(f_obsseq, 'copied to', cluster.dart_rundir+'/obs_seq.out') - # copy to sim_archive - os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True) - copy(f_obsseq, time.strftime(cluster.archivedir+'/obs_seq_out/'+pattern_obs_seq_out)) - oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out") # read the obs_seq.out file else: # do NOT use an existing obs_seq.out file # but generate observations with new observation noise oso = osq_out.generate_obsseq_out(time) + + # copy to sim_archive + f_obsseq_archive = time.strftime(pattern_obs_seq_out) + os.makedirs(os.path.dirname(f_obsseq_archive), exist_ok=True) + copy(cluster.dart_rundir+'/obs_seq.out', f_obsseq_archive) return oso def prepare_inflation_2(time, prior_init_time): @@ -417,7 +439,7 @@ def prepare_inflation_2(time, prior_init_time): time (datetime): time of assimilation prior_init_time (datetime): time of prior assimilation """ - dir_priorinf = cluster.archivedir + prior_init_time.strftime(pattern_init_time+"/assim_stage0/") + dir_priorinf = cluster.archivedir + prior_init_time.strftime(pattern_init_time) f_default = cluster.archive_base + "/input_priorinf_mean.nc" f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc") @@ -442,7 +464,7 @@ def prepare_inflation_2(time, prior_init_time): copy(f_default, f_new) def archive_inflation_2(time): - dir_output = cluster.archivedir + time.strftime(pattern_init_time+"/assim_stage0/") + dir_output = cluster.archivedir + time.strftime(pattern_init_time) os.makedirs(dir_output, exist_ok=True) f_output = cluster.dart_rundir + '/output_priorinf_sd.nc' @@ -490,10 +512,11 @@ def link_DART_binaries_and_RTTOV_files(): print('prepared DART & RTTOV links in', cluster.dart_rundir) except Exception as e: - if any(['sat_channel' in obscfg for obscfg in exp.observations]): - raise e - else: - pass # we dont need RTTOV anyway + # will any satellite channel be assimilated? + for obscfg in exp.observations: + if 'sat_channel' in obscfg: + raise e + pass # we dont need RTTOV anyway def prepare_run_DART_folder(): os.makedirs(cluster.dart_rundir, exist_ok=True) # create directory to run DART in @@ -513,8 +536,10 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp): Workflow: 1) prepare nature run & prior ensemble for DART 2) create obs_seq.in - 3) create obs from nature (obs_seq.out) with user-defined obs-errors + 3) create obs from nature (obs_seq.out) or use existing one + optional: quality-control 4) Assimilate with assigned errors + 5) Evaluate posterior (after DART clamping, e.g. setting negative cloud water to zero) Args: assim_time (dt.datetime): time of output @@ -525,29 +550,23 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp): Returns: None """ - nproc = cluster.max_nproc - do_QC = getattr(exp, "do_quality_control", False) # True: triggers additional evaluations of prior & posterior - - # for which observation type do we have a parametrized observation error? - error_is_parametrized = [obscfg["error_assimilate"] == False for obscfg in exp.observations] + do_QC = getattr(exp, "do_quality_control", False) # True: triggers additional evaluations of prior & posterior prepare_run_DART_folder() - nml = dart_nml.write_namelist() - prior_inflation_type = nml['&filter_nml']['inf_flavor'][0][0] + prepare_nature_dart(time) - print("prepare nature") - prepare_nature_dart(time) # link WRF files to DART directory + # from previous forecast + prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp) - print("prepare prior ensemble") - prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp) - + obscfg = exp.observations + nml = dart_nml.write_namelist() + print(" get observations with specified obs-error") oso = get_obsseq_out(time) - # is any observation error parametrized? - if any(error_is_parametrized) or do_QC: + if do_QC: # additional evaluation of prior print(" (optional) evaluate prior for all observations (incl rejected) ") - evaluate(time, output_format=pattern_obs_seq_final+"-evaluate_prior") + evaluate(time, f_out_pattern=pattern_obs_seq_final+"-evaluate_prior") print(" assign observation-errors for assimilation ") set_obserr_assimilate_in_obsseqout(oso, outfile=cluster.dart_rundir + "/obs_seq.out") @@ -556,31 +575,34 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp): print(" reject observations? ") qc_obs(time, oso) + prior_inflation_type = nml['&filter_nml']['inf_flavor'][0][0] if prior_inflation_type == '2': prepare_inflation_2(time, prior_init_time) print(" run filter ") dart_nml.write_namelist() - filter(nproc=nproc) + filter() archive_filteroutput(time) + archive_filter_diagnostics(time, pattern_obs_seq_final) if prior_inflation_type == '2': archive_inflation_2(time) print(" evaluate posterior in observation-space") - f_oso = pattern_obs_seq_out + f_oso = time.strftime(pattern_obs_seq_out) if do_QC: f_oso += '-beforeQC' # includes all observations (including rejected ones in qc_obs()) # evaluate() separately after ./filter is crucial when assimilating cloud variables # as the obs_seq.final returned by ./filter was produced using un-clamped cloud variables - evaluate(time, obs_seq_out=cluster.archivedir+'/obs_seq_out/'+time.strftime(f_oso), - prior_is_filter_output=True, - output_format=pattern_obs_seq_final+"-evaluate") + evaluate(time, + obs_seq_out=f_oso, + prior_is_filter_output=True, + f_out_pattern=pattern_obs_seq_final+"-evaluate") if __name__ == "__main__": - """Assimilate synthetic observations + """Assimilate observations Example: python assim_synth_obs.py 2008-08-07_13:00 2008-08_12:00 2008-08-07_13:00 /path/to/experiment/ diff --git a/dartwrf/dart_nml.py b/dartwrf/dart_nml.py index f541930df220ac3a01aaabbede7f4a01e8b55afc..39a4951ca690166a97a25c6bce89ebecb74dea7c 100644 --- a/dartwrf/dart_nml.py +++ b/dartwrf/dart_nml.py @@ -281,7 +281,7 @@ def write_namelist(just_prior_values=False): vert_norm_obs_types, vert_norm_heights, vert_norm_scale_heights, vert_norm_levels, vert_norm_pressures = _get_vertical_localization() - nml = read_namelist(cluster.dart_srcdir + "/input.nml") + nml = read_namelist(cluster.dart_srcdir + "/input.nml") # default compilation namelist n_obstypes = len(list_obstypes_all) if n_obstypes > 0: diff --git a/dartwrf/obs/create_obsseq_in.py b/dartwrf/obs/create_obsseq_in.py index ca990517a3bd7eb52e5e9b7c28240a509af508c3..6b64816e0ca76540abd7f9b23833962fee577041 100755 --- a/dartwrf/obs/create_obsseq_in.py +++ b/dartwrf/obs/create_obsseq_in.py @@ -73,7 +73,24 @@ def _write_file(msg, output_path='./'): def _append_hgt_to_coords(coords, heights): + """Adds vertical position to list of coordinates + + if heights is a scalar, then all obs have the same height + if heights is a list, then you get multiple levels + + Args: + coords (list of 2-tuples): (lat, lon) in degrees north/east + heights (float or list of float): height in meters + + Returns: + list of 3-tuples + """ coords2 = [] + try: + len(heights) # fails with scalar + except TypeError: + heights = [heights, ] + for i in range(len(coords)): for hgt_m in heights: coords2.append(coords[i] + (hgt_m,)) @@ -112,6 +129,11 @@ def _determine_vert_coords(sat_channel, kind, obscfg): else: vert_coord_sys = "-2" # undefined height vert_coords = ["-888888.0000000000", ] + + if 'height' in obscfg: + # hypothetical height, only used for localization + vert_coord_sys = "3" # meters AMSL + vert_coords = obscfg['height'] return vert_coord_sys, vert_coords diff --git a/dartwrf/obs/create_obsseq_out.py b/dartwrf/obs/create_obsseq_out.py index be91eb45dc62fa1424bf3a11da47f16d2cdcb55f..2ec5040d243bd86fa91a6b9525bd7d4344ecfc61 100644 --- a/dartwrf/obs/create_obsseq_out.py +++ b/dartwrf/obs/create_obsseq_out.py @@ -3,7 +3,7 @@ import os, shutil, warnings from dartwrf.utils import try_remove, print, shell, symlink import dartwrf.obs.create_obsseq_in as osi from dartwrf.obs import obsseq -from dartwrf import assim_synth_obs as aso +from dartwrf import assimilate as aso from dartwrf.exp_config import exp from dartwrf.server_config import cluster @@ -20,19 +20,23 @@ def _prepare_DART_grid_template(): symlink(cluster.dart_rundir + "/wrfout_d01", cluster.dart_rundir + "/wrfinput_d01") -def generate_obsseq_out(time): +def generate_obsseq_out(time, nproc=12): """Generate an obs_seq.out file from the current experiment Expects an existing nature file in the cluster.dart_rundir Args: time (datetime): time of the observations + nproc (int, optional): number of cores for call to ./perfect_model_obs Returns: obsseq.ObsSeq: obs_seq.out representation """ - def ensure_physical_vis(oso): # set reflectance < surface albedo to surface albedo - print(" 2.2) removing obs below surface albedo ") + def _ensure_physical_vis(oso): # set reflectance < surface albedo to surface albedo + """ Set values smaller than surface albedo to surface albedo + Highly hacky. Your albedo might be different. + """ + print(" removing obs below surface albedo ") clearsky_albedo = 0.2928 if_vis_obs = oso.df['kind'].values == 262 @@ -42,41 +46,30 @@ def generate_obsseq_out(time): return oso - def apply_superobbing(oso): + def _apply_superobbing(oso): try: - f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M:%S_obs_seq.out-before_superob") - shutil.copy(cluster.dart_rundir + "/obs_seq.out-before_superob", f_oso) + f_oso = aso.pattern_obs_seq_out + '-before_superob' + shutil.copy(cluster.dart_rundir + "/obs_seq.out", f_oso) print('saved', f_oso) except Exception as e: warnings.warn(str(e)) - print(" 2.3) superobbing to", exp.superob_km, "km") + print(" superobbing to", exp.superob_km, "km") oso.df = oso.df.superob(window_km=exp.superob_km) oso.to_dart(f=cluster.dart_rundir + "/obs_seq.out") + return oso - - ############################## - - dir_obsseq=cluster.archivedir + "/obs_seq_out/" - os.makedirs(dir_obsseq, exist_ok=True) - - osi.create_obs_seq_in(time, exp.observations) + osi.create_obs_seq_in(time, exp.observations) # create obs definition file _prepare_DART_grid_template() - run_perfect_model_obs() # generate observation, draws from gaussian + run_perfect_model_obs(nproc=nproc) # generate observation, draws from gaussian - print(" 2.1) obs preprocessing") oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out") - oso = ensure_physical_vis(oso) + oso = _ensure_physical_vis(oso) if getattr(exp, "superob_km", False): - oso = apply_superobbing(oso) - - # archive complete obsseqout - f_oso = dir_obsseq + time.strftime(aso.pattern_obs_seq_out) - shutil.copy(cluster.dart_rundir + "/obs_seq.out", f_oso) - print('saved', f_oso) + oso = _apply_superobbing(oso) return oso def run_perfect_model_obs(nproc=12): @@ -90,17 +83,16 @@ def run_perfect_model_obs(nproc=12): """ print("running ./perfect_model_obs") os.chdir(cluster.dart_rundir) + nproc = min(nproc, cluster.max_nproc) try_remove(cluster.dart_rundir + "/obs_seq.out") if not os.path.exists(cluster.dart_rundir + "/obs_seq.in"): raise RuntimeError("obs_seq.in does not exist in " + cluster.dart_rundir) shell(cluster.dart_modules+' mpirun -np '+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs") if not os.path.exists(cluster.dart_rundir + "/obs_seq.out"): - log_file_content = str(open(cluster.dart_rundir + "/log.perfect_model_obs",'r').read()) raise RuntimeError( "obs_seq.out does not exist in " + cluster.dart_rundir, - "\n probably perfect_model_obs failed, log file says:\n", - log_file_content) + "\n see "+cluster.dart_rundir + "/log.perfect_model_obs") if __name__ == '__main__': """Generate obs_seq.out files from an experiment @@ -123,7 +115,7 @@ if __name__ == '__main__': times = args.times.split(',') # before running perfect_model_obs, we need to set up the run_DART folder - from dartwrf import assim_synth_obs as aso + from dartwrf import assimilate as aso from dartwrf import dart_nml aso.prepare_run_DART_folder() nml = dart_nml.write_namelist() @@ -133,4 +125,4 @@ if __name__ == '__main__': time = dt.datetime.strptime(time, '%Y-%m-%d_%H:%M') aso.prepare_nature_dart(time) # link WRF files to DART directory - generate_obsseq_out(time) \ No newline at end of file + generate_obsseq_out(time) diff --git a/dartwrf/obs/obsseq.py b/dartwrf/obs/obsseq.py index b01c9ec6b75ff73577b2bb504a664d3154172cd2..e6147cb99cb5bcc3c59c88e15eaa45fa3cfeca9c 100755 --- a/dartwrf/obs/obsseq.py +++ b/dartwrf/obs/obsseq.py @@ -140,7 +140,7 @@ class ObsRecord(pd.DataFrame): Hx = self.iloc[:, keys_bool] # consistency check: compute mean over ens - compare with value from file - assert np.allclose(Hx.mean(axis=1), self[what+' ensemble mean']) + # assert np.allclose(Hx.mean(axis=1).values, self[what+' ensemble mean'].values, rtol=1e-6) return Hx.values @@ -619,6 +619,14 @@ class ObsSeq(object): Args: list_of_obsseq (list of ObsSeq()) + Example: + Combine two ObsSeq() objects + >>> oso1 = ObsSeq('path/to/obs_seq.out1') + >>> oso2 = ObsSeq('path/to/obs_seq.out2') + >>> oso_combi = oso1.append_obsseq([oso2,]) + + Returns: + ObsSeq() with combined data """ from dartwrf.obs.obskind import obs_kind_nrs # dictionary string => DART internal indices inverted_obs_kind_nrs = {v: k for k, v in obs_kind_nrs.items()} diff --git a/dartwrf/run_ens.jet.sh b/dartwrf/run_ens.jet.sh index 52aa2fc9d2acb60c53bcefa2fa65cd93e69cbbb0..de2123472fcfd01b5d447b8bba015f6e069a2008 100755 --- a/dartwrf/run_ens.jet.sh +++ b/dartwrf/run_ens.jet.sh @@ -10,8 +10,8 @@ RUNDIR=$MAINDIR/$EXPNAME/$IENS echo "ENSEMBLE NR: "$IENS" in "$RUNDIR cd $RUNDIR rm -rf rsl.out.0* -echo "mpirun -np 10 ./wrf.exe" -mpirun -np 10 ./wrf.exe +echo "mpirun -np <exp.np_WRF> ./wrf.exe" +mpirun -np <exp.np_WRF> ./wrf.exe # error checking diff --git a/dartwrf/update_IC.py b/dartwrf/update_IC.py index 16d856a235850616a7d0a1431a1def54e65e9adb..4c66a472b13c8120f625b8bedc899869e4515713 100755 --- a/dartwrf/update_IC.py +++ b/dartwrf/update_IC.py @@ -1,9 +1,11 @@ +print('load imports') import os, sys, warnings import datetime as dt import netCDF4 as nc from dartwrf.exp_config import exp from dartwrf.server_config import cluster +print('loaded imports') def update_initials_in_WRF_rundir(time): """Updates wrfrst-files in `/run_WRF/` directory @@ -28,12 +30,14 @@ def update_initials_in_WRF_rundir(time): raise IOError(ic_file+' does not exist, updating impossible!') else: # overwrite DA updated variables - filter_out = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/assim_stage0/filter_restart_d01.'+str(iens).zfill(4)) + + filter_out = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/filter_restart_d01.'+str(iens).zfill(4)) with nc.Dataset(filter_out, 'r') as ds_filter: with nc.Dataset(ic_file, 'r+') as ds_new: - ds_new.variables['T'][:] = ds_filter.variables['THM'][:] + if 'T' in update_vars or 'THM' in update_vars: + ds_new.variables['T'][:] = ds_filter.variables['THM'][:] # update all other variables for var in update_vars: diff --git a/dartwrf/utils.py b/dartwrf/utils.py index 53d89039c8988ca0a18bbfedf9f574173f5631cc..a74aeed0b1587058bd3ed48060ec4806089d6abd 100755 --- a/dartwrf/utils.py +++ b/dartwrf/utils.py @@ -4,8 +4,53 @@ import subprocess import datetime as dt import re, tempfile +# class Stage(object): +# """Collection of variables describing the assimilation stage""" +# def __init__(self, **kwargs): +# self.superob_km = False # False or int (spatial averaging of observations) +# self.use_existing_obsseq = False # False or pathname (use precomputed obs_seq.out files) +# self.__dict__.update(kwargs) + +# # raise ValueError if attributes are not set +# needed_attributes = ['observations', 'dart_nml',] +# for attr in needed_attributes: +# if not hasattr(self, attr): +# raise ValueError('Stage.'+attr+' is not set') + class Experiment(object): - """Collection of variables regarding the experiment configuration""" + """Collection of variables which define the experiment + + Attributes: + expname (str): Name of the experiment + model_dx (int): WRF grid spacing in meters + n_ens (int): Ensemble size + do_quality_control (bool): If True, activate "quality control" function in assim_synth_obs.py + + nature_wrfout_pattern (str): Path to the nature run, where we take observations from; + the path can contain wildcards (*,?), e.g. '/jetfs/exp1/*/1/wrfout_d01_%Y-%m-%d_%H:%M:%S' + input_profile (str): Path to WRF idealized input profiles; + e.g. '/data/initial_profiles/wrf/ens/raso.fc.<iens>.wrfprof'; + <iens> is replaced by 001-040 for a 40-member ensemble + + update_vars (list of str): Variables which will be updated after assimilation (update_IC.py) + e.g. ['U', 'V', 'W', 'THM', 'PH', 'MU', 'QVAPOR',] + + observations (list of dict): Dictionaries which define an observation; + keys: + `error_generate`: measurement error standard-deviation; + `error_assimilate`: assigned observation error std-dev; + `heights`: list of integers at which observations are taken; + `loc_horiz_km`: float of horizontal localization half-width in km; + `loc_vert_km`: float of vertical localization half-width in km; + + use_existing_obsseq (str, False): Path to existing obs_seq.out file (False: generate new one); + time string is replaced by actual time: /path/%Y-%m-%d_%H:%M_obs_seq.out + + dart_nml (dict): updates to the default input.nml of DART (in dart_srcdir) + keys are namelist section headers (e.g. &filter_nml) + values are dictionaries of parameters and values (e.g. dict(ens_size=exp.n_ens,)) + + """ def __init__(self): pass @@ -22,8 +67,9 @@ class ClusterConfig(object): Attributes: name (str): Name of the cluster max_nproc (int): Maximum number of processors that can be used + np_WRF (int): Number of cores for WRF (mpirun -np xx ./wrf.exe) use_slurm (bool): If True, use SLURM to submit jobs - size_jobarray (int): Size of SLURM job array for running the WRF ensemble + size_WRF_jobarray (int): Size of SLURM job array for running the WRF ensemble python (str): Path to python executable python_verif (str): Path to python executable for verification @@ -31,8 +77,8 @@ class ClusterConfig(object): ideal (str): Path to ideal.exe wrfexe (str): Path to wrf.exe - dart_modules (str): Modules to load for DART - wrf_modules (str): Modules to load for WRF + dart_modules (str): Command to load modules before running DART + wrf_modules (str): Command to load modules before running WRF wrf_rundir_base (str): Path to temporary files for WRF dart_rundir_base (str): Path to temporary files for DART @@ -41,20 +87,17 @@ class ClusterConfig(object): srcdir (str): Path to where WRF has been compiled, including the 'run' folder of WRF, e.g. /home/WRF-4.3/run dart_srcdir (str): Path to DART compile directory, e.g. /home/DART-9.11.9/models/wrf/work rttov_srcdir (str): Path to RTTOV compile directory, e.g. /home/RTTOV13/rtcoef_rttov13/ - scriptsdir (str): Path where DART-WRF scripts reside, e.g. /home/DART-WRF/scripts + dartwrf_dir (str): Path where DART-WRF scripts reside, e.g. /home/DART-WRF/ + geo_em_for_WRF_ideal (str, False): Path to the geo_em.d01.nc file for idealized nature runs + obs_impact_filename namelist (str): Path to a WRF namelist template; strings like <hist_interval>, will be overwritten in scripts/prepare_namelist.py run_WRF (str): Path to script which runs WRF on a node of the cluster - overwrite_coordinates_with_geo_em (bool): If WRF ideal: path to NetCDF file of WRF domain (see WRF guide) - if WRF real: set to False - obs_impact_filename (str): Path to obs_impact_filename (see DART guide; module assim_tools_mod and program obs_impact_tool) - slurm_cfg (dict): python dictionary, containing options of SLURM - defined in SLURM docs (https://slurm.schedmd.com/sbatch.html) - this configuration can be overwritten later on, for example: - $ cfg_update = {"nodes": "2"} - $ new_config = dict(cluster.slurm_cfg, **cfg_update) + slurm_cfg (dict): Dictionary containing the default configuration of SLURM + as defined in SLURM docs (https://slurm.schedmd.com/sbatch.html). + This configuration can be customized for any job (e.g. in workflows.py) """ def __init__(self, exp): @@ -138,6 +181,8 @@ def print(*args): __builtin__.print(*args, flush=True) def copy(src, dst, remove_if_exists=True): + if src == dst: + return # the link already exists, nothing to do if remove_if_exists: try: os.remove(dst) diff --git a/dartwrf/workflows.py b/dartwrf/workflows.py index 79b1eba8a18015c07420468d564ffdd03b126a04..5283010a04f48e3d3cec69ddf1bcbd13987b4b6a 100644 --- a/dartwrf/workflows.py +++ b/dartwrf/workflows.py @@ -218,7 +218,7 @@ class WorkFlows(object): begin (datetime): start time of the forecast end (datetime): end time of the forecast depends_on (str, optional): job ID of a previous job after which to run this job - first_minutes (bool, optional): if True, get wrfout of first 5 minutes every minute + first_minutes (bool, optional): if True, get wrfout of first 2 minutes input_is_restart (bool, optional): if True, start WRF from WRFrst file (restart mode) output_restart_interval (int, optional): interval in minutes between output of WRFrst files hist_interval (float, optional): interval in minutes between output of WRF history files @@ -270,10 +270,11 @@ class WorkFlows(object): time_in_simulation_hours = (end-begin).total_seconds()/3600 runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9) # usually below 9 min/hour - id = self.cluster.run_job(wrf_cmd, "WRF-"+self.exp.expname, - cfg_update={"array": "1-"+str(self.cluster.size_WRF_jobarray), "ntasks": "10", "nodes": "1", - "time": str(runtime_wallclock_mins_expected), "mem": "40G", }, - depends_on=[id]) + cfg_update = {"array": "1-"+str(self.cluster.size_WRF_jobarray), "ntasks": "10", "nodes": "1", + "time": str(runtime_wallclock_mins_expected), "mem": "40G", } + #if runtime_wallclock_mins_expected > 10: + # cfg_update.update({"nodelist": "jet08"}) + id = self.cluster.run_job(wrf_cmd, "WRF-"+self.exp.expname, cfg_update=cfg_update, depends_on=[id]) return id @@ -292,7 +293,7 @@ class WorkFlows(object): if not os.path.exists(prior_path_exp): raise IOError('prior_path_exp does not exist: '+prior_path_exp) - cmd = (self.cluster.python+' '+self.cluster.scripts_rundir+'/assim_synth_obs.py ' + cmd = (self.cluster.python+' '+self.cluster.scripts_rundir+'/assimilate.py ' +assim_time.strftime('%Y-%m-%d_%H:%M ') +prior_init_time.strftime('%Y-%m-%d_%H:%M ') +prior_valid_time.strftime('%Y-%m-%d_%H:%M ') @@ -327,7 +328,7 @@ class WorkFlows(object): def create_satimages(self, init_time, depends_on=None): """Run a job array, one job per ensemble member, to create satellite images""" - cmd = 'module purge; module load netcdf-fortran/4.5.3-gcc-8.5.0-qsqbozc; ' \ + cmd = 'module purge; module load rttov; ' \ + 'python ~/RTTOV-WRF/run_init.py '+self.cluster.archivedir+init_time.strftime('/%Y-%m-%d_%H:%M/ ') \ + '$SLURM_ARRAY_TASK_ID' id = self.cluster.run_job(cmd, "RTTOV-"+self.exp.expname, @@ -352,23 +353,15 @@ class WorkFlows(object): return id def verify_sat(self, depends_on=None): - cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node sat verif1d FSS BS' + cmd = '/jetfs/home/lkugler/miniforge3/envs/verif/bin/python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node sat verif1d FSS BS' self.cluster.run_job(cmd, "verif-SAT-"+self.exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "15", "ntasks-per-node": "15", "ntasks-per-core": "1", "mem": "100G",}, depends_on=[depends_on]) def verify_wrf(self, depends_on=None): - cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node wrf verif1d verif3d FSS BS' + cmd = '/jetfs/home/lkugler/miniforge3/envs/verif/bin/python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+self.exp.expname+' has_node wrf verif1d FSS BS' self.cluster.run_job(cmd, "verif-WRF-"+self.exp.expname, cfg_update={"time": "180", "mail-type": "FAIL,END", "ntasks": "21", "ntasks-per-node": "21", "ntasks-per-core": "1", "mem": "230G"}, depends_on=[depends_on]) - - def verify_fast(self, depends_on=None): - cmd = self.cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+self.exp.expname - - self.cluster.run_job(cmd, "verif-fast-"+self.exp.expname, - cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1", - "ntasks-per-node": "1", "ntasks-per-core": "1"}, depends_on=[depends_on]) -