diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..e0a66811123dee4def21f9db91477ab99bf7ce82 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.pyc +slurm-scripts/ +logs/ +dask-worker-space/ +DART_WRF.egg* +build* \ No newline at end of file diff --git a/config/cfg.py b/config/cfg.py index 9664c3857227bea26a84ade551fe2203be6c16da..762aa68258fc45e9899ec680ce7ccebfa3ee3aae 100755 --- a/config/cfg.py +++ b/config/cfg.py @@ -7,7 +7,7 @@ class ExperimentConfiguration(object): pass exp = ExperimentConfiguration() -exp.expname = "exp_v1.19_wb-random_Radar" +exp.expname = "exp_v1.19_wb-random_WV_superob2-12" exp.model_dx = 2000 exp.n_ens = 40 exp.n_nodes = 10 @@ -23,8 +23,9 @@ exp.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/ # needs a horizontal scale too, to calculate the vertical normalization # since you can not specify different vertical localizations for diff. variables exp.cov_loc_vert_km_horiz_km = (1, 30) +exp.superob_km = 12 -n_obs = 961 #121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level +n_obs = 22801 #22801: 2km, 121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level vis = dict(plotname='VIS 0.6µm', plotunits='[1]', kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs, @@ -64,7 +65,7 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[dBz]', cov_loc_radius_km=32) -exp.observations = [radar] # 108, wv73, vis] +exp.observations = [wv73] # 108, wv73, vis] #exp.update_vars = ['T', 'QVAPOR', 'QCLOUD', 'QICE','CLDFRA'] exp.update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'CLDFRA'] diff --git a/scripts/assim_synth_obs.py b/scripts/assim_synth_obs.py index 9a7db1d7b69ee54b19d9b95eabc9e21f8295a17f..3e70acb3cf6be7d40e3cf0954ba55a39958843dc 100755 --- a/scripts/assim_synth_obs.py +++ b/scripts/assim_synth_obs.py @@ -8,19 +8,22 @@ from config.cfg import exp, cluster from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print import create_obsseq as osq import wrfout_add_geo +import obsseq earth_radius_km = 6370 # Kelvin, fit of Fig 7b, Harnisch 2016 -x_ci = [0, 5, 10.5, 13, 16] # average cloud impact -y_oe = [1, 4.5, 10, 12, 13] # adjusted observation error +x_ci = [0, 5, 10.5, 13, 16] # average cloud impact +y_oe = [1, 4.5, 10, 12, 13] # adjusted observation error oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True) + def oe_73(ci): if ci < 13: return oe_73_linear(ci) else: - return 13. + return 13.0 + def cloudimpact_73(bt_mod, bt_obs): """ @@ -29,187 +32,214 @@ def cloudimpact_73(bt_mod, bt_obs): biascor_obs = 0 bt_lim = 255 # Kelvin for 7.3 micron WV channel - ci_obs = max(0, bt_lim-(bt_obs - biascor_obs)) - ci_mod = max(0, bt_lim-bt_mod) - ci = (ci_obs+ci_mod)/2 + ci_obs = max(0, bt_lim - (bt_obs - biascor_obs)) + ci_mod = max(0, bt_lim - bt_mod) + ci = (ci_obs + ci_mod) / 2 return ci + def read_prior_obs(f_obs_prior): """ docstring """ - obsseq = open(f_obs_prior, 'r').readlines() + obsseq = open(f_obs_prior, "r").readlines() OBSs = [] # read observations from obs_seq.final for i, line in enumerate(obsseq): - if ' OBS ' in line: - observed = float(obsseq[i+1].strip()) - truth = float(obsseq[i+2].strip()) - prior_ensmean = float(obsseq[i+3].strip()) - prior_enssd = float(obsseq[i+4].strip()) + if " OBS " in line: + observed = float(obsseq[i + 1].strip()) + truth = float(obsseq[i + 2].strip()) + prior_ensmean = float(obsseq[i + 3].strip()) + prior_enssd = float(obsseq[i + 4].strip()) prior_ens = [] - for j in range(5, 5+exp.n_ens): - prior_ens.append(float(obsseq[i+j].strip())) + for j in range(5, 5 + exp.n_ens): + prior_ens.append(float(obsseq[i + j].strip())) - OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))) + OBSs.append( + dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens)) + ) return OBSs + def read_truth_obs_obsseq(f): """Reads observed and true values from obs_seq.out/final files.""" - obsseq = open(f, 'r').readlines() + obsseq = open(f, "r").readlines() true = [] obs = [] # read observations from obs_seq.out for i, line in enumerate(obsseq): - if ' OBS ' in line: - observed = float(obsseq[i+1].strip()) - truth = float(obsseq[i+2].strip()) + if " OBS " in line: + observed = float(obsseq[i + 1].strip()) + truth = float(obsseq[i + 2].strip()) true.append(truth) obs.append(observed) return true, obs + def replace_errors_obsseqout(f, new_errors): """Replaces existing observation errors in obs_seq.final files - + new_errors (np.array) : standard deviation, shape must match the number of observations """ debug = True - obsseq = open(f, 'r').readlines() + obsseq = open(f, "r").readlines() # find number of lines between two ' OBS ' lines: first_obs = None for i, line in enumerate(obsseq): - if ' OBS ' in line: + if " OBS " in line: if first_obs is None: first_obs = i else: second_obs = i break lines_between = second_obs - first_obs - lines_obserr_after_obsnr = lines_between - 1 # obserr line is one before ' OBS ' line + lines_obserr_after_obsnr = ( + lines_between - 1 + ) # obserr line is one before ' OBS ' line # replace values in list obsseq i_obs = 0 for i, line in enumerate(obsseq): - if ' OBS ' in line: - line_error_obs_i = i+lines_obserr_after_obsnr + if " OBS " in line: + line_error_obs_i = i + lines_obserr_after_obsnr previous_err_var = obsseq[line_error_obs_i] - new_err_obs_i = new_errors[i_obs]**2 # variance in obs_seq.out - if debug: - print(line.strip(), 'previous err var ', float(previous_err_var.strip()), 'new error', new_err_obs_i) - obsseq[line_error_obs_i] = ' '+str(new_err_obs_i)+' \n' + new_err_obs_i = new_errors[i_obs] ** 2 # variance in obs_seq.out + if debug: + print( + line.strip(), + "previous err var ", + float(previous_err_var.strip()), + "new error", + new_err_obs_i, + ) + obsseq[line_error_obs_i] = " " + str(new_err_obs_i) + " \n" i_obs += 1 # next iteration - with open(f, 'w') as file: + with open(f, "w") as file: for line in obsseq: file.write(line) - print('replaced obs errors in', f) + print("replaced obs errors in", f) def set_DART_nml(just_prior_values=False): - def to_radian_horizontal(cov_loc_horiz_km): - cov_loc_radian = cov_loc_horiz_km/earth_radius_km + cov_loc_radian = cov_loc_horiz_km / earth_radius_km return cov_loc_radian - + def to_vertical_normalization(cov_loc_vert_km, cov_loc_horiz_km): - vert_norm_rad = earth_radius_km*cov_loc_vert_km/cov_loc_horiz_km*1000 + vert_norm_rad = earth_radius_km * cov_loc_vert_km / cov_loc_horiz_km * 1000 return vert_norm_rad - list_obstypes = [obscfg['kind'] for obscfg in exp.observations] - list_cov_loc_radius_km = [obscfg['cov_loc_radius_km'] for obscfg in exp.observations] + list_obstypes = [obscfg["kind"] for obscfg in exp.observations] + list_cov_loc_radius_km = [ + obscfg["cov_loc_radius_km"] for obscfg in exp.observations + ] list_cov_loc_radian = [str(to_radian_horizontal(a)) for a in list_cov_loc_radius_km] - + if just_prior_values: - template = cluster.scriptsdir+'/../templates/input.eval.nml' + template = cluster.scriptsdir + "/../templates/input.eval.nml" else: - template = cluster.scriptsdir+'/../templates/input.nml' - copy(template, cluster.dartrundir+'/input.nml') + template = cluster.scriptsdir + "/../templates/input.nml" + copy(template, cluster.dartrundir + "/input.nml") # options keys are replaced in input.nml with values - options = {'<n_ens>': str(int(exp.n_ens)), - '<cov_loc_radian>': '0.00000001', # dummy value, used for types not mentioned below - '<list_obstypes>': "'" + "','".join(list_obstypes) + "'", - '<list_cutoffs>': ", ".join(list_cov_loc_radian), - } + options = { + "<n_ens>": str(int(exp.n_ens)), + "<cov_loc_radian>": "0.00000001", # dummy value, used for types not mentioned below + "<list_obstypes>": "'" + "','".join(list_obstypes) + "'", + "<list_cutoffs>": ", ".join(list_cov_loc_radian), + } # if cov_loc_vert_km: - options['<horiz_dist_only>'] = '.true.' - + options["<horiz_dist_only>"] = ".true." + cov_loc_vert_km, cov_loc_horiz_km = exp.cov_loc_vert_km_horiz_km vert_norm_hgt = to_vertical_normalization(cov_loc_vert_km, cov_loc_horiz_km) - options['<vert_norm_hgt>'] = str(vert_norm_hgt) + options["<vert_norm_hgt>"] = str(vert_norm_hgt) # else: # options['<horiz_dist_only>'] = '.true.' # options['<vert_norm_hgt>'] = '50000.0' # dummy value for key, value in options.items(): - sed_inplace(cluster.dartrundir+'/input.nml', key, value) + sed_inplace(cluster.dartrundir + "/input.nml", key, value) # input.nml for RTTOV - rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml' - append_file(cluster.dartrundir+'/input.nml', rttov_nml) + rttov_nml = cluster.scriptsdir + "/../templates/obs_def_rttov.VIS.nml" + append_file(cluster.dartrundir + "/input.nml", rttov_nml) def obs_operator_ensemble(): # assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01 - print('running obs operator on ensemble forecast') + print("running obs operator on ensemble forecast") os.chdir(cluster.dartrundir) list_ensemble_truths = [] t = time_module.time() - for iens in range(1, exp.n_ens+1): - print('observation operator for ens #'+str(iens)) + for iens in range(1, exp.n_ens + 1): + print("observation operator for ens #" + str(iens)) # ens members are already linked to advance_temp<i>/wrfout_d01 - copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01', - cluster.dartrundir+'/wrfout_d01') + copy( + cluster.dartrundir + "/advance_temp" + str(iens) + "/wrfout_d01", + cluster.dartrundir + "/wrfout_d01", + ) # DART may need a wrfinput file as well, which serves as a template for dimension sizes - symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01') - + symlink( + cluster.dartrundir + "/wrfout_d01", cluster.dartrundir + "/wrfinput_d01" + ) # run perfect_model obs (forward operator) - os.system('mpirun -np 12 ./perfect_model_obs > /dev/null') + os.system("mpirun -np 12 ./perfect_model_obs > /dev/null") # truth values in obs_seq.out are H(x) values - true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out') + true, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") list_ensemble_truths.append(true) - - print('obs operator ensemble took', int(time_module.time()-t), 'seconds') + + print("obs operator ensemble took", int(time_module.time() - t), "seconds") n_obs = len(list_ensemble_truths[0]) np_array = np.full((exp.n_ens, n_obs), np.nan) for i in range(exp.n_ens): np_array[i, :] = list_ensemble_truths[i] return np_array + def obs_operator_nature(time): - print('getting true values in obs space from nature run') + print("getting true values in obs space from nature run") prepare_nature_dart(time) run_perfect_model_obs() - true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out') + true, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") return true def link_nature_to_dart_truth(time): # get wrfout_d01 from nature run - shutil.copy(time.strftime(exp.nature_wrfout), - cluster.dartrundir+'/wrfout_d01') + shutil.copy(time.strftime(exp.nature_wrfout), cluster.dartrundir + "/wrfout_d01") # DART may need a wrfinput file as well, which serves as a template for dimension sizes - symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01') - print('linked', time.strftime(exp.nature_wrfout), 'to', cluster.dartrundir+'/wrfout_d01') + symlink(cluster.dartrundir + "/wrfout_d01", cluster.dartrundir + "/wrfinput_d01") + print( + "linked", + time.strftime(exp.nature_wrfout), + "to", + cluster.dartrundir + "/wrfout_d01", + ) def prepare_nature_dart(time): - print('linking nature to DART & georeferencing') + print("linking nature to DART & georeferencing") link_nature_to_dart_truth(time) - wrfout_add_geo.run(cluster.dartrundir+'/../geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01') + wrfout_add_geo.run( + cluster.dartrundir + "/../geo_em.d01.nc", cluster.dartrundir + "/wrfout_d01" + ) -def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp): - """Prepares DART files for running filter +def prepare_prior_ensemble( + assim_time, prior_init_time, prior_valid_time, prior_path_exp +): + """Prepares DART files for running filter i.e. - links first guess state to DART first guess filenames - creates wrfinput_d01 files @@ -217,54 +247,60 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_ - writes txt files so DART knows what input and output is - removes probably pre-existing files which could lead to problems """ - print('prepare prior state estimate') - for iens in range(1, exp.n_ens+1): - print('link wrfout file to DART background file') - wrfout_run = prior_path_exp \ - +prior_init_time.strftime('/%Y-%m-%d_%H:%M/') \ - +str(iens) \ - +prior_valid_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S') - dart_ensdir = cluster.dartrundir+'/advance_temp'+str(iens) - wrfout_dart = dart_ensdir+'/wrfout_d01' + print("prepare prior state estimate") + for iens in range(1, exp.n_ens + 1): + print("link wrfout file to DART background file") + wrfout_run = ( + prior_path_exp + + prior_init_time.strftime("/%Y-%m-%d_%H:%M/") + + str(iens) + + prior_valid_time.strftime("/wrfout_d01_%Y-%m-%d_%H:%M:%S") + ) + dart_ensdir = cluster.dartrundir + "/advance_temp" + str(iens) + wrfout_dart = dart_ensdir + "/wrfout_d01" os.makedirs(dart_ensdir, exist_ok=True) - print('copy', wrfout_run, 'to', wrfout_dart) + print("copy", wrfout_run, "to", wrfout_dart) copy(wrfout_run, wrfout_dart) - symlink(wrfout_dart, dart_ensdir+'/wrfinput_d01') + symlink(wrfout_dart, dart_ensdir + "/wrfinput_d01") # ensure prior time matches assim time (can be off intentionally) if assim_time != prior_valid_time: - print('overwriting time in prior from nature wrfout') - os.system(cluster.ncks+' -A -v XTIME,Times ' - +cluster.dartrundir+'/wrfout_d01 '+wrfout_dart) + print("overwriting time in prior from nature wrfout") + os.system( + cluster.ncks + + " -A -v XTIME,Times " + + cluster.dartrundir + + "/wrfout_d01 " + + wrfout_dart + ) # this seems to be necessary (else wrong level selection) - wrfout_add_geo.run(cluster.dartrundir+'/../geo_em.d01.nc', wrfout_dart) + wrfout_add_geo.run(cluster.dartrundir + "/../geo_em.d01.nc", wrfout_dart) - fpath = cluster.dartrundir+'/input_list.txt' - print('writing', fpath) + fpath = cluster.dartrundir + "/input_list.txt" + print("writing", fpath) try_remove(fpath) - with open(fpath, 'w') as f: - for iens in range(1, exp.n_ens+1): - f.write('./advance_temp'+str(iens)+'/wrfout_d01') - f.write('\n') + with open(fpath, "w") as f: + for iens in range(1, exp.n_ens + 1): + f.write("./advance_temp" + str(iens) + "/wrfout_d01") + f.write("\n") - fpath = cluster.dartrundir+'/output_list.txt' - print('writing', fpath) + fpath = cluster.dartrundir + "/output_list.txt" + print("writing", fpath) try_remove(fpath) - with open(fpath, 'w') as f: - for iens in range(1, exp.n_ens+1): - f.write('./filter_restart_d01.'+str(iens).zfill(4)) - f.write('\n') + with open(fpath, "w") as f: + for iens in range(1, exp.n_ens + 1): + f.write("./filter_restart_d01." + str(iens).zfill(4)) + f.write("\n") - - print('removing preassim and filter_restart') - os.system('rm -rf '+cluster.dartrundir+'/preassim_*') - os.system('rm -rf '+cluster.dartrundir+'/filter_restart*') - os.system('rm -rf '+cluster.dartrundir+'/output_mean*') - os.system('rm -rf '+cluster.dartrundir+'/output_sd*') - os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*') - os.system('rm -rf '+cluster.dartrundir+'/obs_seq.fina*') + print("removing preassim and filter_restart") + os.system("rm -rf " + cluster.dartrundir + "/preassim_*") + os.system("rm -rf " + cluster.dartrundir + "/filter_restart*") + os.system("rm -rf " + cluster.dartrundir + "/output_mean*") + os.system("rm -rf " + cluster.dartrundir + "/output_sd*") + os.system("rm -rf " + cluster.dartrundir + "/perfect_output_*") + os.system("rm -rf " + cluster.dartrundir + "/obs_seq.fina*") def calc_obserr_WV73(Hx_nature, Hx_prior): @@ -279,30 +315,41 @@ def calc_obserr_WV73(Hx_nature, Hx_prior): mean_CI = np.mean(CIs) oe_nature = oe_73(mean_CI) - print('oe_nature:', oe_nature, ', bt_y:', bt_y, ', mean_CI:', mean_CI) + print("oe_nature:", oe_nature, ", bt_y:", bt_y, ", mean_CI:", mean_CI) OEs[iobs] = oe_nature return OEs + def run_perfect_model_obs(): - print('generating observations - running ./perfect_model_obs') + print("generating observations - running ./perfect_model_obs") os.chdir(cluster.dartrundir) - try_remove(cluster.dartrundir+'/obs_seq.out') - if not os.path.exists(cluster.dartrundir+'/obs_seq.in'): - raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir) - os.system('mpirun -np 12 ./perfect_model_obs > log.perfect_model_obs') - if not os.path.exists(cluster.dartrundir+'/obs_seq.out'): - raise RuntimeError('obs_seq.out does not exist in '+cluster.dartrundir, - '\n look for '+cluster.dartrundir+'/log.perfect_model_obs') + try_remove(cluster.dartrundir + "/obs_seq.out") + if not os.path.exists(cluster.dartrundir + "/obs_seq.in"): + raise RuntimeError("obs_seq.in does not exist in " + cluster.dartrundir) + os.system("mpirun -np 12 ./perfect_model_obs > log.perfect_model_obs") + if not os.path.exists(cluster.dartrundir + "/obs_seq.out"): + raise RuntimeError( + "obs_seq.out does not exist in " + cluster.dartrundir, + "\n look for " + cluster.dartrundir + "/log.perfect_model_obs", + ) + def assimilate(nproc=96): - print('time now', dt.datetime.now()) - print('running filter') + print("time now", dt.datetime.now()) + print("running filter") os.chdir(cluster.dartrundir) - try_remove(cluster.dartrundir+'/obs_seq.final') + try_remove(cluster.dartrundir + "/obs_seq.final") t = time_module.time() - os.system('mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-'+str(int(nproc)-1)+' -np '+str(int(nproc))+' ./filter > log.filter') - print('./filter took', int(time_module.time()-t), 'seconds') + os.system( + "mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-" + + str(int(nproc) - 1) + + " -np " + + str(int(nproc)) + + " ./filter > log.filter" + ) + print("./filter took", int(time_module.time() - t), "seconds") + # currently unused # def recycle_output(): @@ -322,7 +369,7 @@ def assimilate(nproc=96): # if os.path.islink(dart_input): # l = os.readlink(dart_input) # os.remove(dart_input) -# copy(l, dart_input) +# copy(l, dart_input) # # print('move DART output to input: '+dart_output+' -> '+dart_input) # # os.rename(dart_output, dart_input) # probably doesnt work @@ -333,53 +380,72 @@ def assimilate(nproc=96): ############### archiving + def archive_osq_final(time, posterior_1min=False): - """Save obs_seq.final file for later. + """Save obs_seq.final file for later. time (dt.datetime) : time of sampling values from files posterior_1min (bool) : False if usual assimilation """ if posterior_1min: - archive_dir = cluster.archivedir+'/obs_seq_final_1min/' + archive_dir = cluster.archivedir + "/obs_seq_final_1min/" else: - archive_dir = cluster.archivedir+'/obs_seq_final/' + archive_dir = cluster.archivedir + "/obs_seq_final/" mkdir(archive_dir) - fout = archive_dir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final') - copy(cluster.dartrundir+'/obs_seq.final', fout) - print(fout, 'saved.') + fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final") + copy(cluster.dartrundir + "/obs_seq.final", fout) + print(fout, "saved.") - # try: + # try: # print('archive regression diagnostics') # what are regression diagnostics?! # copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics') # except Exception as e: # warnings.warn(str(e)) + def archive_filteroutput(time): - print('archiving output') - archive_assim = cluster.archivedir + time.strftime('/%Y-%m-%d_%H:%M/assim_stage0/') + print("archiving output") + archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") mkdir(archive_assim) - copy(cluster.dartrundir+'/input.nml', archive_assim+'/input.nml') + copy(cluster.dartrundir + "/input.nml", archive_assim + "/input.nml") - for iens in range(1, exp.n_ens+1): # single members - copy(cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4), - archive_assim+'/filter_restart_d01.'+str(iens).zfill(4)) + for iens in range(1, exp.n_ens + 1): # single members + copy( + cluster.dartrundir + "/filter_restart_d01." + str(iens).zfill(4), + archive_assim + "/filter_restart_d01." + str(iens).zfill(4), + ) try: # not necessary for next forecast run - for iens in range(1, exp.n_ens+1): - copy(cluster.dartrundir+'/postassim_member_'+str(iens).zfill(4)+'.nc', - archive_assim+'/postassim_member_'+str(iens).zfill(4)+'.nc') + for iens in range(1, exp.n_ens + 1): + copy( + cluster.dartrundir + "/postassim_member_" + str(iens).zfill(4) + ".nc", + archive_assim + "/postassim_member_" + str(iens).zfill(4) + ".nc", + ) - for f in ['output_mean.nc', 'output_sd.nc']: # copy mean and sd to archive - copy(cluster.dartrundir+'/'+f, archive_assim+'/'+f) + for f in ["output_mean.nc", "output_sd.nc"]: # copy mean and sd to archive + copy(cluster.dartrundir + "/" + f, archive_assim + "/" + f) except Exception as e: warnings.warn(str(e)) def archive_osq_out(time): - dir_obsseq = cluster.archivedir+'/obs_seq_out/' + dir_obsseq = cluster.archivedir + "/obs_seq_out/" os.makedirs(dir_obsseq, exist_ok=True) - copy(cluster.dartrundir+'/obs_seq.out', dir_obsseq+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.out')) + copy( + cluster.dartrundir + "/obs_seq.out", + dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out"), + ) + + +def superob(): + if hasattr(exp, "superob_km"): + t = time_module.time() + obs = obsseq.ObsSeqOut(cluster.dartrundir + "/obs_seq.out") + print("superobbing to", exp.superob_km, "km") + obs.superob(window_km=exp.superob_km) + obs.to_dart(f=cluster.dartrundir + "/obs_seq.out") + print("superob took", int(time_module.time() - t), "seconds") if __name__ == "__main__": @@ -395,15 +461,15 @@ if __name__ == "__main__": 4) Assimilate - adapt obs errors for assimilation - calculate assim obs error from parametrization - 1) create obs_seq.in with obs error=0 - 2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble) - 3) calculate obs error as function of y_nat, y_ensmean + 1) create obs_seq.in with obs error=0 + 2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble) + 3) calculate obs error as function of y_nat, y_ensmean - or get assim obs error from config - edit obs error in obs_seq.out - assimilate - write state to archive - + Note: assim_time (dt.datetime): time of output prior_valid_time (dt.datetime): valid time of prior (may be different to assim_time) @@ -412,75 +478,100 @@ if __name__ == "__main__": python assim.py 2008-08-07_12:00 2008-08-06:00 2008-08-07_13:00 /home/fs71386/lkugler/data/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem """ - time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') - prior_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M') - prior_valid_time = dt.datetime.strptime(sys.argv[3], '%Y-%m-%d_%H:%M') + time = dt.datetime.strptime(sys.argv[1], "%Y-%m-%d_%H:%M") + prior_init_time = dt.datetime.strptime(sys.argv[2], "%Y-%m-%d_%H:%M") + prior_valid_time = dt.datetime.strptime(sys.argv[3], "%Y-%m-%d_%H:%M") prior_path_exp = str(sys.argv[4]) - archive_time = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/') + archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/") os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in os.chdir(cluster.dartrundir) - os.system(cluster.python+' '+cluster.scripts_rundir+'/link_dart_rttov.py') # link DART binaries to run_DART - os.system('rm -f input.nml obs_seq.in obs_seq.out obs_seq.final') # remove any existing observation files + os.system( + cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py" + ) # link DART binaries to run_DART + os.system( + "rm -f input.nml obs_seq.in obs_seq.out obs_seq.final" + ) # remove any existing observation files set_DART_nml() - print('prepare nature') + print("prepare nature") prepare_nature_dart(time) # link WRF files to DART directory - print('prepare prior ensemble') + print("prepare prior ensemble") prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp) ################################################ - print(' 1) get the assimilation errors in a single vector ') - error_assimilate = [] - # to get the obs-error for assimilation, + print(" 1) get the assimilation errors in a single vector ") + error_assimilate = [] + # to get the obs-error for assimilation, # we need to get the true obs-space values for the parametrized variable # and collect the obs-error for assimilation in a single vector/list for i, obscfg in enumerate(exp.observations): - n_obs = obscfg['n_obs'] - n_obs_z = len(obscfg.get('heights', [1,])) + n_obs = obscfg["n_obs"] + n_obs_z = len( + obscfg.get( + "heights", + [ + 1, + ], + ) + ) n_obs_3d = n_obs * n_obs_z - do_error_parametrization = ((obscfg['error_assimilate'] == False) and (obscfg.get('sat_channel') == 6)) - + do_error_parametrization = (obscfg["error_assimilate"] == False) and ( + obscfg.get("sat_channel") == 6 + ) + if do_error_parametrization: # get observations for sat 6 - osq.create_obsseqin_alltypes(time, [obscfg,], np.zeros(n_obs_3d)) + osq.create_obsseqin_alltypes( + time, + [ + obscfg, + ], + np.zeros(n_obs_3d), + ) run_perfect_model_obs() - + superob() + # depends on obs_seq.out produced before by run_perfect_model_obs() - Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out') + Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") - Hx_prior = obs_operator_ensemble() # files are already linked to DART directory + Hx_prior = ( + obs_operator_ensemble() + ) # files are already linked to DART directory err_assim = calc_obserr_WV73(Hx_nat, Hx_prior) else: - err_assim = np.zeros(n_obs_3d) + obscfg['error_assimilate'] + err_assim = np.zeros(n_obs_3d) + obscfg["error_assimilate"] - error_assimilate.extend(err_assim) # the obs-error we assume for assimilating observations + error_assimilate.extend( + err_assim + ) # the obs-error we assume for assimilating observations ################################################ - print(' 2) generate observations ') + print(" 2) generate observations ") # the obs-error we use for generating obs is user-defined error_generate = [] for i, obscfg in enumerate(exp.observations): - error_generate.extend(np.zeros(n_obs_3d) + obscfg['error_generate']) + error_generate.extend(np.zeros(n_obs_3d) + obscfg["error_generate"]) osq.create_obsseqin_alltypes(time, exp.observations, error_generate) set_DART_nml() run_perfect_model_obs() # actually create observations that are used to assimilate + superob() archive_osq_out(time) ################################################ - print(' 3) assimilate with observation-errors for assimilation') + print(" 3) assimilate with observation-errors for assimilation") - replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate) + replace_errors_obsseqout(cluster.dartrundir + "/obs_seq.out", error_assimilate) t = time_module.time() assimilate() - print('filter took', time_module.time()-t, 'seconds') - + print("filter took", time_module.time() - t, "seconds") + archive_filteroutput(time) archive_osq_final(time)