From 9401b627184d1688bf9eea11698494c5b275243b Mon Sep 17 00:00:00 2001 From: lkugler <lukas.kugler@gmail.com> Date: Thu, 19 Jan 2023 18:11:04 +0100 Subject: [PATCH] more explicit slurm --- scheduler.py | 48 ++++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/scheduler.py b/scheduler.py index 31c4bdd..ba4d092 100755 --- a/scheduler.py +++ b/scheduler.py @@ -22,16 +22,20 @@ class Shellslurm(): print(args[0]) os.system(args[0]) -def my_Slurm(*args, cfg_update=dict(), **kwargs): +def run_job(*args, cfg_update=dict(), with_slurm=True, **kwargs): """Shortcut to slurmpy's class; keep certain default kwargs and only update some with kwarg `cfg_update` see https://github.com/brentp/slurmpy + + with_slurm (bool) : if True, use SLURM, else run locally + """ - debug = False # run without SLURM, locally on headnode - if debug: - return Shellslurm(*args) - return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), + if with_slurm: + return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs) + else: + return Shellslurm(*args) + @@ -75,7 +79,7 @@ do mv $rundir/rsl.out.0000 $rundir/rsl.out.input done """ - s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), + s = run_job("ideal", cfg_update={"ntasks": str(exp.n_ens), "time": "10", "mem": "100G"}) id = s.run(cmd, depends_on=[depends_on]) return id @@ -84,7 +88,7 @@ def wrfinput_insert_wbubble(perturb=True, depends_on=None): """Given that directories with wrfinput files exist, update these wrfinput files with warm bubbles """ - s = my_Slurm("ins_wbubble", cfg_update={"time": "5"}) + s = run_job("ins_wbubble", cfg_update={"time": "5"}) pstr = ' ' if perturb: pstr = ' perturb' @@ -107,7 +111,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, # hist_interval = 1 # radt = 1 # calc CFRAC also in first minute # begin_plus1 = begin+dt.timedelta(minutes=1) - # s = my_Slurm("preWRF1", cfg_update=dict(time="2")) + # s = run_job("preWRF1", cfg_update=dict(time="2")) # args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py', # begin.strftime('%Y-%m-%d_%H:%M'), # begin_plus1.strftime('%Y-%m-%d_%H:%M'), @@ -116,13 +120,13 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, # '--restart='+restart_flag,] # id = s.run(' '.join(args), depends_on=[id]) - # s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), + # s = run_job("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), # "time": "2", "mem-per-cpu": "2G"}) # cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname) # id = s.run(cmd, depends_on=[id]) # # apply forward operator (DART filter without assimilation) - # s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48)) + # s = run_job("fwOP-1m", cfg_update=dict(time="10", ntasks=48)) # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py ' # + begin.strftime('%Y-%m-%d_%H:%M')+' ' # + begin_plus1.strftime('%Y-%m-%d_%H:%M'), @@ -141,12 +145,12 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, if output_restart_interval: args.append('--restart_interval='+str(int(float(output_restart_interval)))) - s = my_Slurm("preWRF", cfg_update=dict(time="2")) + s = run_job("preWRF", cfg_update=dict(time="2")) id = s.run(' '.join(args), depends_on=[id]) time_in_simulation_hours = (end-begin).total_seconds()/3600 runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5) # usually below 9 min/hour - s = my_Slurm("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1", + s = run_job("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1", "time": str(runtime_wallclock_mins_expected), "mem": "140G"}) cmd = script_to_str(cluster.run_WRF).replace('<exp.expname>', exp.expname ).replace('<cluster.wrf_rundir_base>', cluster.wrf_rundir_base) @@ -166,7 +170,7 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, prior_path_exp, if not os.path.exists(prior_path_exp): raise IOError('prior_path_exp does not exist: '+prior_path_exp) - id = my_Slurm("Assim", cfg_update={"ntasks": "12", "time": "60", + id = run_job("Assim", cfg_update={"ntasks": "12", "time": "60", "mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"} ).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py ' +assim_time.strftime('%Y-%m-%d_%H:%M ') @@ -183,7 +187,7 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new else: tnew = '' - id = my_Slurm("IC-prior", cfg_update=dict(time="8") + id = run_job("IC-prior", cfg_update=dict(time="8") ).run(cluster.python+' '+cluster.scripts_rundir+'/prep_IC_prior.py ' +prior_path_exp +prior_init_time.strftime(' %Y-%m-%d_%H:%M') @@ -193,14 +197,14 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new def update_IC_from_DA(assim_time, depends_on=None): - id = my_Slurm("IC-update", cfg_update=dict(time="8") + id = run_job("IC-update", cfg_update=dict(time="8") ).run(cluster.python+' '+cluster.scripts_rundir+'/update_IC.py ' +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on]) return id def create_satimages(init_time, depends_on=None): - s = my_Slurm("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"}) + s = run_job("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"}) id = s.run(cluster.python_verif+' ~/RTTOV-WRF/run_init.py '+cluster.archivedir +init_time.strftime('/%Y-%m-%d_%H:%M/'), depends_on=[depends_on]) @@ -209,31 +213,31 @@ def create_satimages(init_time, depends_on=None): def mailme(depends_on=None): if depends_on: - s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"}) + s = run_job("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"}) s.run('sleep 1', depends_on=[depends_on]) def gen_obsseq(depends_on=None): - s = my_Slurm("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"}) + s = run_job("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"}) id = s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py', depends_on=[depends_on]) return id def verify_sat(depends_on=None): - s = my_Slurm("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20", + s = run_job("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20", "ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "100G",}) cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node sat verif1d FSS BS' s.run(cmd, depends_on=[depends_on]) def verify_wrf(depends_on=None): - s = my_Slurm("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20", + s = run_job("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20", "ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "250G"}) cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d FSS BS' s.run(cmd, depends_on=[depends_on]) def verify_fast(depends_on=None): - s = my_Slurm("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1", + s = run_job("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1", "ntasks-per-node": "1", "ntasks-per-core": "1"}) cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+exp.expname s.run(cmd, depends_on=[depends_on]) @@ -307,7 +311,7 @@ if __name__ == "__main__": # as we have WRF output, we can use own exp path as prior prior_path_exp = cluster.archivedir - + id_sat = create_satimages(time, depends_on=id) # increment time -- GitLab