diff --git a/scheduler.py b/scheduler.py index 2c8e526209abc39ae22e9c34acfc98d90c15b9ff..1fdbf78edbaa68b39497ddfd7225f0df427de53c 100755 --- a/scheduler.py +++ b/scheduler.py @@ -8,7 +8,7 @@ import datetime as dt from slurmpy import Slurm from config.cfg import exp, cluster -from scripts.utils import script_to_str, symlink +from scripts.utils import script_to_str, symlink, copy # allow scripts to access the configuration @@ -28,30 +28,29 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs): log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs) def backup_scripts(): - current = cluster.scriptsdir - main_a = cluster.scripts_rundir - # old_a = main_a+'/old/' os.makedirs(cluster.archivedir, exist_ok=True) - # def func(a, b, method): # call method if not link or directory - # if os.path.islink(a) or os.path.isdir(a): - # pass - # else: - # method(a, b) try: shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir) except FileExistsError: pass except: raise + try: + copy(os.path.basename(__file__), cluster.scripts_rundir+'/') + except Exception as e: + warnings.warn(str(e)) -def prepare_wrfinput(init_time): +def prepare_WRFrundir(init_time): """Create WRF/run directories and wrfinput files """ - s = my_Slurm("prep_wrfinput", cfg_update={"time": "10", "mail-type": "BEGIN"}) - id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfinput.py ' + s = my_Slurm("prep_wrfrundir", cfg_update={"time": "5", "mail-type": "BEGIN"}) + id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfrundir.py ' +init_time.strftime('%Y-%m-%d_%H:%M')) + return id +def run_ideal(depends_on=None): + """Run ideal for every ensemble member""" cmd = """# run ideal.exe in parallel, then add geodata export SLURM_STEP_GRES=none for ((n=1; n<="""+str(exp.n_ens)+"""; n++)) @@ -70,7 +69,7 @@ done """ s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), "nodes": "1", "time": "10", "mem-per-cpu": "2G"}) - id = s.run(cmd, depends_on=[id]) + id = s.run(cmd, depends_on=[depends_on]) return id def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depends_on=None): @@ -86,33 +85,49 @@ def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depe +IC_path, depends_on=[depends_on]) return id -def wrfinput_insert_wbubble(depends_on=None): +def wrfinput_insert_wbubble(perturb=True, depends_on=None): """Given that directories with wrfinput files exist, update these wrfinput files with warm bubbles """ s = my_Slurm("ins_wbubble", cfg_update={"time": "5"}) - id = s.run(cluster.python+' '+cluster.scripts_rundir+'/create_wbubble_wrfinput.py', + pstr = ' ' + if perturb: + pstr = ' perturb' + id = s.run(cluster.python+' '+cluster.scripts_rundir+'/create_wbubble_wrfinput.py'+pstr, depends_on=[depends_on]) return id -def run_ENS(begin, end, depends_on=None, first_minute=True): +def run_ENS(begin, end, depends_on=None, first_minute=True, + input_is_restart=False, restart_path=False, output_restart_interval=720): """Run forecast for 1 minute, save output. Then run whole timespan with 5 minutes interval. + + if input_is_restart: # start WRF in restart mode + if restart_path: + # restart from a wrfrst file in restart_path directory + # e.g. when restarting from a state in an archivedir (from any existing experiment) + else: + # restart from wrfrst files that are already in run_WRF directories + # e.g. after updateIC, it puts wrfrst in run_WRF directories """ id = depends_on + restart_flag = '.false.' if not input_is_restart else '.true.' - if first_minute: + if False: # doesnt work with restarts at the moment# first_minute: # first minute forecast (needed for validating an assimilation) hist_interval = 1 radt = 1 # calc CFRAC also in first minute begin_plus1 = begin+dt.timedelta(minutes=1) s = my_Slurm("preWRF1", cfg_update=dict(time="2")) - id = s.run(' '.join([cluster.python, - cluster.scripts_rundir+'/prepare_namelist.py', - begin.strftime('%Y-%m-%d_%H:%M'), - begin_plus1.strftime('%Y-%m-%d_%H:%M'), - str(hist_interval), str(radt),]), - depends_on=[id]) + args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py', + begin.strftime('%Y-%m-%d_%H:%M'), + begin_plus1.strftime('%Y-%m-%d_%H:%M'), + str(hist_interval), + '--radt='+str(radt), + '--restart='+restart_flag,] + if restart_path: # restart from a wrfrst file in restart_path directory + args.append('--rst_inname='+restart_path) + id = s.run(' '.join(args), depends_on=[id]) s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), "time": "2", "mem-per-cpu": "2G"}) @@ -129,16 +144,23 @@ def run_ENS(begin, end, depends_on=None, first_minute=True): # whole forecast timespan hist_interval = 5 radt = 5 + args = [cluster.python, + cluster.scripts_rundir+'/prepare_namelist.py', + begin.strftime('%Y-%m-%d_%H:%M'), + end.strftime('%Y-%m-%d_%H:%M'), + str(hist_interval), + '--radt='+str(radt), + '--restart='+restart_flag,] + if restart_path: + args.append('--rst_inname='+restart_path) + if output_restart_interval: + args.append('--restart_interval='+str(int(float(output_restart_interval)))) + s = my_Slurm("preWRF2", cfg_update=dict(time="2")) - id = s.run(' '.join([cluster.python, - cluster.scripts_rundir+'/prepare_namelist.py', - begin.strftime('%Y-%m-%d_%H:%M'), - end.strftime('%Y-%m-%d_%H:%M'), - str(hist_interval), str(radt),]), - depends_on=[id]) + id = s.run(' '.join(args), depends_on=[id]) time_in_simulation_hours = (end-begin).total_seconds()/3600 - runtime_wallclock_mins_expected = int(6+time_in_simulation_hours*10) # usually below 9 min/hour + runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5) # usually below 9 min/hour s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"}) cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname) @@ -151,7 +173,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True): return id def assimilate(assim_time, prior_init_time, prior_valid_time, - prior_path_exp=False, depends_on=None): + prior_path_exp=False, input_is_restart=False, depends_on=None): """Creates observations from a nature run and assimilates them. Args: @@ -168,26 +190,16 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, elif not isinstance(prior_path_exp, str): raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp))) - - # # prepare prior model state - # s = my_Slurm("preAssim", cfg_update=dict(time="2")) - # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/pre_assim.py ' - # +assim_time.strftime('%Y-%m-%d_%H:%M ') - # +prior_init_time.strftime('%Y-%m-%d_%H:%M ') - # +prior_valid_time.strftime('%Y-%m-%d_%H:%M ') - # +prior_path_exp, depends_on=[depends_on]) - - s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60", - "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"}) - id = s.run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py ' + id = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60", + "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"} + ).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py ' +assim_time.strftime('%Y-%m-%d_%H:%M ') +prior_init_time.strftime('%Y-%m-%d_%H:%M ') +prior_valid_time.strftime('%Y-%m-%d_%H:%M ') +prior_path_exp, depends_on=[depends_on]) - - s = my_Slurm("updateIC", cfg_update=dict(time="8")) - id = s.run(cluster.python+' '+cluster.scripts_rundir+'/update_wrfinput_from_filteroutput.py ' + id = my_Slurm("WRF_IC", cfg_update=dict(time="8") + ).run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrf_initials.py ' +assim_time.strftime('%Y-%m-%d_%H:%M ') +prior_init_time.strftime('%Y-%m-%d_%H:%M ') +prior_path_exp, depends_on=[id]) @@ -195,10 +207,11 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, def create_satimages(init_time, depends_on=None): - s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "30", "nodes": "1"}) - s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir - +init_time.strftime('/%Y-%m-%d_%H:%M/'), + s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "60", "nodes": "1"}) + id = s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir + +init_time.strftime('/%Y-%m-%d_%H:%M/'), depends_on=[depends_on]) + return id def mailme(depends_on=None): if depends_on: @@ -230,8 +243,8 @@ def copy_to_jet(depends_on=None): if __name__ == "__main__": print('starting osse') - timedelta_integrate = dt.timedelta(minutes=15) - timedelta_btw_assim = dt.timedelta(minutes=15) + timedelta_integrate = dt.timedelta(minutes=30) + timedelta_btw_assim = dt.timedelta(minutes=30) backup_scripts() id = None @@ -239,9 +252,11 @@ if __name__ == "__main__": start_from_existing_state = True is_new_run = not start_from_existing_state + init_time = dt.datetime(2008, 7, 30, 10) + id = prepare_WRFrundir(init_time) + if is_new_run: - init_time = dt.datetime(2008, 7, 30, 9) - id = prepare_wrfinput(init_time) # create initial conditions + id = run_ideal(depends_on=id) id = wrfinput_insert_wbubble(depends_on=id) # spin up the ensemble @@ -253,17 +268,15 @@ if __name__ == "__main__": prior_path_exp = False # for next assimilation elif start_from_existing_state: - time = dt.datetime(2008, 7, 30, 11) - - # prior init time - init_time = dt.datetime(2008, 7, 30, 6) #prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_Pwbub_40mem' - #prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem' - prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P1_40mem' + #prior_path_exp = cluster.archivedir # + prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem_rst' + #prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.18_Pwbub_PriorPert10_40mem' #id = update_wrfinput_from_archive(integration_end_time, init_time, prior_path_exp, depends_on=id) #id = wrfinput_insert_wbubble(depends_on=id) # values for assimilation + time = dt.datetime(2008, 7, 30, 11) assim_time = time prior_init_time = init_time @@ -271,22 +284,26 @@ if __name__ == "__main__": id = assimilate(assim_time, prior_init_time, - prior_valid_time=time+dt.timedelta(hours=2), + prior_valid_time=time, #+dt.timedelta(hours=2), prior_path_exp=prior_path_exp, + input_is_restart=True, depends_on=id) - prior_path_exp = False # use own exp path as prior + prior_path_exp = cluster.archivedir # use own exp path as prior # integration this_forecast_init = assim_time # start integration from here timedelta_integrate = timedelta_btw_assim - if this_forecast_init.minute in [0,30]: # longer forecast every full hour - timedelta_integrate = dt.timedelta(hours=2) + if this_forecast_init.minute in [0,]: # longer forecast every full hour + timedelta_integrate = dt.timedelta(hours=3) this_forecast_end = assim_time + timedelta_integrate id = run_ENS(begin=this_forecast_init, end=this_forecast_end, + input_is_restart=True, + restart_path=False, #cluster.archivedir+prior_init_time.strftime('/%Y-%m-%d_%H:%M/'), + output_restart_interval=30, #timedelta_btw_assim.total_seconds()/60, depends_on=id) create_satimages(this_forecast_init, depends_on=id)