From 526d315fb259be62c42585da047bfae6d60533e1 Mon Sep 17 00:00:00 2001 From: lkugler <lukas.kugler@gmail.com> Date: Tue, 18 Oct 2022 11:08:27 +0200 Subject: [PATCH] WV62, restructure etc --- dartwrf/assim_synth_obs.py | 569 ++++++++++++++-------------------- dartwrf/create_obs_upfront.py | 65 ++++ dartwrf/create_obsseq.py | 9 +- dartwrf/obs/error_models.py | 76 +++++ 4 files changed, 376 insertions(+), 343 deletions(-) create mode 100755 dartwrf/create_obs_upfront.py create mode 100644 dartwrf/obs/error_models.py diff --git a/dartwrf/assim_synth_obs.py b/dartwrf/assim_synth_obs.py index 5c841e5..8af668d 100755 --- a/dartwrf/assim_synth_obs.py +++ b/dartwrf/assim_synth_obs.py @@ -1,125 +1,17 @@ -from multiprocessing.sharedctypes import Value import os, sys, shutil, warnings import time as time_module import datetime as dt import numpy as np -from scipy.interpolate import interp1d from config.cfg import exp, cluster from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print, shell import dartwrf.create_obsseq as osq from dartwrf import wrfout_add_geo from dartwrf import obsseq +from dartwrf.obs import error_models as err earth_radius_km = 6370 -def OE_model_harnisch_WV73(ci): - if ci >= 0 and ci < 16: - # Kelvin, fit of Fig 7b, Harnisch 2016 - x_ci = [0, 5, 10.5, 13, 16] # average cloud impact [K] - y_oe = [1, 4.5, 10, 12, 13] # adjusted observation error [K] - oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True) - return oe_73_linear(ci) - else: - return 13.0 - -def cloudimpact_73(bt_mod, bt_obs): - """ - follows Harnisch 2016 - """ - biascor_obs = 0 - bt_lim = 255 # Kelvin for 7.3 micron WV channel - - ci_obs = max(0, bt_lim - (bt_obs - biascor_obs)) - ci_mod = max(0, bt_lim - bt_mod) - ci = (ci_obs + ci_mod) / 2 - return ci - - -def read_prior_obs(f_obs_prior): - """ - docstring - """ - obsseq = open(f_obs_prior, "r").readlines() - OBSs = [] - # read observations from obs_seq.final - for i, line in enumerate(obsseq): - if " OBS " in line: - observed = float(obsseq[i + 1].strip()) - truth = float(obsseq[i + 2].strip()) - prior_ensmean = float(obsseq[i + 3].strip()) - prior_enssd = float(obsseq[i + 4].strip()) - prior_ens = [] - for j in range(5, 5 + exp.n_ens): - prior_ens.append(float(obsseq[i + j].strip())) - - OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))) - return OBSs - - -def read_truth_obs_obsseq(f): - """Reads observed and true values from obs_seq.out/final files.""" - obsseq = open(f, "r").readlines() - true = [] - obs = [] - # read observations from obs_seq.out - for i, line in enumerate(obsseq): - if " OBS " in line: - observed = float(obsseq[i + 1].strip()) - truth = float(obsseq[i + 2].strip()) - true.append(truth) - obs.append(observed) - return true, obs - - -def replace_errors_obsseqout(f, new_errors): - """Replaces existing observation errors in obs_seq.final files - - new_errors (np.array) : standard deviation, - shape must match the number of observations - """ - debug = True - obsseq = open(f, "r").readlines() - - # find number of lines between two ' OBS ' lines: - first_obs = second_obs = None - for i, line in enumerate(obsseq): - if " OBS " in line: - if first_obs is None: - first_obs = i - else: - second_obs = i - break - if not second_obs: - raise RuntimeError('just one OBS in this file?! 
'+str(f)) - lines_between = second_obs - first_obs - lines_obserr_after_obsnr = lines_between - 1 # obserr line is one before ' OBS ' line - - # replace values in list obsseq - i_obs = 0 - for i, line in enumerate(obsseq): - if " OBS " in line: - line_error_obs_i = i + lines_obserr_after_obsnr - - previous_err_var = obsseq[line_error_obs_i] - new_err_obs_i = new_errors[i_obs] ** 2 # variance in obs_seq.out - if debug: - print( - line.strip(), - "previous err var ", - float(previous_err_var.strip()), - "new error", - new_err_obs_i, - ) - obsseq[line_error_obs_i] = " " + str(new_err_obs_i) + " \n" - - i_obs += 1 # next iteration - - with open(f, "w") as file: - for line in obsseq: - file.write(line) - print("replaced obs errors in", f) - def set_DART_nml(just_prior_values=False): def to_radian_horizontal(cov_loc_horiz_km): @@ -142,6 +34,7 @@ def set_DART_nml(just_prior_values=False): # options keys are replaced in input.nml with values options = { + "<filter_kind>": str(int(exp.filter_kind)), "<sampling_error_correction>": '.true.' if exp.sec else '.false.', "<post_inflation>": '4' if exp.inflation else '0', "<n_ens>": str(int(exp.n_ens)), @@ -151,12 +44,13 @@ def set_DART_nml(just_prior_values=False): } # Note: only one value of vertical localization possible - if hasattr(exp, 'cov_loc_vert_km_horiz_km'): - options["<horiz_dist_only>"] = ".false." + try: cov_loc_vert_km, cov_loc_horiz_km = exp.cov_loc_vert_km_horiz_km vert_norm_hgt = to_vertical_normalization(cov_loc_vert_km, cov_loc_horiz_km) options["<vert_norm_hgt>"] = str(vert_norm_hgt) - else: + options["<horiz_dist_only>"] = ".false." + except Exception as e: + warnings.warn(str(e)+' - not using vertical localization.') options["<horiz_dist_only>"] = ".true." options["<vert_norm_hgt>"] = "99999.0" # dummy value @@ -168,53 +62,14 @@ def set_DART_nml(just_prior_values=False): append_file(cluster.dartrundir + "/input.nml", rttov_nml) -def run_Hx(time, obscfg): - """ - # assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01 - Creates - obs_seq.final (file): observations on (non-averaged) grid - """ - # existing obs_seq.out is a precondition for filter - # needs to know where to take observations - osq.create_obsseqin_alltypes(time, [obscfg, ]) - run_perfect_model_obs() - - print("running H(x) : obs operator on ensemble prior") - - # set input.nml to calculate prior Hx - set_DART_nml(just_prior_values=True) - - # run filter to calculate prior Hx - shell("mpirun -np 12 ./filter &> log.filter.preassim") - - osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final") - - if hasattr(exp, "superob_km"): - df = osf.df - print("superobbing to", exp.superob_km, "km") - osf.df = osf.df.superob(window_km=exp.superob_km) - osf.to_dart(cluster.dartrundir + "/obs_seq.final") - return osf - - -def obs_operator_nature(time): - print("getting true values in obs space from nature run") - prepare_nature_dart(time) - run_perfect_model_obs() - true, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") - return true - - def link_nature_to_dart_truth(time): # get wrfout_d01 from nature run shutil.copy(time.strftime(exp.nature_wrfout), cluster.dartrundir + "/wrfout_d01") # DART may need a wrfinput file as well, which serves as a template for dimension sizes symlink(cluster.dartrundir + "/wrfout_d01", cluster.dartrundir + "/wrfinput_d01") print( - "linked", - time.strftime(exp.nature_wrfout), - "to", - cluster.dartrundir + "/wrfout_d01", + "linked", time.strftime(exp.nature_wrfout), + "to", cluster.dartrundir + "/wrfout_d01", ) @@ 
-261,21 +116,8 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_ # this seems to be necessary (else wrong level selection) wrfout_add_geo.run(cluster.dartrundir + "/../geo_em.d01.nc", wrfout_dart) - fpath = cluster.dartrundir + "/input_list.txt" - print("writing", fpath) - try_remove(fpath) - with open(fpath, "w") as f: - for iens in range(1, exp.n_ens + 1): - f.write("./advance_temp" + str(iens) + "/wrfout_d01") - f.write("\n") - - fpath = cluster.dartrundir + "/output_list.txt" - print("writing", fpath) - try_remove(fpath) - with open(fpath, "w") as f: - for iens in range(1, exp.n_ens + 1): - f.write("./filter_restart_d01." + str(iens).zfill(4)) - f.write("\n") + write_list_of_inputfiles_prior() + write_list_of_outputfiles() print("removing preassim and filter_restart") os.system("rm -rf " + cluster.dartrundir + "/preassim_*") @@ -286,113 +128,74 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_ os.system("rm -rf " + cluster.dartrundir + "/obs_seq.fina*") -def calc_obserr_WV73(Hx_nature, Hx_prior): - """Calculate parametrized error (for assimilation) - Args: - Hx_nature (np.array): dim (observations) - Hx_prior (np.array): dim (ensemble_members, observations) - - Returns - np.array Observation error std-deviation with dim (observations) - """ - debug = False - n_obs = len(Hx_nature) - OEs = np.ones(n_obs) - for iobs in range(n_obs): - bt_y = Hx_nature[iobs] - bt_x_ens = Hx_prior[:, iobs] - - # compute Cloud impact for every pair (ensemble, truth) - CIs = [cloudimpact_73(bt_x, bt_y) for bt_x in bt_x_ens] - mean_CI = np.mean(CIs) - oe_model = OE_model_harnisch_WV73(mean_CI) - if debug: - print("BT_nature=", bt_y, "=> mean_CI=", mean_CI, "=> OE_assim=", oe_model) - OEs[iobs] = oe_model - return OEs - -def run_perfect_model_obs(nproc=12): - print("generating observations - running ./perfect_model_obs") +def write_txt(lines, fpath): + try_remove(fpath) + with open(fpath, "w") as file: + for line in lines: + file.write(line+'\n') + +def write_list_of_inputfiles_prior(): + files = [] + for iens in range(1, exp.n_ens+1): + files.append("./advance_temp" + str(iens) + "/wrfout_d01") + write_txt(files, cluster.dartrundir+'/input_list.txt') + +def write_list_of_inputfiles_posterior(assim_time): + filedir = cluster.archivedir+assim_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") + + files = [] + for iens in range(1, exp.n_ens+1): + files.append(filedir+'filter_restart_d01.'+str(iens).zfill(4)) + write_txt(files, cluster.dartrundir+'/input_list.txt') + +def write_list_of_outputfiles(): + files = [] + for iens in range(1, exp.n_ens+1): + files.append("./filter_restart_d01." 
+ str(iens).zfill(4)) + write_txt(files, cluster.dartrundir+'/output_list.txt') + +def run_perfect_model_obs(nproc=12, verbose=True): + if verbose: + print("generating observations - running ./perfect_model_obs") os.chdir(cluster.dartrundir) try_remove(cluster.dartrundir + "/obs_seq.out") if not os.path.exists(cluster.dartrundir + "/obs_seq.in"): raise RuntimeError("obs_seq.in does not exist in " + cluster.dartrundir) - print(shell("mpirun -np "+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs")) + shell('mpirun -np '+str(nproc)+' '+cluster.container+" ./perfect_model_obs > log.perfect_model_obs") if not os.path.exists(cluster.dartrundir + "/obs_seq.out"): raise RuntimeError( "obs_seq.out does not exist in " + cluster.dartrundir, "\n look for " + cluster.dartrundir + "/log.perfect_model_obs") - -def assimilate(nproc=48): +def filter(nproc=48): print("time now", dt.datetime.now()) print("running filter") os.chdir(cluster.dartrundir) try_remove(cluster.dartrundir + "/obs_seq.final") t = time_module.time() - #shell('mpirun -np 12 ./filter &> log.filter') - shell("".join([ - "mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-", - str(int(nproc) - 1)," -np ",str(int(nproc))," ./filter > log.filter"])) + if nproc < 12: + shell('mpirun -np 12 '+cluster.container+' ./filter &> log.filter') + else: # -genv I_MPI_PIN_PROCESSOR_LIST=0-"+str(int(nproc) - 1) + shell("mpirun -np "+str(int(nproc))+' '+cluster.container+" ./filter > log.filter") print("./filter took", int(time_module.time() - t), "seconds") if not os.path.isfile(cluster.dartrundir + "/obs_seq.final"): raise RuntimeError( "obs_seq.final does not exist in " + cluster.dartrundir, "\n look for " + cluster.dartrundir + "/log.filter") -# currently unused -# def recycle_output(): -# """Use output of assimilation (./filter) as input for another assimilation (with ./filter) -# Specifically, this copies the state fields from filter_restart_d01.000x to the wrfout files in advance_temp folders""" -# update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'QRAIN', 'U10', 'V10', 'T2', 'Q2', 'TSK', 'PSFC', 'CLDFRA'] -# updates = ','.join(update_vars) - -# print('recycle DART output to be used as input') -# for iens in range(1, exp.n_ens+1): -# dart_output = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4) -# dart_input = cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01' - -# #print('check for non-monotonic vertical pressure') - -# # convert link to file in order to be able to update the content -# if os.path.islink(dart_input): -# l = os.readlink(dart_input) -# os.remove(dart_input) -# copy(l, dart_input) - -# # print('move DART output to input: '+dart_output+' -> '+dart_input) -# # os.rename(dart_output, dart_input) # probably doesnt work - -# print('updating', updates, 'in', dart_input, 'from', dart_output) -# os.system(cluster.ncks+' -A -v '+updates+' '+dart_output+' '+dart_input) ############### archiving -def archive_osq_final(time, posterior_1min=False): - """Save obs_seq.final file for later. 
- time (dt.datetime) : time of sampling values from files - posterior_1min (bool) : False if usual assimilation - """ +def archive_filteroutput(time): + print("archiving ...") - if posterior_1min: - archive_dir = cluster.archivedir + "/obs_seq_final_1min/" - else: - archive_dir = cluster.archivedir + "/obs_seq_final/" + archive_dir = cluster.archivedir + "/obs_seq_final/" mkdir(archive_dir) fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final") copy(cluster.dartrundir + "/obs_seq.final", fout) print(fout, "saved.") - # try: - # print('archive regression diagnostics') # what are regression diagnostics?! - # copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics') - # except Exception as e: - # warnings.warn(str(e)) - - -def archive_filteroutput(time): - print("archiving output") archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") mkdir(archive_assim) copy(cluster.dartrundir + "/input.nml", archive_assim + "/input.nml") @@ -416,27 +219,7 @@ def archive_filteroutput(time): except Exception as e: warnings.warn(str(e)) - -def archive_osq_out(time): - dir_obsseq = cluster.archivedir + "/obs_seq_out/" - os.makedirs(dir_obsseq, exist_ok=True) - copy(cluster.dartrundir + "/obs_seq.out", - dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")) - if os.path.isfile(cluster.dartrundir + "/obs_seq.out-orig"): - try: - copy(cluster.dartrundir + "/obs_seq.out-orig", - dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-orig")) - except Exception as e: - warnings.warn(str(e)) - -def is_assim_error_parametrized(obscfg): - if (obscfg["error_assimilate"] == False) and ( - obscfg.get("sat_channel") == 6): - return True - else: - return False - -def get_parametrized_error(obscfg): +def get_parametrized_error(obscfg, osf_prior): """Calculate the parametrized error for an ObsConfig (one obs type) Args @@ -446,70 +229,72 @@ def get_parametrized_error(obscfg): Returns np.array observation error std-dev for assimilation """ - - # run obs operator (through filter program) - # creates obs_seq.final containing truth & prior Hx - osf = run_Hx(time, obscfg) - df_obs = osf.df - + df_obs = osf_prior Hx_prior = df_obs.get_prior_Hx().T Hx_truth = df_obs.get_truth_Hx() # compute the obs error for assimilation on the averaged grid # since the assimilation is done on the averaged grid - if obscfg.get("sat_channel") == 6: - return calc_obserr_WV73(Hx_truth, Hx_prior) - elif obscfg.get("sat_channel") == 1: - return calc_obserr_VIS06(Hx_truth, Hx_prior) + channel = obscfg.get("sat_channel") + + if channel == 5: + return err.calc_obserr_WV('WV62', Hx_truth, Hx_prior) + if channel == 6: + return err.calc_obserr_WV('WV73', Hx_truth, Hx_prior) else: - NotImplementedError('error_assimilate if False and sat_channel', obscfg.get("sat_channel")) + NotImplementedError('sat_channel not implemented', obscfg.get("sat_channel")) -def set_obserr_assimilate_in_obsseqout(obsseqout, outfile="./obs_seq.out"): +def set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile="./obs_seq.out"): for obscfg in exp.observations: kind_str = obscfg['kind'] kind = osq.obs_kind_nrs[kind_str] # modify each kind separately, one after each other - mask_kind = obsseqout.df.kind == kind + where_oso_iskind = oso.df.kind == kind + where_osf_iskind = osf_prior.df.kind == kind if obscfg["error_assimilate"] == False: - assim_err = get_parametrized_error(obscfg) - obsseqout.df.loc[mask_kind, 'variance'] = assim_err**2 - #assert np.allclose(assim_err, obsseqout.df['variance']**2) # check + assim_err = 
get_parametrized_error(obscfg, osf_prior.df[where_osf_iskind]) + oso.df.loc[where_oso_iskind, 'variance'] = assim_err**2 + #assert np.allclose(assim_err, oso.df['variance']**2) # check else: # overwrite with user-defined values - obsseqout.df.loc[mask_kind, 'variance'] = obscfg["error_assimilate"]**2 + oso.df.loc[where_oso_iskind, 'variance'] = obscfg["error_assimilate"]**2 - obsseqout.to_dart(outfile) + oso.to_dart(outfile) -def qc_obs(oso, outfile='./obs_seq.out'): +def qc_obs(time, oso, osf_prior): # obs should be superobbed already! for i, obscfg in enumerate(exp.observations): if i > 0: raise NotImplementedError() - # run obs operator (through filter program) - # creates obs_seq.final containing truth & prior Hx - osf = run_Hx(time, obscfg) - df_final = osf.df obs = oso.df.observations.values + Hx_prior_mean = osf_prior.df['prior ensemble mean'] n_obs_orig = len(obs) if obscfg.get("sat_channel") == 1: - print('removing obs with abs(FGD) smaller than prior ens spread') - Hx_prior = df_final.get_prior_Hx().T - Hx_prior_mean = np.mean(Hx_prior, axis=0) - Hx_prior_spread = df_final['prior ensemble spread'].values - #Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9 + if False: + print('removing obs with abs(FGD) < 0.05') + Hx_prior = osf_prior.df.get_prior_Hx().T + # Hx_prior_mean = np.mean(Hx_prior, axis=0) + #Hx_prior_spread = osf_prior.df['prior ensemble spread'].values + #Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9 - abs_FGD = abs(obs - Hx_prior_mean) # /Hx_prior_spread - oso.df = oso.df[abs_FGD > Hx_prior_spread] + abs_FGD = abs(obs - Hx_prior_mean) # /Hx_prior_spread + oso.df = oso.df[abs_FGD > 0.05] # Hx_prior_spread] - # obs_dist_to_priormean = abs(obs - Hx_prior_mean) - # oso.df = oso.df[obs_dist_to_priormean > 5] - # print('removed', n_obs_orig-len(oso.df), 'observations with abs(FGD) smaller than 5') + # obs_dist_to_priormean = abs(obs - Hx_prior_mean) + # oso.df = oso.df[obs_dist_to_priormean > 5] + # print('removed', n_obs_orig-len(oso.df), 'observations with abs(FGD) smaller than 5') + + else: + # set obs to prior mean => no change in mean but spread is adjusted. + abs_FGD = abs(obs - Hx_prior_mean) # /Hx_prior_spread + Hx_prior_median = np.median(osf_prior.df.get_prior_Hx(), axis=1) + oso.df.loc[abs_FGD <= 0.05, 'observations'] = Hx_prior_median[abs_FGD <= 0.05] elif obscfg.get("sat_channel") == 6: # WV73 @@ -517,16 +302,127 @@ def qc_obs(oso, outfile='./obs_seq.out'): obs = oso.df.observations.values n_obs_orig = len(obs) - Hx_prior = df_final.get_prior_Hx().T - Hx_prior_mean = np.mean(Hx_prior, axis=0) abs_FGD = abs(obs - Hx_prior_mean) # /Hx_prior_spread oso.df = oso.df[abs_FGD > 5] else: raise NotImplementedError('no obs QC implemented for this obs type') print('QC removed', n_obs_orig-len(oso.df), 'observations') - oso.to_dart(outfile) - print('saved', outfile) + + # for archiving + f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC") + os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True) + copy(cluster.dartrundir + "/obs_seq.out", f_out_archive) + + # for assimilation later + f_out_dart = cluster.dartrundir+'/obs_seq.out' + oso.to_dart(f_out_dart) + print('saved', f_out_dart) + + +def evaluate(assim_time, + output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs"): + """Depending on input_list.txt, this function calculates either prior or posterior obs space values. 
+ """ + + os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in + os.chdir(cluster.dartrundir) + + # link DART binaries to run_DART + os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py") + + # remove any existing observation files + os.system("rm -f input.nml obs_seq.final") + + print("prepare nature") + prepare_nature_dart(assim_time) # link WRF files to DART directory + + if not os.path.isfile(cluster.dartrundir+'/obs_seq.out'): + raise RuntimeError(cluster.dartrundir+'/obs_seq.out does not exist') + + set_DART_nml(just_prior_values=True) + filter(nproc=6) + + # archiving + fout = cluster.archivedir + "/obs_seq_final/" + assim_time.strftime(output_format) + os.makedirs(cluster.archivedir + "/obs_seq_final/", exist_ok=True) + copy(cluster.dartrundir + "/obs_seq.final", fout) + print(fout, "saved.") + + osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final") + return osf + + + +def generate_obsseq_out(time): + + def ensure_physical_vis(oso): # set reflectance < surface albedo to surface albedo + print(" 2.2) removing obs below surface albedo ") + if_vis_obs = oso.df['kind'].values == 262 + if_obs_below_surface_albedo = oso.df['observations'].values < 0.2928 + + oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = 0.2928 + oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") + return oso + + + def apply_superobbing(oso): + try: + f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-before_superob") + shutil.copy(cluster.dartrundir + "/obs_seq.out-before_superob", f_oso) + print('saved', f_oso) + except Exception as e: + warnings.warn(str(e)) + + print(" 2.3) superobbing to", exp.superob_km, "km") + oso.df = oso.df.superob(window_km=exp.superob_km) + oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") + + + ############################## + + dir_obsseq=cluster.archivedir + "/obs_seq_out/" + os.makedirs(dir_obsseq, exist_ok=True) + + osq.create_obsseqin_alltypes(time, exp.observations) + run_perfect_model_obs() # generate observation, draws from gaussian + + print(" 2.1) obs preprocessing") + oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out") + + oso = ensure_physical_vis(oso) + + if getattr(exp, "superob_km", False): + oso = apply_superobbing(oso) + + # archive complete obsseqout + f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out") + shutil.copy(cluster.dartrundir + "/obs_seq.out", f_oso) + print('saved', f_oso) + return oso + + +def get_obsseq_out(time): + + # did we specify an obsseqout inputfile? + if exp.use_existing_obsseq != False: + f_obsseq = time.strftime(exp.use_existing_obsseq) + copy(f_obsseq, cluster.dartrundir+'/obs_seq.out') + print(f_obsseq, 'copied to', cluster.dartrundir+'/obs_seq.out') + oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out") + else: + # did we already generate observations? 
+ f_oso_thisexp = cluster.archivedir+'/obs_seq_out/'+time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out") + if os.path.isfile(f_oso_thisexp): + # oso exists + copy(f_oso_thisexp, cluster.dartrundir+'/obs_seq.out') + print('copied existing obsseqout from', f_oso_thisexp) + oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out") + else: + # we need to generate observations + oso = generate_obsseq_out(time) + + return oso if __name__ == "__main__": @@ -539,10 +435,12 @@ if __name__ == "__main__": 2) create obs_seq.in 3) create obs from nature (obs_seq.out) with user-defined obs-errors 4) Assimilate with assigned errors - - Note: - assim_time (dt.datetime): time of output - prior_valid_time (dt.datetime): valid time of prior (may be different to assim_time) + + Args: + assim_time: time of output + prior_init_time: forecast start of prior + prior_valid_time: valid time of prior (may be different to assim_time) + path to prior experiment Example call: python assim.py 2008-08-07_12:00 2008-08-06:00 2008-08-07_13:00 /home/fs71386/lkugler/data/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem @@ -552,6 +450,10 @@ if __name__ == "__main__": prior_init_time = dt.datetime.strptime(sys.argv[2], "%Y-%m-%d_%H:%M") prior_valid_time = dt.datetime.strptime(sys.argv[3], "%Y-%m-%d_%H:%M") prior_path_exp = str(sys.argv[4]) + options = [] + if len(sys.argv) >4: + options = sys.argv[5:] + nproc = 6 if 'headnode' in options else 48 archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/") os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in @@ -571,44 +473,29 @@ if __name__ == "__main__": prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp) ################################################ - print(" 1) create observations with specified obs-error") - osq.create_obsseqin_alltypes(time, exp.observations) - set_DART_nml() - - run_perfect_model_obs() # create observations - - print(" 2.1) obs preprocessing") - - oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out") - - if True: # set reflectance < surface albedo to surface albedo - print(" 2.2) removing obs below surface albedo ") - if_vis_obs = oso.df['kind'].values == 262 - if_obs_below_surface_albedo = oso.df['observations'].values < 0.2928 - - oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = 0.2928 - oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") - - if getattr(exp, "superob_km", False): - print(" 2.3) superobbing to", exp.superob_km, "km") - oso.df = oso.df.superob(window_km=exp.superob_km) - copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig") - oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") - + print(" 1) get observations with specified obs-error") + oso = get_obsseq_out(time) ################################################ - print(" 2.4) assign observation-errors for assimilation ") - set_obserr_assimilate_in_obsseqout(oso, outfile=cluster.dartrundir + "/obs_seq.out") + print('3.1) evaluate prior') # evaluate prior observations for all locations + osf_prior = evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_prior_allobs") + + print(" 3.2) assign observation-errors for assimilation ") + set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile=cluster.dartrundir + "/obs_seq.out") if getattr(exp, "reject_smallFGD", False): - print(" 2.5) QC of observations ") - qc_obs(oso, outfile=cluster.dartrundir + "/obs_seq.out") + print(" 3.3) reject observations? 
") + qc_obs(time, oso, osf_prior) - print(" 3) assimilate ") - archive_osq_out(time) - + print(" 3.4) assimilate (run filter) ") set_DART_nml() - assimilate() - + filter(nproc=nproc) archive_filteroutput(time) - archive_osq_final(time) + + # evaluate posterior observations for all locations + write_list_of_inputfiles_posterior(time) + if getattr(exp, "reject_smallFGD", False): + copy(cluster.archivedir+'/obs_seq_out/'+time.strftime('%Y-%m-%d_%H:%M_obs_seq.out-beforeQC'), + cluster.dartrundir+'/obs_seq.out') + evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs") + diff --git a/dartwrf/create_obs_upfront.py b/dartwrf/create_obs_upfront.py new file mode 100755 index 0000000..515d168 --- /dev/null +++ b/dartwrf/create_obs_upfront.py @@ -0,0 +1,65 @@ +from multiprocessing.sharedctypes import Value +import os, sys, shutil, warnings +import time as time_module +import datetime as dt +import numpy as np + +from config.cfg import exp, cluster +from dartwrf.utils import copy, print +import dartwrf.create_obsseq as osq +from dartwrf import obsseq + +from dartwrf import assim_synth_obs as aso + +tformat = '%Y-%m-%d_%H:%M' + + +if __name__ == "__main__": + """Create observations for multiple times upfront + """ + + args = sys.argv[1:] + times = [] + for tstr in args: + times.append(dt.datetime.strptime(tstr, tformat)) + + dir_for_obsseqout = exp.nature_wrfout + '/../../../obs_seq_out/'+exp.use_existing_obsseq + os.makedirs(dir_for_obsseqout, exist_ok=True) # create directory to run DART in + print('will save obsseq to', dir_for_obsseqout) + + os.chdir(cluster.dartrundir) + + # link DART binaries to run_DART + os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py") + + # remove any existing observation files + os.system("rm -f input.nml obs_seq.in obs_seq.out obs_seq.out-orig obs_seq.final") + aso.set_DART_nml() + + for time in times: + print('obsseq for time', time) + + aso.prepare_nature_dart(time) # link WRF files to DART directory + + osq.create_obsseqin_alltypes(time, exp.observations) # obs_seq.in + + aso.run_perfect_model_obs(nproc=6) # create observations (obs_seq.out) + + oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out") + + if True: # set reflectance < surface albedo to surface albedo + print(" 2.2) removing obs below surface albedo ") + if_vis_obs = oso.df['kind'].values == 262 + if_obs_below_surface_albedo = oso.df['observations'].values < 0.2928 + + oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = 0.2928 + oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") + + if getattr(exp, "superob_km", False): + print(" 2.3) superobbing to", exp.superob_km, "km") + oso.df = oso.df.superob(window_km=exp.superob_km) + copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig") + oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") + + aso.archive_osq_out(time, dir_obsseq=dir_for_obsseqout) + diff --git a/dartwrf/create_obsseq.py b/dartwrf/create_obsseq.py index e178d11..34d64d2 100755 --- a/dartwrf/create_obsseq.py +++ b/dartwrf/create_obsseq.py @@ -454,10 +454,15 @@ if __name__ == '__main__': error_generate=0.03, error_assimilate=0.06, cov_loc_radius_km=32) + wv62 = dict(plotname='Brightness temperature WV 6.2µm', plotunits='[K]', + kind='MSG_4_SEVIRI_TB', sat_channel=5, n_obs=n_obs, + error_generate=1., error_assimilate=False, + cov_loc_radius_km=20) + wv73 = dict(plotname='Brightness temperature WV 7.3µm', plotunits='[K]', kind='MSG_4_SEVIRI_TB', sat_channel=6, n_obs=n_obs, error_generate=1., 
error_assimilate=False, - cov_loc_radius_km=32) + cov_loc_radius_km=20) ir108 = dict(plotname='Brightness temperature IR 10.8µm', plotunits='[K]', kind='MSG_4_SEVIRI_TB', sat_channel=9, n_obs=n_obs, @@ -482,7 +487,7 @@ if __name__ == '__main__': #create_obsseq_in(time_dt, radar, archive_obs_coords=False) #'./coords_stage1.pkl') - create_obsseqin_alltypes(time_dt, [vis], archive_obs_coords=False) #'./obs_coords.pkl') + create_obsseqin_alltypes(time_dt, [wv62], archive_obs_coords=False) #'./obs_coords.pkl') if False: error_assimilate = 5.*np.ones(n_obs*len(radar['heights'])) diff --git a/dartwrf/obs/error_models.py b/dartwrf/obs/error_models.py new file mode 100644 index 0000000..281ea36 --- /dev/null +++ b/dartwrf/obs/error_models.py @@ -0,0 +1,76 @@ +import numpy as np +from scipy.interpolate import interp1d + +def calc_obserr_WV(channel, Hx_nature, Hx_prior): + """Calculate parametrized error (for assimilation) + Args: + channel (str): satellite channel + Hx_nature (np.array): dim (observations) + Hx_prior (np.array): dim (ensemble_members, observations) + + Returns + np.array Observation error std-deviation with dim (observations) + """ + assert channel in ['WV62', 'WV73'] + debug = False + + n_obs = len(Hx_nature) + OEs = np.ones(n_obs) + for iobs in range(n_obs): + bt_y = Hx_nature[iobs] + bt_x_ens = Hx_prior[:, iobs] + + # compute Cloud impact for every pair (ensemble, truth) + CIs = [cloudimpact(channel, bt_x, bt_y) for bt_x in bt_x_ens] + mean_CI = np.mean(CIs) + + if channel == 'WV62': + oe_model = OE_model_harnisch_WV62(mean_CI) + elif channel == 'WV73': + oe_model = OE_model_harnisch_WV73(mean_CI) + + if debug: + print("BT_nature=", bt_y, "=> mean_CI=", mean_CI, "=> OE_assim=", oe_model) + + OEs[iobs] = oe_model + return OEs + + +def cloudimpact(channel, bt_mod, bt_obs): + """ + follows Harnisch 2016, Figure 3 + """ + if channel == 'WV73': + biascor_obs = 0 + bt_lim = 255 # Kelvin for 7.3 micron WV channel + elif channel == 'WV62': + biascor_obs = 0 + bt_lim = 232.5 # Kelvin for 6.2 micron WV channel + + ci_obs = max(0, bt_lim - (bt_obs - biascor_obs)) + ci_mod = max(0, bt_lim - bt_mod) + ci = (ci_obs + ci_mod) / 2 + return ci + + +def OE_model_harnisch_WV62(ci): + if ci >= 0 and ci < 7.5: + # Kelvin, fit of Fig 7a, Harnisch 2016 + x_ci = [0, 2.5, 4.5, 5.5, 7.5] # average cloud impact [K] + y_oe = [1.2, 3, 5, 6, 6.5] # adjusted observation error [K] + oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True) + return oe_73_linear(ci) + else: # assign highest observation error + return 6.5 + + +def OE_model_harnisch_WV73(ci): + if ci >= 0 and ci < 16: + # Kelvin, fit of Fig 7b, Harnisch 2016 + x_ci = [0, 5, 10.5, 13, 16] # average cloud impact [K] + y_oe = [1, 4.5, 10, 12, 13] # adjusted observation error [K] + oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True) + return oe_73_linear(ci) + else: # assign highest observation error + return 13.0 + -- GitLab
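
Note (not part of the patch): a minimal usage sketch of the parametrized water-vapour observation-error model introduced above in dartwrf/obs/error_models.py. It only illustrates the shape convention of calc_obserr_WV (nature values per observation, prior values per ensemble member and observation); the brightness-temperature numbers below are made up for illustration.

    import numpy as np
    from dartwrf.obs import error_models as err

    n_ens, n_obs = 40, 5
    rng = np.random.default_rng(0)

    # nature-run brightness temperatures [K], dim (observations)
    Hx_nature = np.array([230., 240., 250., 255., 260.])

    # prior ensemble brightness temperatures [K], dim (ensemble_members, observations)
    Hx_prior = Hx_nature + rng.normal(0., 3., size=(n_ens, n_obs))

    # one observation-error standard deviation per observation;
    # larger mean cloud impact leads to a larger assigned error (Harnisch 2016)
    oe_wv62 = err.calc_obserr_WV('WV62', Hx_nature, Hx_prior)
    oe_wv73 = err.calc_obserr_WV('WV73', Hx_nature, Hx_prior)
    print(oe_wv62, oe_wv73)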