From e31441f4e12443fde3d448386d53277b65f950d8 Mon Sep 17 00:00:00 2001 From: Kugler Lukas <lukas.kugler@univie.ac.at> Date: Thu, 8 Oct 2020 10:39:01 +0200 Subject: [PATCH] update obsseq generation, cloud dep. error, etc --- scheduler.py | 120 ++++++--- .../{create_obs_sat.py => create_obsseq.py} | 228 ++++++++++++++---- scripts/pre_gen_synth_obs.py | 28 --- 3 files changed, 266 insertions(+), 110 deletions(-) rename scripts/{create_obs_sat.py => create_obsseq.py} (50%) mode change 100644 => 100755 delete mode 100755 scripts/pre_gen_synth_obs.py diff --git a/scheduler.py b/scheduler.py index 7c5ffc3..34a9f91 100755 --- a/scheduler.py +++ b/scheduler.py @@ -23,6 +23,14 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs): """ return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), **kwargs) +class Cmdline(object): + def __init__(self, name, cfg_update): + self.name = name + + def run(self, cmd, **kwargs): + print('running', self.name, 'without SLURM') + os.system(cmd) + def slurm_submit(bashcmd, name=None, cfg_update=None, depends_on=None): """Submit a 'workflow task'=script=job to the SLURM queue. Args: @@ -73,8 +81,6 @@ for ((n=1; n<="""+str(exp.n_ens)+"""; n++)) do rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n mv $rundir/rsl.out.0000 $rundir/rsl.out.input - mkdir -p """+cluster.archivedir()+"""/wrfinput/$n - cp $rundir/wrfinput_d01 """+cluster.archivedir()+"""/wrfinput/$n/wrfinput_d01 done """ id = slurm_submit(cmd, name="ideal", cfg_update={"ntasks": str(exp.n_ens), @@ -94,7 +100,7 @@ def update_wrfinput_from_archive(time, background_init_time, exppath, depends_on +IC_path, depends_on=[depends_on]) return id -def run_ENS(begin, end, depends_on=None, **kwargs): +def run_ENS(begin, end, depends_on=None): prev_id = depends_on s = my_Slurm("preWRF", cfg_update=dict(time="2")) @@ -104,7 +110,7 @@ def run_ENS(begin, end, depends_on=None, **kwargs): depends_on=[prev_id]) runtime_real_hours = (end-begin).total_seconds()/3600 - runtime_wallclock_mins_expected = int(11+runtime_real_hours*10) # usually below 8 min/hour + runtime_wallclock_mins_expected = int(5+runtime_real_hours*9) # usually below 8 min/hour s = my_Slurm("runWRF", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"}) cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname) @@ -138,44 +144,85 @@ def gen_synth_obs(time, depends_on=None): def assimilate(assim_time, background_init_time, - first_guess=None, depends_on=None, **kwargs): - prev_id = depends_on + prior_from_different_exp=False, depends_on=None): + """Run the assimilation process. + + Expects a obs_seq.out file present in `dartrundir` + + Args: + assim_time (dt.datetime): for timestamp of prior wrfout files + background_init_time (dt.datetime): for directory of prior wrfout files + prior_from_different_exp (bool or str): + put a `str` if you want to take the prior from a different experiment + if False: use `archivedir` to get prior state + if str: use this directory to get prior state + """ + id = depends_on + + if prior_from_different_exp: + prior_expdir = prior_from_different_exp + else: + prior_expdir = cluster.archivedir() + + # prepare state of nature run, from which observation is sampled + id = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py ' + +time.strftime('%Y-%m-%d_%H:%M'), name='prep_nature', + cfg_update=dict(time="2"), depends_on=[depends_on]) - if first_guess is None: - first_guess = cluster.archivedir() + #s = my_Slurm("gensynth", cfg_update=dict(ntasks="48", time="20")) + #cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./perfect_model_obs; ' \ + # + 'cat obs_seq.out >> obs_seq_all.out' # combine all observations + #id2 = s.run(cmd, depends_on=[id]) + # prepare prior model state s = my_Slurm("preAssim", cfg_update=dict(time="2")) - id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py ' \ + id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py ' +assim_time.strftime('%Y-%m-%d_%H:%M ') +background_init_time.strftime('%Y-%m-%d_%H:%M ') - +first_guess, - depends_on=[prev_id]) - - s = my_Slurm("Assim", cfg_update=dict(time="50", mem="200G")) + +prior_expdir, + depends_on=[id]) + + # generate observations + s = my_Slurm("gensynthobs", cfg_update=dict(ntasks="48", time="10")) + id = s.run(cluster.python+' '+cluster.scriptsdir+'/gen_synth_obs.py ' + +time.strftime('%Y-%m-%d_%H:%M'), + depends_on=[id]) + + # actuall assimilation step + s = my_Slurm("Assim", cfg_update=dict(ntasks="48", time="50", mem="200G")) cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out' - id2 = s.run(cmd, depends_on=[id]) + id = s.run(cmd, depends_on=[id]) s = my_Slurm("archiveAssim", cfg_update=dict(time="10")) - id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py ' - + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2]) + id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py ' + + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id]) + + s = my_Slurm("updateIC", cfg_update=dict(time="8")) + id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py ' + +assim_time.strftime('%Y-%m-%d_%H:%M ') + +background_init_time.strftime('%Y-%m-%d_%H:%M ') + +prior_expdir, depends_on=[id]) + return id - s = my_Slurm("updateIC", cfg_update=dict(time="3")) - id4 = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py ' - +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id3]) - return id4 + +def create_satimages(depends_on=None): + s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "20"}) + s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/loop.py '+exp.expname, + depends_on=[depends_on]) def mailme(depends_on=None): - id = depends_on - if id: + if depends_on: s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"}) - s.run('sleep 1', depends_on=[id]) + s.run('sleep 1', depends_on=[depends_on]) ################################ print('starting osse') +timedelta_integrate = dt.timedelta(minutes=30) +timedelta_btw_assim = dt.timedelta(minutes=30) -clear_logs(backup_existing_to_archive=False) +clear_logs(backup_existing_to_archive=True) is_new_run = False if is_new_run: @@ -188,38 +235,35 @@ if is_new_run: end=integration_end_time, depends_on=id) time = integration_end_time + first_guess = False else: - #id = prepare_wrfinput() # create initial conditions + # id = prepare_wrfinput() # create initial conditions id = None # get initial conditions from archive - background_init_time = dt.datetime(2008, 7, 30, 10, 45) - time = dt.datetime(2008, 7, 30, 11, 0) + background_init_time = dt.datetime(2008, 7, 30, 10) + time = dt.datetime(2008, 7, 30, 10,30) exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.11_LMU_filter' - first_guess = exppath_arch + first_guess = False #exppath_arch #id = update_wrfinput_from_archive(time, background_init_time, exppath_arch, # depends_on=id) -# now, start the ensemble data assimilation cycles -timedelta_integrate = dt.timedelta(minutes=15) -timedelta_btw_assim = dt.timedelta(minutes=15) - -while time < dt.datetime(2008, 7, 30, 16, 15): +while time <= dt.datetime(2008, 7, 30, 16): assim_time = time - id = gen_synth_obs(assim_time, depends_on=id) + #id = gen_synth_obs(assim_time, depends_on=id) id = assimilate(assim_time, background_init_time, - first_guess=first_guess, + prior_from_different_exp=first_guess, depends_on=id) - - first_guess = None + first_guess = False background_init_time = assim_time # start integration from here integration_end_time = assim_time + timedelta_integrate id = run_ENS(begin=background_init_time, end=integration_end_time, depends_on=id) - time += timedelta_btw_assim + create_satimages(depends_on=id) + mailme(id) diff --git a/scripts/create_obs_sat.py b/scripts/create_obsseq.py old mode 100644 new mode 100755 similarity index 50% rename from scripts/create_obs_sat.py rename to scripts/create_obsseq.py index fd2b225..58a7938 --- a/scripts/create_obs_sat.py +++ b/scripts/create_obsseq.py @@ -6,6 +6,13 @@ import numpy as np import datetime as dt from pysolar.solar import get_altitude, get_azimuth +# position on earth to calculate solar angles +lat0 = 45. +lon0 = 0. + +sat_az = "180.0" +sat_zen = "45.0" + def degr_to_rad(degr): """Convert to DART convention = radians""" if degr < 0: @@ -19,55 +26,34 @@ def add_timezone_UTC(t): return dt.datetime(t.year, t.month, t.day, t.hour, t.minute, tzinfo=dt.timezone.utc) def get_dart_date(time_dt): + # assumes input is UTC! + #try: + # time_dt = add_timezone_UTC(time_dt) + #except: + # time_dt = time_dt.replace(tzinfo=dt.timezone.utc) # overwrites existing timezone! days_since_010120 = 145731 ref_day = dt.datetime(2000,1,1, tzinfo=dt.timezone.utc) dart_date_day = str((time_dt - ref_day).days + days_since_010120) secs_thatday = str(int((time_dt - round_to_day(time_dt)).total_seconds())) return dart_date_day, secs_thatday - -def run(time_dt, channel_id, n_obs, error_variance, output_path='./', - fpath_obs_locations=False): - """Create obs_seq.in +def calc_obs_locations(n_obs, distance_between_obs_meters, + coords_from_domaincenter=True, fpath_obs_locations=False): + """Calculate coordinate pairs for locations Args: - time_dt (dt.datetime): time of observation - channel_id (int): SEVIRI channel number - see https://nwp-saf.eumetsat.int/downloads/rtcoef_rttov12/ir_srf/rtcoef_msg_4_seviri_srf.html n_obs (int): number of observations (must be a square of an integer: 4, 9, 1000, ...) - error_variance (float): - gaussian error with this variance is added to the truth at observation time - output_path (str): directory where `obs_seq.in` will be saved fpath_obs_locations (False or str): write an obs_coords.pkl, can be used to check observation locations if str, write to file - """ - time_dt = add_timezone_UTC(time_dt) - # time_dt = dt.datetime(2008, 7, 30, 15, 30, tzinfo=dt.timezone.utc) - assert n_obs == int(n_obs) - n_obs_str = str(int(n_obs)) - error_variance = str(error_variance) - - # Brightness temperature or Reflectance? - channel_id = int(channel_id) - if channel_id in [1, 2, 3, 12]: - line_obstypedef = ' 256 MSG_4_SEVIRI_BDRF' - else: - line_obstypedef = ' 255 MSG_4_SEVIRI_TB' - channel_id = str(channel_id) - - # position on earth to calculate solar angles - lat0 = 45. - lon0 = 0. - sat_az = "180.0" - sat_zen = "45.0" + Returns: + list of (lat, lon) tuples + """ radius_earth_meters = 6.371*1E6 - distance_between_obs_meters = 40000 coords = [] - coords_from_domaincenter = False if coords_from_domaincenter: """ Create equally spaced grid for satellite observations every 4 km @@ -106,13 +92,53 @@ def run(time_dt, channel_id, n_obs, error_variance, output_path='./', lons[skip+i*dx,skip+j*dx])) try: - import pickle - os.makedirs(os.path.dirname(fpath_obs_locations), exist_ok=True) - with open(fpath_obs_locations, 'wb') as f: - pickle.dump(coords, f); print(fpath_obs_locations, 'saved.') + if fpath_obs_locations: + import pickle + os.makedirs(os.path.dirname(fpath_obs_locations), exist_ok=True) + with open(fpath_obs_locations, 'wb') as f: + pickle.dump(coords, f); print(fpath_obs_locations, 'saved.') except Exception as e: warnings.warn(str(e)) + assert len(coords) == n_obs, (len(coords), n_obs) + return coords + +def sat(time_dt, channel_id, n_obs, error_var, distance_between_obs_meters, + output_path='./', fpath_obs_locations=False): + """Create obs_seq.in + + Args: + time_dt (dt.datetime): time of observation + channel_id (int): SEVIRI channel number + see https://nwp-saf.eumetsat.int/downloads/rtcoef_rttov12/ir_srf/rtcoef_msg_4_seviri_srf.html + n_obs (int): + number of observations squared (must be a square of an integer: 4, 9, 1000, ...) + error_var (float): + gaussian error with this variance is added to the truth at observation time + output_path (str): directory where `obs_seq.in` will be saved + fpath_obs_locations (False or str): + write an obs_coords.pkl, can be used to check observation locations + if str, write to file + """ + # time_dt = add_timezone_UTC(time_dt) + # time_dt = dt.datetime(2008, 7, 30, 15, 30, tzinfo=dt.timezone.utc) + assert n_obs == int(n_obs) + error_var = str(error_var) + + # Brightness temperature or Reflectance? + channel_id = int(channel_id) + if channel_id in [1, 2, 3, 12]: + line_obstypedef = ' 256 MSG_4_SEVIRI_BDRF' + code = '256' + else: + line_obstypedef = ' 255 MSG_4_SEVIRI_TB' + code = '255' + channel_id = str(channel_id) + + coords = calc_obs_locations(n_obs, distance_between_obs_meters, + coords_from_domaincenter=False, + fpath_obs_locations=False) + time_dt = add_timezone_UTC(time_dt) sun_az = str(get_azimuth(lat0, lon0, time_dt)) sun_zen = str(90. - get_altitude(lat0, lon0, time_dt)) print('sunzen', sun_zen, 'sunazi', sun_az) @@ -120,6 +146,9 @@ def run(time_dt, channel_id, n_obs, error_variance, output_path='./', dart_date_day, secs_thatday = get_dart_date(time_dt) print('secs, days:', secs_thatday, dart_date_day) + n_obs_str = str(int(n_obs)) + error_var = str(error_var) + msg = """ obs_sequence obs_kind_definitions @@ -146,7 +175,7 @@ obdef loc3d """+lon_rad+""" """+lat_rad+""" -888888.0000000000 -2 kind - 256 + """+code+""" visir """+sat_az+""" """+sat_zen+""" """+sun_az+""" """+sun_zen+""" @@ -154,7 +183,7 @@ kind -888888.000000000 1 """+secs_thatday+""" """+dart_date_day+""" - """+error_variance + """+error_var if i_obs == int(n_obs): # last_observation # compile text msg += """ @@ -164,7 +193,7 @@ obdef loc3d """+lon_rad+""" """+lat_rad+""" -888888.0000000000 -2 kind - 256 + """+code+""" visir """+sat_az+""" """+sat_zen+""" """+sun_az+""" """+sun_zen+""" @@ -172,7 +201,107 @@ kind -888888.000000000 1 """+secs_thatday+""" """+dart_date_day+""" - """+error_variance + """+error_var + + fpath = output_path+'/obs_seq.in' + try: + os.remove(fpath) + except OSError: + pass + + with open(fpath, 'w') as f: + f.write(msg) + print(fpath, 'saved.') + + +def gen_coords(n_obs, distance_between_obs_meters, heights, + coords_from_domaincenter=True, fpath_obs_locations=False): + + coords = calc_obs_locations(n_obs, distance_between_obs_meters, + coords_from_domaincenter=True, fpath_obs_locations=False) + + # append height + coords2 = [] + for i in range(len(coords)): + for hgt_m in heights: + coords2.append(coords[i] + (hgt_m,)) + + n_obs = len(coords2) + print('effective number of observations (with vertical levels):', n_obs, ' on each level:', len(coords)) + return coords2 + + +def generic_obs(obs_type, time_dt, n_obs, error_var, distance_between_obs_meters, + output_path='./', fpath_obs_locations=False): + + obs_codes = {'RASO_T': {'name': 'RADIOSONDE_TEMPERATURE', 'nr': '5'}, + 'RADAR': {'name': 'RADAR_REFLECTIVITY', 'nr': '37'} + } + + coords_from_domaincenter = True + heights = np.arange(1000, 15001, 1000) + + coords = gen_coords(n_obs, distance_between_obs_meters, heights, + coords_from_domaincenter=True, fpath_obs_locations=False) + + dart_date_day, secs_thatday = get_dart_date(time_dt) + print('secs, days:', secs_thatday, dart_date_day) + + obs_name = obs_codes[obs_type]['name'] + obs_kind_nr = obs_codes[obs_type]['nr'] + + write_generic_obsseq(obs_name, obs_kind_nr, error_var, coords, + dart_date_day, secs_thatday, output_path) + + +def write_generic_obsseq(obs_name, obs_kind_nr, error_var, coords, + dart_date_day, secs_thatday, output_path): + n_obs_str = str(int(n_obs)) + error_var = str(error_var) + line_obstypedef = obs_kind_nr+' '+obs_name + + msg = """ + obs_sequence +obs_kind_definitions + 1 + """+line_obstypedef+""" + num_copies: 0 num_qc: 0 + num_obs: """+n_obs_str+" max_num_obs: "+n_obs_str+""" + first: 1 last: """+n_obs_str + + for i_obs in range(1, int(n_obs)+1): + + lon = coords[i_obs-1][1] + lat = coords[i_obs-1][0] + hgt_m = str(coords[i_obs-1][2])+'.000' + + lon_rad = str(degr_to_rad(lon)) + lat_rad = str(degr_to_rad(lat)) + + # compile text + if i_obs < int(n_obs): + msg += """ + OBS """+str(i_obs)+""" + -1 """+str(i_obs+1)+""" -1 +obdef +loc3d + """+lon_rad+""" """+lat_rad+""" """+hgt_m+""" 3 +kind + """+obs_kind_nr+""" + """+secs_thatday+""" """+dart_date_day+""" + """+error_var + if i_obs == int(n_obs): # last_observation + # compile text + msg += """ + OBS """+str(i_obs)+""" + """+str(i_obs-1)+""" -1 -1 +obdef +loc3d + """+lon_rad+""" """+lat_rad+""" """+hgt_m+""" 3 +kind + """+obs_kind_nr+""" + """+secs_thatday+""" """+dart_date_day+""" + """+error_var fpath = output_path+'/obs_seq.in' try: @@ -185,9 +314,20 @@ kind print(fpath, 'saved.') if __name__ == '__main__': - time_dt = dt.datetime(2008, 7, 30, 15, 30, tzinfo=dt.timezone.utc) + time_dt = dt.datetime(2008, 7, 30, 15, 30) n_obs = 100 channel_id = 1 - error_variance = 0.001 - run(time_dt, channel_id, n_obs, error_variance, output_path='./', - fpath_obs_locations='./domain.pkl') + + distance_between_obs_meters = 10000 + + # error_var = 0.001 + # sat(time_dt, channel_id, n_obs, error_var, distance_between_obs_meters, + # output_path='./', fpath_obs_locations='./domain.pkl') + + error_var = (5.)**2 + generic_obs('RADAR', time_dt, n_obs, error_var, distance_between_obs_meters, + output_path='./', fpath_obs_locations='./domain.pkl') + + # error_var = (0.5)**2 + # generic_obs('RASO_T', time_dt, n_obs, error_var, distance_between_obs_meters, + # output diff --git a/scripts/pre_gen_synth_obs.py b/scripts/pre_gen_synth_obs.py deleted file mode 100755 index 45cf176..0000000 --- a/scripts/pre_gen_synth_obs.py +++ /dev/null @@ -1,28 +0,0 @@ -import os, sys, shutil -import datetime as dt -from config.cfg import exp, cluster -from utils import symlink, copy, sed_inplace, append_file -import create_obs_sat - -time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') -channel_id = int(sys.argv[2]) - -# ensure correct input.nml -copy(cluster.scriptsdir+'/../templates/input.nml', - cluster.dartrundir+'/input.nml') -sed_inplace(cluster.dartrundir+'/input.nml', '<n_ens>', str(int(exp.n_ens))) - -if channel_id in [1, 2, 3, 12]: - rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml' -else: - rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml' -append_file(cluster.dartrundir+'/input.nml', rttov_nml) - -# prepare observation file -create_obs_sat.run(time, channel_id, exp.n_obs, exp.error_variance, - output_path=cluster.dartrundir, - fpath_obs_locations=cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M') - +'/obs_coords_id'+str(channel_id)+'.pkl') - -if not os.path.exists(cluster.dartrundir+'/obs_seq.in'): - raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir) -- GitLab