From ded54c869a404cf2f795d516b5a2702319f44a1f Mon Sep 17 00:00:00 2001 From: Kugler Lukas <lukas.kugler@univie.ac.at> Date: Tue, 1 Dec 2020 00:47:40 +0100 Subject: [PATCH] multi obstype, rewrite v1 --- config/cfg.py | 2 +- config/clusters.py | 3 +- scheduler.py | 33 ++-- scripts/assim_synth_obs.py | 331 +++++++++++++++++++++++++++++++++++++ scripts/create_obsseq.py | 113 +++++++++++-- scripts/gen_synth_obs.py | 275 ------------------------------ templates/input.nml | 7 +- 7 files changed, 458 insertions(+), 306 deletions(-) create mode 100755 scripts/assim_synth_obs.py delete mode 100755 scripts/gen_synth_obs.py diff --git a/config/cfg.py b/config/cfg.py index 65672a4..8451eba 100755 --- a/config/cfg.py +++ b/config/cfg.py @@ -19,7 +19,7 @@ n_obs = 64 # radar: n_obs for each observation height level vis = dict(sat_channel=1, n_obs=n_obs, err_std=0.03, cov_loc_radius_km=10) -wv = dict(sat_channel=6, n_obs=n_obs, err_std=5., +wv = dict(sat_channel=6, n_obs=n_obs, err_std=False, cov_loc_radius_km=10) ir108 = dict(sat_channel=9, n_obs=n_obs, err_std=5., cov_loc_radius_km=10) diff --git a/config/clusters.py b/config/clusters.py index e1f7af3..4272a85 100755 --- a/config/clusters.py +++ b/config/clusters.py @@ -34,8 +34,7 @@ vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input' vsc.run_WRF = '/gpfs/data/fs71386/lkugler/DART-WRF/scripts/osse/run_ens.vsc.sh' vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384", - "nodes": "1", "ntasks": "1", - "ntasks-per-node": "48", "ntasks-per-core": 1, + "ntasks": "1", "nodes": "1", "ntasks-per-node": "48", "ntasks-per-core": "1", "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"} diff --git a/scheduler.py b/scheduler.py index 0f0a6bc..1d31fc0 100755 --- a/scheduler.py +++ b/scheduler.py @@ -85,7 +85,7 @@ do mv $rundir/rsl.out.0000 $rundir/rsl.out.input done """ - s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), + s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), "nodes": "1", "time": "10", "mem-per-cpu": "2G"}) id = s.run(cmd, depends_on=[id]) return id @@ -188,18 +188,18 @@ def assimilate(assim_time, prior_init_time, +prior_path_exp, depends_on=[depends_on]) # prepare nature run, generate observations - s = my_Slurm("genSynthObs", cfg_update=dict(ntasks="48", time="12")) - id = s.run(cluster.python+' '+cluster.scriptsdir+'/gen_synth_obs.py ' + s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G")) + id = s.run(cluster.python+' '+cluster.scriptsdir+'/assim_synth_obs.py ' +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id]) - # actuall assimilation step - s = my_Slurm("Assim", cfg_update=dict(ntasks="48", time="50", mem="200G")) - cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out' - id = s.run(cmd, depends_on=[id]) + # # actuall assimilation step + # s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G")) + # cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out' + # id = s.run(cmd, depends_on=[id]) - s = my_Slurm("archiveAssim", cfg_update=dict(time="10")) - id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py ' - + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id]) + # s = my_Slurm("archiveAssim", cfg_update=dict(time="10")) + # id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py ' + # + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id]) s = my_Slurm("updateIC", cfg_update=dict(time="8")) id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py ' @@ -245,27 +245,28 @@ if is_new_run: first_guess = False elif start_from_existing_state: - id = prepare_wrfinput() # create initial conditions + #id = prepare_wrfinput() # create initial conditions # get initial conditions from archive init_time = dt.datetime(2008, 7, 30, 6) time = dt.datetime(2008, 7, 30, 10) exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.11_LMU_filter' - id = update_wrfinput_from_archive(time, init_time, exppath_arch, depends_on=id) + #id = update_wrfinput_from_archive(time, init_time, exppath_arch, depends_on=id) # values for assimilation assim_time = time prior_init_time = init_time -prior_path_exp = exppath_arch +prior_path_exp = False # use own exp path + -while time <= dt.datetime(2008, 7, 30, 18): +while time <= dt.datetime(2008, 7, 30, 10): id = assimilate(assim_time, prior_init_time, prior_path_exp=prior_path_exp, depends_on=id) - prior_path_exp = False # use own exp path + #prior_path_exp = False # use own exp path # integration this_forecast_init = assim_time # start integration from here @@ -282,6 +283,6 @@ while time <= dt.datetime(2008, 7, 30, 18): assim_time = time prior_init_time = assim_time - timedelta_btw_assim - create_satimages(depends_on=id) + #create_satimages(depends_on=id) mailme(id) diff --git a/scripts/assim_synth_obs.py b/scripts/assim_synth_obs.py new file mode 100755 index 0000000..e1e292b --- /dev/null +++ b/scripts/assim_synth_obs.py @@ -0,0 +1,331 @@ +import os, sys, shutil +import datetime as dt +import numpy as np +from scipy.interpolate import interp1d + +from config.cfg import exp, cluster +from utils import symlink, copy, sed_inplace, append_file +import create_obsseq as osq +import wrfout_add_geo + +earth_radius_km = 6370 + +# fit of Fig 7, Harnisch 2016 +x_ci = [0, 5, 10.5, 13, 16] +y_oe = [1, 4.5, 10, 12, 13] # Kelvin +oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True) + +def oe_73(ci): + if ci < 13: + return oe_73_linear(ci) + else: + return 16. + +def cloudimpact_73(bt_mod, bt_obs): + """ + follows Harnisch 2016 + """ + biascor_obs = 0 + bt_lim = 255 # Kelvin for 7.3 micron WV channel + + ci_obs = max(0, bt_lim-(bt_obs - biascor_obs)) + ci_mod = max(0, bt_lim-bt_mod) + ci = (ci_obs+ci_mod)/2 + return ci + +def read_prior_obs(f_obs_prior): + """ + docstring + """ + obsseq = open(f_obs_prior, 'r').readlines() + OBSs = [] + # read observations from obs_seq.final + for i, line in enumerate(obsseq): + if ' OBS ' in line: + observed = float(obsseq[i+1].strip()) + truth = float(obsseq[i+2].strip()) + prior_ensmean = float(obsseq[i+3].strip()) + prior_enssd = float(obsseq[i+4].strip()) + prior_ens = [] + for j in range(5, 5+exp.n_ens): + prior_ens.append(float(obsseq[i+j].strip())) + + OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))) + return OBSs + +def read_obsseqout(f): + obsseq = open(f, 'r').readlines() + true = [] + obs = [] + # read observations from obs_seq.out + for i, line in enumerate(obsseq): + if ' OBS ' in line: + observed = float(obsseq[i+1].strip()) + truth = float(obsseq[i+2].strip()) + true.append(truth) + obs.append(observed) + return true, obs + +def edit_obserr_in_obsseq(fpath_obsseqin, OEs): + """ + overwrite observation errors in a obs_seq.out file + according to the values in OEs + """ + # write to txt (write whole obs_seq.out again) + obsseq = open(fpath_obsseqin, 'r').readlines() + obsseq_new = obsseq.copy() + i_obs = 0 + for i, line in enumerate(obsseq): + if 'kind\n' in line: + i_line_oe = i+9 # 9 for satellite obs + obsseq_new[i_line_oe] = ' '+str(OEs[i_obs])+' \n' + i_obs += 1 + + os.rename(fpath_obsseqin, fpath_obsseqin+'-bak') # backup + # write cloud dependent errors (actually whole file) + with open(fpath_obsseqin, 'w') as f: + for line in obsseq_new: + f.write(line) + + +def set_input_nml(sat_channel=False, just_prior_values=False, + cov_loc_radius_km=32, cov_loc_vert_km=False): + """descr""" + cov_loc_radian = cov_loc_radius_km/earth_radius_km + + if just_prior_values: + template = cluster.scriptsdir+'/../templates/input.prioronly.nml' + else: + template = cluster.scriptsdir+'/../templates/input.nml' + copy(template, cluster.dartrundir+'/input.nml') + + options = {'<n_ens>': str(int(exp.n_ens)), + '<cov_loc_radian>': str(cov_loc_radian)} + + if cov_loc_vert_km: + cov_loc_vert_rad = cov_loc_vert_km*1000/cov_loc_radian + options['<horiz_dist_only>'] = '.false.' + options['<vert_norm_hgt>'] = str(cov_loc_vert_rad) + else: + options['<horiz_dist_only>'] = '.true.' + options['<vert_norm_hgt>'] = '50000.0' # dummy value + + for key, value in options.items(): + sed_inplace(cluster.dartrundir+'/input.nml', key, value) + + # input.nml for RTTOV + if sat_channel > 0: + if sat_channel in [1, 2, 3, 12]: # VIS channels + rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml' + else: # IR channels + rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml' + append_file(cluster.dartrundir+'/input.nml', rttov_nml) + else: + # append any rttov segment, needs to exist anyway + rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml' + append_file(cluster.dartrundir+'/input.nml', rttov_nml) + +def obs_operator_ensemble(): + os.chdir(cluster.dartrundir) + + if sat_channel: + list_ensemble_truths = [] + + for iens in range(1, exp.n_ens+1): + print('observation operator for ens #'+str(iens)) + # ens members are already linked to advance_temp<i>/wrfout_d01 + copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01', + cluster.dartrundir+'/wrfout_d01') + + wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01') + + # DART may need a wrfinput file as well, which serves as a template for dimension sizes + symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01') + + # run perfect_model obs (forward operator) + os.system('mpirun -np 12 ./perfect_model_obs > /dev/null') + + # truth values in obs_seq.out are H(x) values + true, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out') + list_ensemble_truths.append(true) + + n_obs = len(list_ensemble_truths[0]) + np_array = np.full((exp.n_ens, n_obs), np.nan) + for i in range(exp.n_ens): + np_array[i, :] = list_ensemble_truths[i] + return np_array + else: + raise NotImplementedError() + +def obs_operator_nature(): + prepare_nature_dart() + + set_input_nml(sat_channel=sat_channel) + os.chdir(cluster.dartrundir) + os.remove(cluster.dartrundir+'/obs_seq.out') + os.system('mpirun -np 12 ./perfect_model_obs') + true, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out') + return true + + +def prepare_nature_dart(): + # get wrfout_d01 from nature run + shutil.copy(time.strftime(cluster.nature_wrfout), + cluster.dartrundir+'/wrfout_d01') + + wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01') + + # DART may need a wrfinput file as well, which serves as a template for dimension sizes + symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01') + +def calc_obserr_WV73(Hx_nature, Hx_prior): + + n_obs = len(Hx_nature) + OEs = np.ones(n_obs) + for iobs in range(n_obs): + + bt_y = Hx_nature[iobs] + bt_x_ens = Hx_prior[:, iobs] + CIs = [cloudimpact_73(bt_x, bt_y) for bt_x in bt_x_ens] + mean_CI = np.mean(CIs) + + oe_nature = oe_73(mean_CI) + print('oe_nature:', oe_nature, ', bt_y:', bt_y, ', mean_CI:', mean_CI) + OEs[iobs] = oe_nature + return OEs + + +def generate_observations(): + set_input_nml(sat_channel=sat_channel, cov_loc_radius_km=cov_loc, + cov_loc_vert_km=cov_loc_vert_km) + + # generate actual observations (with correct error) + os.chdir(cluster.dartrundir) + os.remove(cluster.dartrundir+'/obs_seq.out') + if not os.path.exists(cluster.dartrundir+'/obs_seq.in'): + raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir) + os.system('mpirun -np 12 ./perfect_model_obs') + + +def assimilate(): + os.chdir(cluster.dartrundir) + os.remove(cluster.dartrundir+'/obs_seq.final') + if not os.path.exists(cluster.dartrundir+'/obs_seq.out'): + raise RuntimeError('obs_seq.out does not exist in '+cluster.dartrundir) + os.system('mpirun -np 48 ./filter') + +def archive_diagnostics(time): + print('archive obs space diagnostics') + savedir = cluster.archivedir()+'/obs_seq_final/' + mkdir(savedir) + copy(cluster.dartrundir+'/obs_seq.final', savedir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final')) + + try: + print('archive regression diagnostics') + savedir = cluster.archivedir()+'/reg_factor/' + mkdir(savedir) + copy(cluster.dartrundir+'/reg_diagnostics', savedir+time.strftime('/%Y-%m-%d_%H:%M_reg_diagnostics')) + except Exception as e: + warnings.warn(str(e)) + +def recycle_output(): + print('move output to input') + for iens in range(1, exp.n_ens+1): + os.rename(cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4), + cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01') + + +def archive_output_mean(time, istage): + istage = str(istage) + + print('copy prior posterior members to archive') + for iens in range(1, exp.n_ens+1): + savedir = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/')+str(iens) + mkdir(savedir) + + copy(cluster.dartrundir+'/input.nml', + cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/input '+istage+'.nml')) + + # filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc' + # filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4) + + # copy mean and sd to archive + print('copy preassim, postassim mean and sd') + for f in ['output_mean.nc', 'output_sd.nc']: + copy(cluster.dartrundir+'/'+f, + cluster.archivedir()+'/'+f[:-3]+time.strftime('_'+istage+'_%Y-%m-%d_%H:%M:%S')) + + +if __name__ == "__main__": + + """Generate observations (obs_seq.out file) + as defined in config/cfg.py + for a certain timestamp (argument) of the nature run (defined in config/clusters.py) + + Workflow: + for each assimilation stage (one obs_seq.in and e.g. one observation type): + 1) prepare nature run for DART + optional: 2) calculate obs-error from parametrization + 3) create obs_seq.in with obs-errors from 2) + 4) generate actual observations (obs_seq.out) with obs_seq.in from 3) + + - calculate obs-error from parametrization + 1) create obs_seq.in with obs error=0 + 2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble) + 3) calculate obs error as function of y_nat, y_ensmean + + Assumptions: + - x_ensemble is already linked for DART to advance_temp<iens>/wrfout_d01 + """ + + time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') + folder_obs_coords = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/') + + os.chdir(cluster.dartrundir) + os.system('rm -f obs_seq.out obs_seq.in obs_seq.final') # remove any existing observation files + + n_stages = len(exp.observations) + for istage, obscfg in enumerate(exp.observations): + + n_obs = obscfg['n_obs'] + sat_channel = obscfg.get('sat_channel', False) + obs_coords = osq.calc_obs_locations(n_obs, + coords_from_domaincenter=False, + distance_between_obs_km=obscfg.get('distance_between_obs_km', False), + folder_obs_coords=folder_obs_coords) + + set_DART_nml(sat_channel=sat_channel, + cov_loc=obscfg['cov_loc_radius_km'], + cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False)) + + use_error_parametrization = obscfg['err_std'] == False + if use_error_parametrization: + if sat_channel != 6: + raise NotImplementedError('sat channel '+str(sat_channel)) + + osq.create_obsseq_in(obscfg, np.zeros(n_obs)) + + Hx_nat = obs_operator_nature() + Hx_prior = obs_operator_ensemble() # files are already linked to DART directory + + obserr_var = calc_obserr_WV73(Hx_nat, Hx_prior) + else: + obserr_var = np.ones(n_obs) * obscfg['err_std']**2 + + osq.create_obsseq_in(obscfg, obserr_var) # now with correct errors + generate_observations() + + assimilate() + archive_diagnostics(time) + + if istage < n_stages-1: + # recirculation: filter output -> input + recycle_output() + archive_output_mean(time, istage) + + elif istage == n_stages-1: + # last assimilation, continue integration now + pass # call update wrfinput from filteroutput later + + else: + RuntimeError('this should never occur?!') diff --git a/scripts/create_obsseq.py b/scripts/create_obsseq.py index 202e42a..1d40504 100755 --- a/scripts/create_obsseq.py +++ b/scripts/create_obsseq.py @@ -34,7 +34,7 @@ def get_dart_date(time_dt): return dart_date_day, secs_thatday def calc_obs_locations(n_obs, coords_from_domaincenter=True, - distance_between_obs_km=9999, fpath_obs_locations=False): + distance_between_obs_km=9999, folder_obs_coords=False): """Calculate coordinate pairs for locations Args: @@ -104,6 +104,27 @@ def calc_obs_locations(n_obs, coords_from_domaincenter=True, assert len(coords) == n_obs, (len(coords), n_obs) return coords +def main_obspart(obs, last=False): + """ + Args: + obs (object) + last (bool): True if this is the last observation in the obs_seq file + """ + if last: + line_link = " "+str(obs.i-1)+" -1 -1" + else: + line_link = " -1 "+str(obs.i+1)+" -1" + + return """ + OBS """+str(obs.i)+""" +"""+line_link+""" +obdef +loc3d + """+obs.lon_rad+" "+obs.lat_rad+" "+obs.vert+" "+obs.vert_coord_sys+""" +kind + """+obs.kind_nr+""" + """+obs.secs_thatday+""" """+obs.dart_date_day+""" + """+obs.error_var def write_generic_obsseq(obs_name, obs_kind_nr, error_var, coords, dart_date_day, secs_thatday, output_path, @@ -324,23 +345,95 @@ def generic_obs(obs_kind, time_dt, coords, error_var, heights=False, output_path dart_date_day, secs_thatday, output_path, vert_coord_sfc=is_sfc_obs) +def obskind_read(): + raw_obskind_dart = """ + 5 RADIOSONDE_TEMPERATURE + 6 RADIOSONDE_SPECIFIC_HUMIDITY + 12 AIRCRAFT_U_WIND_COMPONENT + 13 AIRCRAFT_V_WIND_COMPONENT + 14 AIRCRAFT_TEMPERATURE + 16 ACARS_U_WIND_COMPONENT + 17 ACARS_V_WIND_COMPONENT + 18 ACARS_TEMPERATURE + 29 LAND_SFC_PRESSURE + 30 SAT_U_WIND_COMPONENT + 31 SAT_V_WIND_COMPONENT + 36 DOPPLER_RADIAL_VELOCITY + 37 RADAR_REFLECTIVITY + 83 GPSRO_REFRACTIVITY + 94 SYNOP_SURFACE_PRESSURE + 95 SYNOP_SPECIFIC_HUMIDITY + 96 SYNOP_TEMPERATURE + 254 MSG_4_SEVIRI_RADIANCE + 255 MSG_4_SEVIRI_TB + 256 MSG_4_SEVIRI_BDRF""" + + # lookup table for kind nr + alist = raw_obskind_dart.split() + assert len(alist) % 2 == 0, alist + obskind_nrs = {} + for i in range(0, len(alist)-1, 2): + obskind_nrs[alist[i+1]] = alist[i] + return obskind_nrs + +obskind_nrs = obskind_read() + + +def create_obsseq_in(obscfg, obserr_var): + """ + Args: + obserr_var (np.array): observation error variance + shape (n_obs,), one value for each observation, + """ + + self.coords = osq.calc_obs_locations(obscfg['n_obs'], + coords_from_domaincenter=False, + distance_between_obs_km=obscfg.get('distance_between_obs_km', False), + fpath_obs_locations=folder_obs_coords+'/obs_coords_'+obscfg['kind']+'.pkl') + + # for i_obs in obscfg['n_obs']: + + # instruction = dict(kind_nr = obskind_nrs[obscfg['kind']], + # sat_channel = obscfg.get('sat_channel', False), + # heights = obscfg.get('heights', False), + + # obs_kinds, time_dt, coords, error_var, heights=False, output_path='./'): + + if 'SYNOP' in obs_kind: + is_sfc_obs = True + heights = [2,] + else: + is_sfc_obs = False + + if not heights: + heights = [5000., ] + coords = calc_obs_locations_3d(coords, heights) + + dart_date_day, secs_thatday = get_dart_date(add_timezone_UTC(time_dt)) + print('secs, days:', secs_thatday, dart_date_day) + + obs_kind_nr = obs_kind_nrs[obs_kind] + + for obs_kind in obs_kinds: + + write_generic_obsseq2(obs_kind, obs_kind_nr, error_var, coords, + dart_date_day, secs_thatday, output_path, + vert_coord_sfc=is_sfc_obs) if __name__ == '__main__': - time_dt = dt.datetime(2008, 7, 30, 16, 31) - n_obs = 100 + time_dt = dt.datetime(2008, 7, 30, 10, 0) + n_obs = 64 sat_channel = 1 distance_between_obs_meters = 10000 - error_var = 0.001 + #error_var = 0.001 obs_coords = calc_obs_locations(n_obs, coords_from_domaincenter=False, distance_between_obs_km=distance_between_obs_meters, fpath_obs_locations=None) - sat(time_dt, sat_channel, obs_coords, error_var, output_path='./') + #sat(time_dt, sat_channel, obs_coords, error_var, output_path='./') # error_var = (5.)**2 - # generic_obs('RADAR', time_dt, n_obs, error_var, distance_between_obs_meters, - # output_path='./', fpath_obs_locations='./domain.pkl') + # generic_obs('RADAR_REFLECTIVITY', time_dt, obs_coords, error_var, heights=[5000.,], output_path='./') - # error_var = (0.5)**2 - # generic_obs('RASO_T', time_dt, n_obs, error_var, distance_between_obs_meters, - # output + error_var = (0.5)**2 + generic_obs('RADIOSONDE_TEMPERATURE', time_dt, obs_coords, error_var, heights=[5000.,]) diff --git a/scripts/gen_synth_obs.py b/scripts/gen_synth_obs.py deleted file mode 100755 index 12a4628..0000000 --- a/scripts/gen_synth_obs.py +++ /dev/null @@ -1,275 +0,0 @@ -import os, sys, shutil -import datetime as dt -import numpy as np -from scipy.interpolate import interp1d - -from config.cfg import exp, cluster -from utils import symlink, copy, sed_inplace, append_file -import create_obsseq as osq -import wrfout_add_geo - -earth_radius_km = 6370 - -# fit of Fig 7, Harnisch 2016 -x_ci = [0, 5, 10.5, 13, 16] -y_oe = [1, 4.5, 10, 12, 13] # Kelvin -oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True) - -def oe_73(ci): - if ci < 13: - return oe_73_linear(ci) - else: - return 16. - -def cloudimpact_73(bt_mod, bt_obs): - """ - follows Harnisch 2016 - """ - biascor_obs = 0 - bt_lim = 255 # Kelvin for 7.3 micron WV channel - - ci_obs = max(0, bt_lim-(bt_obs - biascor_obs)) - ci_mod = max(0, bt_lim-bt_mod) - ci = (ci_obs+ci_mod)/2 - return ci - -def read_prior_obs(f_obs_prior): - """ - docstring - """ - obsseq = open(f_obs_prior, 'r').readlines() - OBSs = [] - # read observations from obs_seq.final - for i, line in enumerate(obsseq): - if ' OBS ' in line: - observed = float(obsseq[i+1].strip()) - truth = float(obsseq[i+2].strip()) - prior_ensmean = float(obsseq[i+3].strip()) - prior_enssd = float(obsseq[i+4].strip()) - prior_ens = [] - for j in range(5, 5+exp.n_ens): - prior_ens.append(float(obsseq[i+j].strip())) - - OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))) - return OBSs - -def read_obsseqout(f): - obsseq = open(f, 'r').readlines() - true = [] - obs = [] - # read observations from obs_seq.out - for i, line in enumerate(obsseq): - if ' OBS ' in line: - observed = float(obsseq[i+1].strip()) - truth = float(obsseq[i+2].strip()) - true.append(truth) - obs.append(observed) - return true, obs - -def edit_obserr_in_obsseq(fpath_obsseqin, OEs): - """ - overwrite observation errors in a obs_seq.out file - according to the values in OEs - """ - # write to txt (write whole obs_seq.out again) - obsseq = open(fpath_obsseqin, 'r').readlines() - obsseq_new = obsseq.copy() - i_obs = 0 - for i, line in enumerate(obsseq): - if 'kind\n' in line: - i_line_oe = i+9 # 9 for satellite obs - obsseq_new[i_line_oe] = ' '+str(OEs[i_obs])+' \n' - i_obs += 1 - - os.rename(fpath_obsseqin, fpath_obsseqin+'-bak') # backup - # write cloud dependent errors (actually whole file) - with open(fpath_obsseqin, 'w') as f: - for line in obsseq_new: - f.write(line) - - -def set_input_nml(sat_channel=False, just_prior_values=False, - cov_loc_radius_km=32, cov_loc_vert_km=False): - """descr""" - cov_loc_radian = cov_loc_radius_km/earth_radius_km - - if just_prior_values: - template = cluster.scriptsdir+'/../templates/input.prioronly.nml' - else: - template = cluster.scriptsdir+'/../templates/input.nml' - copy(template, cluster.dartrundir+'/input.nml') - - options = {'<n_ens>': str(int(exp.n_ens)), - '<cov_loc_radian>': str(cov_loc_radian)} - - if cov_loc_vert_km: - cov_loc_vert_rad = cov_loc_vert_km*1000/cov_loc_radian - options['<horiz_dist_only>'] = '.false.' - options['<vert_norm_hgt>'] = str(cov_loc_vert_rad) - else: - options['<horiz_dist_only>'] = '.true.' - options['<vert_norm_hgt>'] = '50000.0' # dummy value - - for key, value in options.items(): - sed_inplace(cluster.dartrundir+'/input.nml', key, value) - - # input.nml for RTTOV - if sat_channel > 0: - if sat_channel in [1, 2, 3, 12]: # VIS channels - rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml' - else: # IR channels - rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml' - append_file(cluster.dartrundir+'/input.nml', rttov_nml) - else: - # append any rttov segment, needs to exist anyway - rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml' - append_file(cluster.dartrundir+'/input.nml', rttov_nml) - -def obs_operator_ensemble(): - os.chdir(cluster.dartrundir) - - if sat_channel: - list_ensemble_truths = [] - - for iens in range(1, exp.n_ens+1): - print('observation operator for ens #'+str(iens)) - # ens members are already linked to advance_temp<i>/wrfout_d01 - copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01', - cluster.dartrundir+'/wrfout_d01') - - t = dt.datetime.now() - wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01') - print((dt.datetime.now()-t).total_seconds(), 'secs for adding geodata') - - # DART may need a wrfinput file as well, which serves as a template for dimension sizes - symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01') - - # run perfect_model obs (forward operator) - os.system('mpirun -np 12 ./perfect_model_obs > /dev/null') - - # truth values in obs_seq.out are H(x) values - vals, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out') - list_ensemble_truths.append(vals) - - n_obs = len(list_ensemble_truths[0]) - np_array = np.full((exp.n_ens, n_obs), np.nan) - for i in range(exp.n_ens): - np_array[i, :] = list_ensemble_truths[i] - return np_array - else: - raise NotImplementedError() - - -if __name__ == "__main__": - - time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') - fpath_obs_coords = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/obs_coords.pkl') - - - os.chdir(cluster.dartrundir) - os.system('rm -f obs_seq_*.out obs_seq.in obs_seq.final') # remove any existing observation files - - def prepare_nature_dart(): - # get wrfout_d01 from nature run - shutil.copy(time.strftime(cluster.nature_wrfout), - cluster.dartrundir+'/wrfout_d01') - - wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01') - - # DART may need a wrfinput file as well, which serves as a template for dimension sizes - symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01') - - prepare_nature_dart() - - # loop over observation types - for i_obs, obscfg in enumerate(exp.observations): - - n_obs = obscfg['n_obs'] - error_var = (obscfg['err_std'])**2 - sat_channel = obscfg.get('sat_channel', False) - cov_loc = obscfg['cov_loc_radius_km'] - dist_obs = obscfg.get('distance_between_obs_km', False) - cov_loc_vert_km = obscfg.get('cov_loc_vert_km', False) - heights = obscfg.get('heights', False) - - # generate obs_seq.in - obs_coords = osq.calc_obs_locations(n_obs, coords_from_domaincenter=False, - distance_between_obs_km=dist_obs, - fpath_obs_locations=fpath_obs_coords) - - if sat_channel: - osq.sat(time, sat_channel, obs_coords, error_var, - output_path=cluster.dartrundir) - else: - osq.generic_obs(obscfg['kind'], time, obs_coords, error_var, - heights=heights, - output_path=cluster.dartrundir) - - if not os.path.exists(cluster.dartrundir+'/obs_seq.in'): - raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir) - - os.chdir(cluster.dartrundir) - if sat_channel == 6: - """cloud dependent observation error - - # methodologically: - 1) gen H(x_nature) - 2) gen H(x_prior) - 3) find the observation error for a pair of (H(x_nature), H(x_background)) - 4) generate actual observation - with the observation error as function of H(x_nature) and H(x_background) - - # technically: - 4) the file 'obs_seq.in' needs to be edited to show corrected observation errors - """ - # 1) gen H(x_nature) - set_input_nml(sat_channel=sat_channel, - cov_loc_radius_km=cov_loc, - cov_loc_vert_km=cov_loc_vert_km) - os.system('mpirun -np 12 ./perfect_model_obs') - Hx_nature, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out') - - # 2) gen H(x_prior) for the whole ensemble - Hx_prior = obs_operator_ensemble() # files are already linked to DART directory - - # 3) find the observation error for a pair of (H(x_nature), H(x_background)) - # necessary to achieve a certain FGD distribution which is near to operational - n_obs = len(Hx_nature) - OEs = [] - for iobs in range(n_obs): - - bt_y = Hx_nature[iobs] - bt_x_ens = Hx_prior[:,iobs] - CIs = [cloudimpact_73(bt_x, bt_y) for bt_x in bt_x_ens] - mean_CI = np.mean(CIs) - - oe_nature = oe_73(mean_CI) - print('oe_nature=', oe_nature, ' K') - OEs.append(oe_nature) - - # correct obs_err in obs_seq.in (to produce actual observations later on) - fpath_obsseqout = cluster.dartrundir+'/obs_seq.in' - edit_obserr_in_obsseq(fpath_obsseqout, OEs) - - # ensure correct nature file linked - # nature should be the real nature again (was changed in the meantime) - prepare_nature_dart() - - # correct input.nml for actual assimilation later on - set_input_nml(sat_channel=sat_channel, - cov_loc_radius_km=cov_loc, - cov_loc_vert_km=cov_loc_vert_km) - - # 4) generate actual observations (with correct error) - os.chdir(cluster.dartrundir) - os.system('mpirun -np 12 ./perfect_model_obs') - - # rename according to i_obs - os.rename(cluster.dartrundir+'/obs_seq.out', - cluster.dartrundir+'/obs_seq_'+str(i_obs)+'.out') - - # concatenate the created obs_seq_*.out files - os.chdir(cluster.dartrundir) - os.system('cat obs_seq_*.out >> obs_seq_all.out') - - print(dt.datetime.now()) diff --git a/templates/input.nml b/templates/input.nml index 31d31e1..62cfe33 100644 --- a/templates/input.nml +++ b/templates/input.nml @@ -31,7 +31,7 @@ async = 0, adv_ens_command = "../shell_scripts/advance_model.csh", ens_size = <n_ens>, - obs_sequence_in_name = "obs_seq_all.out", + obs_sequence_in_name = "obs_seq.out", obs_sequence_out_name = "obs_seq.final", input_state_file_list = "input_list.txt" output_state_file_list = "output_list.txt" @@ -160,7 +160,10 @@ 'MSG_4_SEVIRI_RADIANCE', 'MSG_4_SEVIRI_TB', 'MSG_4_SEVIRI_BDRF', - 'LAND_SFC_PRESSURE' + 'LAND_SFC_PRESSURE', + 'SYNOP_SURFACE_PRESSURE', + 'SYNOP_TEMPERATURE', + 'SYNOP_SPECIFIC_HUMIDITY' evaluate_these_obs_types = 'RADIOSONDE_SPECIFIC_HUMIDITY', / -- GitLab