diff --git a/scheduler.py b/scheduler.py
index fa77d92f0002f5f2a9d8b66117309f65b009dfec..7c5ffc38d679f3ae620612706d6e0180723fbdbc 100755
--- a/scheduler.py
+++ b/scheduler.py
@@ -23,13 +23,21 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs):
     """
     return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), **kwargs)
 
-def slurm_submit(bashcmd, slurm_cfg_update=None, depends_on=None):
-    function_name = sys._getframe(1).f_code.co_name  # magic
-    id = my_Slurm(function_name, cfg_update=slurm_cfg_update, **kwargs
-                  ).run(bashcmd, depends_on=[depends_on])
+def slurm_submit(bashcmd, name=None, cfg_update=None, depends_on=None):
+    """Submit a 'workflow task' (a script run as a job) to the SLURM queue.
+    Args:
+        bashcmd (str): command to run (i.e. call to script)
+        name (str): SLURM job name (useful for debugging)
+        cfg_update (dict): enforce these SLURM parameters
+        depends_on (list of int): SLURM job ids; job starts once these have finished
+    Returns:
+        int: SLURM job id, can be used in `depends_on` of another `slurm_submit` call
+    """
+    if name is None:  # slurm job name = name of calling function
+        name = sys._getframe(1).f_code.co_name
+    id = my_Slurm(name, cfg_update=cfg_update).run(bashcmd, depends_on=depends_on)
     return id
 
-
 def clear_logs(backup_existing_to_archive=True):
     dirs = ['/logs/', '/slurm-scripts/']
     for d in dirs:
@@ -46,11 +54,11 @@
 def prepare_wrfinput():
     """Create WRF/run directories and wrfinput files
     """
-    s = my_Slurm("pre_osse", cfg_update={"time": "5", "mail-type": "BEGIN"})
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
+    # s = my_Slurm("pre_osse", cfg_update={"time": "5", "mail-type": "BEGIN"})
+    # id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
+    id = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py',
+                      name='prep_wrfinput', cfg_update={"time": "5", "mail-type": "BEGIN"})
 
-    s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_nens), "time": "10",
-                 "mem-per-cpu": "2G"})
     cmd = """# run ideal.exe in parallel, then add geodata
 export SLURM_STEP_GRES=none
 for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
@@ -69,7 +77,8 @@ do
     cp $rundir/wrfinput_d01 """+cluster.archivedir()+"""/wrfinput/$n/wrfinput_d01
 done
 """
-    id = s.run(cmd, depends_on=[id])
+    id = slurm_submit(cmd, name="ideal", cfg_update={"ntasks": str(exp.n_ens),
+                      "time": "10", "mem-per-cpu": "2G"}, depends_on=[id])
     return id
 
 def update_wrfinput_from_archive(time, background_init_time, exppath, depends_on=None):
@@ -112,7 +121,7 @@ def gen_synth_obs(time, depends_on=None):
 
     # prepare state of nature run, from which observation is sampled
     id = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py '
-                      +time.strftime('%Y-%m-%d_%H:%M ') + str(channel_id),
+                      +time.strftime('%Y-%m-%d_%H:%M'), name='prep_nature',
                       cfg_update=dict(time="2"), depends_on=[depends_on])
 
     for channel_id in exp.sat_channels:
@@ -121,9 +130,9 @@
                           +time.strftime('%Y-%m-%d_%H:%M ') + str(channel_id),
                           depends_on=[id])
 
-    s = my_Slurm("gensynth", cfg_update=dict(time="20"))
-    cmd = 'cd '+cluster.dartrundir+'; mpirun -np 24 ./perfect_model_obs; ' \
-          + 'obs_seq.out >> obs_seq_all.out'  # combine all observations
+    s = my_Slurm("gensynth", cfg_update=dict(ntasks="48", time="20"))
+    cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./perfect_model_obs; ' \
+          + 'cat obs_seq.out >> obs_seq_all.out'  # combine all observations
     id2 = s.run(cmd, depends_on=[id])
     return id2
 
@@ -166,9 +175,9 @@ def mailme(depends_on=None):
 
 print('starting osse')
-clear_logs(backup_existing_to_archive=True)
+clear_logs(backup_existing_to_archive=False)
 
-is_new_run = True
+is_new_run = False
 
 if is_new_run:
     id = prepare_wrfinput()  # create initial conditions
@@ -180,18 +189,21 @@ if is_new_run:
                               depends_on=id)
     time = integration_end_time
 else:
+    #id = prepare_wrfinput()  # create initial conditions
+    id = None
     # get initial conditions from archive
     background_init_time = dt.datetime(2008, 7, 30, 10, 45)
     time = dt.datetime(2008, 7, 30, 11, 0)
     exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.11_LMU_filter'
 
     first_guess = exppath_arch
-    id = update_wrfinput_from_archive(time, background_init_time, exppath_arch)
+    #id = update_wrfinput_from_archive(time, background_init_time, exppath_arch,
+    #                                  depends_on=id)
 
 # now, start the ensemble data assimilation cycles
 timedelta_integrate = dt.timedelta(minutes=15)
 timedelta_btw_assim = dt.timedelta(minutes=15)
 
-while time < dt.datetime(2008, 7, 30, 14, 15):
+while time < dt.datetime(2008, 7, 30, 16, 15):
     assim_time = time
     id = gen_synth_obs(assim_time, depends_on=id)
 
@@ -200,7 +212,7 @@ while time < dt.datetime(2008, 7, 30, 14, 15):
                                  first_guess=first_guess,
                                  depends_on=id)
 
-    # first_guess = None #
+    first_guess = None
 
     background_init_time = assim_time  # start integration from here
     integration_end_time = assim_time + timedelta_integrate
diff --git a/scripts/create_obs_sat.py b/scripts/create_obs_sat.py
index 40ed0166fe86d81cfe45972a1a6d1e751166d6c2..fd2b2258f65a92af1fa3d6f1009e848f55b6d87a 100644
--- a/scripts/create_obs_sat.py
+++ b/scripts/create_obs_sat.py
@@ -51,7 +51,7 @@ def run(time_dt, channel_id, n_obs, error_variance, output_path='./',
 
     # Brightness temperature or Reflectance?
     channel_id = int(channel_id)
-    if channel_id in [1, 2, 3, 11]:
+    if channel_id in [1, 2, 3, 12]:
         line_obstypedef = '         256 MSG_4_SEVIRI_BDRF'
     else:
         line_obstypedef = '         255 MSG_4_SEVIRI_TB'
@@ -97,13 +97,17 @@
     lons = ds.XLONG_M.isel(Time=0).values
     lats = ds.XLAT_M.isel(Time=0).values
     n_obs_x = int(np.sqrt(n_obs))
+    dx = int(len(ds.south_north)/n_obs_x)
+    skip = int(dx/2)
 
     for i in range(n_obs_x):
         for j in range(n_obs_x):
-            coords.append((lats[i,j], lons[i,j]))
+            coords.append((lats[skip+i*dx,skip+j*dx],
+                           lons[skip+i*dx,skip+j*dx]))
 
     try:
         import pickle
+        os.makedirs(os.path.dirname(fpath_obs_locations), exist_ok=True)
         with open(fpath_obs_locations, 'wb') as f:
             pickle.dump(coords, f); print(fpath_obs_locations, 'saved.')
     except Exception as e:
@@ -125,16 +129,14 @@ obs_kind_definitions
            num_obs:  """+n_obs_str+"  max_num_obs:  "+n_obs_str+"""
   first:            1  last:       """+n_obs_str+"""
 
-    for i_obs in range(int(n_obs)):
-        # data
+    for i_obs in range(1, int(n_obs)+1):
 
-        lon = coords[i_obs][1]
-        lat = coords[i_obs][0]
+        lon = coords[i_obs-1][1]
+        lat = coords[i_obs-1][0]
 
         lon_rad = str(degr_to_rad(lon))
         lat_rad = str(degr_to_rad(lat))
 
-        # compile text
         if i_obs < int(n_obs):
             msg += """
diff --git a/scripts/pre_gen_synth_obs.py b/scripts/pre_gen_synth_obs.py
index e8a8da9d08b139aee1e5dc49db984295c3495c94..45cf176fc1c2767b092892b72b5c704dfe2c6e83 100755
--- a/scripts/pre_gen_synth_obs.py
+++ b/scripts/pre_gen_synth_obs.py
@@ -11,7 +11,12 @@ channel_id = int(sys.argv[2])
 
 copy(cluster.scriptsdir+'/../templates/input.nml', cluster.dartrundir+'/input.nml')
 sed_inplace(cluster.dartrundir+'/input.nml', '<n_ens>', str(int(exp.n_ens)))
-append_file(cluster.dartrundir+'/input.nml', cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml')
+
+if channel_id in [1, 2, 3, 12]:
+    rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml'
+else:
+    rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml'
+append_file(cluster.dartrundir+'/input.nml', rttov_nml)
 
 # prepare observation file
 create_obs_sat.run(time, channel_id, exp.n_obs, exp.error_variance,
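
Note on the `slurm_submit` refactor in scheduler.py: every submission now returns a SLURM job id, and the workflow is built by feeding that id into the `depends_on` argument of the next call. A minimal usage sketch, assuming the `cluster` config object from the patch and slurmpy-style dependency semantics; the commands and SLURM settings below are illustrative only, not part of the patch:

    # chain two jobs: the second is held by SLURM until the first finishes
    id = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py',
                      name='prep_wrfinput', cfg_update={"time": "5"})

    # depends_on takes a list of job ids, matching the call sites in the patch
    id = slurm_submit('echo "wrfinput ready"', name='notify',
                      cfg_update={"time": "1"}, depends_on=[id])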
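The `dx`/`skip` arithmetic added to create_obs_sat.py spreads the observation locations evenly over the model grid instead of taking the first n_obs_x by n_obs_x points in the domain corner: `dx` is the grid spacing between neighbouring observations, and `skip` offsets the pattern by half a spacing from the boundary. A standalone sketch of the index arithmetic; the 200-point axis and 16 observations are assumed values, not from the patch:

    import numpy as np

    n_grid = 200                   # grid points per domain axis (assumed)
    n_obs = 16                     # total number of observations (assumed)
    n_obs_x = int(np.sqrt(n_obs))  # 4 observations per axis
    dx = int(n_grid / n_obs_x)     # 50 grid points between neighbouring obs
    skip = int(dx / 2)             # 25 -> half-spacing offset from the edge

    idx = [skip + i*dx for i in range(n_obs_x)]
    print(idx)  # [25, 75, 125, 175]: evenly spread, no corner clustering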