From d88bcbebaa1e63b4e0ac862c7d4a56f97d1baf73 Mon Sep 17 00:00:00 2001 From: Kugler Lukas <lukas.kugler@univie.ac.at> Date: Wed, 2 Dec 2020 12:02:29 +0100 Subject: [PATCH] fixes --- scheduler.py | 11 ++--- scripts/apply_obs_op_dart.py | 53 +++++++++++++------- scripts/assim_synth_obs.py | 92 +++++++++++++++++++---------------- scripts/utils.py | 10 ++++ templates/input.prioronly.nml | 1 + 5 files changed, 101 insertions(+), 66 deletions(-) diff --git a/scheduler.py b/scheduler.py index 1d31fc0..a73045d 100755 --- a/scheduler.py +++ b/scheduler.py @@ -127,7 +127,7 @@ def run_ENS(begin, end, depends_on=None): id = s.run(cmd, depends_on=[id]) # apply forward operator (DART filter without assimilation) - s = my_Slurm("fwOP-1m", cfg_update=dict(time="2")) + s = my_Slurm("fwOP-1m", cfg_update=dict(time="10")) id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py ' + begin.strftime('%Y-%m-%d_%H:%M')+' ' + begin_plus1.strftime('%Y-%m-%d_%H:%M'), @@ -140,7 +140,7 @@ def run_ENS(begin, end, depends_on=None): id = s.run(' '.join([cluster.python, cluster.scriptsdir+'/prepare_namelist.py', begin.strftime('%Y-%m-%d_%H:%M'), - begin_plus1.strftime('%Y-%m-%d_%H:%M'), + end.strftime('%Y-%m-%d_%H:%M'), str(hist_interval), str(radt),]), depends_on=[id]) @@ -257,16 +257,15 @@ elif start_from_existing_state: # values for assimilation assim_time = time prior_init_time = init_time -prior_path_exp = False # use own exp path - +prior_path_exp = exppath_arch -while time <= dt.datetime(2008, 7, 30, 10): +while time <= dt.datetime(2008, 7, 30, 16): id = assimilate(assim_time, prior_init_time, prior_path_exp=prior_path_exp, depends_on=id) - #prior_path_exp = False # use own exp path + prior_path_exp = False # use own exp path # integration this_forecast_init = assim_time # start integration from here diff --git a/scripts/apply_obs_op_dart.py b/scripts/apply_obs_op_dart.py index 084e127..5b380c6 100644 --- a/scripts/apply_obs_op_dart.py +++ b/scripts/apply_obs_op_dart.py @@ -4,9 +4,10 @@ import numpy as np from scipy.interpolate import interp1d from config.cfg import exp, cluster -from utils import symlink, copy, mkdir, sed_inplace, append_file +from utils import symlink, copy, mkdir, sed_inplace, append_file, print import create_obsseq as osq -from gen_synth_obs import read_prior_obs, set_input_nml +import assim_synth_obs as aso +from assim_synth_obs import read_prior_obs, set_DART_nml, generate_observations, assimilate import pre_assim def run_operator(obscfg, time): @@ -16,12 +17,11 @@ def run_operator(obscfg, time): # get observation file (obs not important, but their locations) # this should correspond to configuration to have same locations as in real assim - obs_seq_all_out = cluster.dartrundir + '/obs_seq_all.out' os.chdir(cluster.dartrundir) n_obs = obscfg['n_obs'] error_var = (obscfg['err_std'])**2 - sat_channel = obscfg.get('channel', False) + sat_channel = obscfg.get('sat_channel', False) cov_loc = obscfg['cov_loc_radius_km'] dist_obs = obscfg.get('distance_between_obs_km', False) @@ -38,37 +38,56 @@ def run_operator(obscfg, time): wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01') - print('running perfect model obs', flush=True) + print('running perfect model obs') os.system('mpirun -np 12 ./perfect_model_obs') # set namelist for filter (calc only forward op) - set_input_nml(sat_channel=obscfg.get('channel', False), + aso.set_DART_nml(sat_channel=sat_channel, just_prior_values=True) # run filter - os.system('mv obs_seq.out obs_seq_all.out') - assert os.path.exists(obs_seq_all_out) - print('running filter', flush=True) + assert os.path.exists(cluster.dartrundir+'/obs_seq.out') + print('running filter') os.system('mpirun -np 40 ./filter') # copy output to archive savedir = cluster.archivedir()+'/obs_seq_final_1min/' mkdir(savedir) - obsname = obscfg.get('kind', 'satch'+str(obscfg['sat_channel'])) - fout = time.strftime('/%Y-%m-%d_%H:%M_'+obsname+'_obs_seq.final') + obsname = obscfg['kind'] + copy(cluster.dartrundir+'/obs_seq.final', savedir+fout) print('output of observation operator saved to', fout) if __name__ == '__main__': prev_forecast_init = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') - time_to_run_fw_op = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M') + time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M') exppath_firstguess = cluster.archivedir() - print(prev_forecast_init, time_to_run_fw_op) + print(prev_forecast_init, time) # link ensemble states to run_DART directory - pre_assim.run(time_to_run_fw_op, prev_forecast_init, exppath_firstguess) + pre_assim.run(time, prev_forecast_init, exppath_firstguess) + + savedir = cluster.archivedir()+'/obs_seq_final_1min/' + mkdir(savedir) + + n_stages = len(exp.observations) + for istage, obscfg in enumerate(exp.observations): + + kind = obscfg['kind'] + n_obs = obscfg['n_obs'] + sat_channel = obscfg.get('sat_channel', False) + obscfg['folder_obs_coords'] = False + + aso.set_DART_nml(sat_channel=sat_channel, + cov_loc_radius_km=obscfg['cov_loc_radius_km'], + cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False), + just_prior_values=True) + + osq.create_obsseq_in(time, obscfg) + aso.generate_observations() + aso.assimilate() + + archive_stage = savedir+kind + aso.archive_diagnostics(archive_stage, time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final')) - # run fwd operator, save to archive - for i_obs, obscfg in enumerate(exp.observations): - run_operator(obscfg, time_to_run_fw_op) diff --git a/scripts/assim_synth_obs.py b/scripts/assim_synth_obs.py index a41e8c5..c4f1de4 100755 --- a/scripts/assim_synth_obs.py +++ b/scripts/assim_synth_obs.py @@ -1,10 +1,11 @@ import os, sys, shutil +import warnings import datetime as dt import numpy as np from scipy.interpolate import interp1d from config.cfg import exp, cluster -from utils import symlink, copy, sed_inplace, append_file +from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print import create_obsseq as osq import wrfout_add_geo @@ -66,34 +67,38 @@ def read_obsseqout(f): obs.append(observed) return true, obs -def edit_obserr_in_obsseq(fpath_obsseqin, OEs): - """ - overwrite observation errors in a obs_seq.out file - according to the values in OEs - """ - # write to txt (write whole obs_seq.out again) - obsseq = open(fpath_obsseqin, 'r').readlines() - obsseq_new = obsseq.copy() - i_obs = 0 - for i, line in enumerate(obsseq): - if 'kind\n' in line: - i_line_oe = i+9 # 9 for satellite obs - obsseq_new[i_line_oe] = ' '+str(OEs[i_obs])+' \n' - i_obs += 1 - - os.rename(fpath_obsseqin, fpath_obsseqin+'-bak') # backup - # write cloud dependent errors (actually whole file) - with open(fpath_obsseqin, 'w') as f: - for line in obsseq_new: - f.write(line) - - -def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False): +# def edit_obserr_in_obsseq(fpath_obsseqin, OEs): +# """ +# overwrite observation errors in a obs_seq.out file +# according to the values in OEs +# """ +# # write to txt (write whole obs_seq.out again) +# obsseq = open(fpath_obsseqin, 'r').readlines() +# obsseq_new = obsseq.copy() +# i_obs = 0 +# for i, line in enumerate(obsseq): +# if 'kind\n' in line: +# i_line_oe = i+9 # 9 for satellite obs +# obsseq_new[i_line_oe] = ' '+str(OEs[i_obs])+' \n' +# i_obs += 1 + +# os.rename(fpath_obsseqin, fpath_obsseqin+'-bak') # backup +# # write cloud dependent errors (actually whole file) +# with open(fpath_obsseqin, 'w') as f: +# for line in obsseq_new: +# f.write(line) + + +def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False, + just_prior_values=False): """descr""" cov_loc_radian = cov_loc_radius_km/earth_radius_km - copy(cluster.scriptsdir+'/../templates/input.nml', - cluster.dartrundir+'/input.nml') + if just_prior_values: + template = cluster.scriptsdir+'/../templates/input.prioronly.nml' + else: + template = cluster.scriptsdir+'/../templates/input.nml' + copy(template, cluster.dartrundir+'/input.nml') # options are overwritten with settings options = {'<n_ens>': str(int(exp.n_ens)), @@ -123,6 +128,7 @@ def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False) append_file(cluster.dartrundir+'/input.nml', rttov_nml) def obs_operator_ensemble(): + print('running obs operator on ensemble forecast') os.chdir(cluster.dartrundir) if sat_channel: @@ -155,6 +161,7 @@ def obs_operator_ensemble(): raise NotImplementedError() def obs_operator_nature(): + print('running obs operator on nature run') prepare_nature_dart() os.chdir(cluster.dartrundir) @@ -163,7 +170,6 @@ def obs_operator_nature(): true, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out') return true - def prepare_nature_dart(): # get wrfout_d01 from nature run shutil.copy(time.strftime(cluster.nature_wrfout), @@ -191,24 +197,25 @@ def calc_obserr_WV73(Hx_nature, Hx_prior): return OEs def generate_observations(): - # generate actual observations (with correct error) + print('generate actual observations') os.chdir(cluster.dartrundir) - os.remove(cluster.dartrundir+'/obs_seq.out') + try_remove(cluster.dartrundir+'/obs_seq.out') if not os.path.exists(cluster.dartrundir+'/obs_seq.in'): raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir) os.system('mpirun -np 12 ./perfect_model_obs') def assimilate(): + print('running filter') os.chdir(cluster.dartrundir) - os.remove(cluster.dartrundir+'/obs_seq.final') + try_remove(cluster.dartrundir+'/obs_seq.final') if not os.path.exists(cluster.dartrundir+'/obs_seq.out'): raise RuntimeError('obs_seq.out does not exist in '+cluster.dartrundir) os.system('mpirun -np 48 ./filter') -def archive_diagnostics(archive_stage): +def archive_diagnostics(archive_stage, fname_final): print('archive obs space diagnostics') mkdir(archive_stage) - copy(cluster.dartrundir+'/obs_seq.final', archive_stage+'/obs_seq.final') + copy(cluster.dartrundir+'/obs_seq.final', archive_stage+'/'+fname_final) try: print('archive regression diagnostics') @@ -223,14 +230,11 @@ def recycle_output(): cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01') def archive_output_mean(archive_stage): - for iens in range(1, exp.n_ens+1): - savedir = archive_stage+'/'+str(iens) - mkdir(savedir) - - copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml') - - # filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc' - # filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4) + # for iens in range(1, exp.n_ens+1): + # savedir = archive_stage+'/'+str(iens) + # mkdir(savedir) + # filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc' + # filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4) # copy mean and sd to archive for f in ['output_mean.nc', 'output_sd.nc']: @@ -269,13 +273,13 @@ if __name__ == "__main__": for istage, obscfg in enumerate(exp.observations): kind = obscfg['kind'] - archive_stage = archive_time + '/stage'+str(istage)+'_'+kind + archive_stage = archive_time + '/assim_stage'+str(istage)+'_'+kind n_obs = obscfg['n_obs'] sat_channel = obscfg.get('sat_channel', False) obscfg['folder_obs_coords'] = archive_stage+'/obs_coords.pkl' set_DART_nml(sat_channel=sat_channel, - cov_loc=obscfg['cov_loc_radius_km'], + cov_loc_radius_km=obscfg['cov_loc_radius_km'], cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False)) use_error_parametrization = obscfg['err_std'] == False @@ -296,15 +300,17 @@ if __name__ == "__main__": generate_observations() assimilate() - archive_diagnostics(archive_stage) + archive_diagnostics(archive_stage, '/obs_seq.final') if istage < n_stages-1: # recirculation: filter output -> input recycle_output() + copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml') archive_output_mean(archive_stage) elif istage == n_stages-1: # last assimilation, continue integration now + copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml') pass # call update wrfinput from filteroutput later else: diff --git a/scripts/utils.py b/scripts/utils.py index 238325d..2b331b1 100755 --- a/scripts/utils.py +++ b/scripts/utils.py @@ -1,6 +1,10 @@ import os, sys, shutil, glob +import builtins as __builtin__ #copy = shutil.copy +def print(*args): + __builtin__.print(*args, flush=True) + def copy(src, dst): try: os.remove(dst) @@ -8,6 +12,12 @@ def copy(src, dst): pass shutil.copy(src, dst) +def try_remove(f): + try: + os.remove(f) + except: + pass + def mkdir(path): os.system('mkdir -p '+path) diff --git a/templates/input.prioronly.nml b/templates/input.prioronly.nml index 53ab2df..21d7de5 100644 --- a/templates/input.prioronly.nml +++ b/templates/input.prioronly.nml @@ -570,3 +570,4 @@ interp_test_vertcoord = 'VERTISHEIGHT' verbose = .false. / + -- GitLab