Skip to content
Snippets Groups Projects
Commit 9401b627 authored by lkugler
Browse files

more explicit slurm

parent d6d77dbd
No related branches found
No related tags found
No related merge requests found
...@@ -22,16 +22,20 @@ class Shellslurm(): ...@@ -22,16 +22,20 @@ class Shellslurm():
print(args[0]) print(args[0])
os.system(args[0]) os.system(args[0])
def my_Slurm(*args, cfg_update=dict(), **kwargs): def run_job(*args, cfg_update=dict(), with_slurm=True, **kwargs):
"""Shortcut to slurmpy's class; keep certain default kwargs """Shortcut to slurmpy's class; keep certain default kwargs
and only update some with kwarg `cfg_update` and only update some with kwarg `cfg_update`
see https://github.com/brentp/slurmpy see https://github.com/brentp/slurmpy
with_slurm (bool) : if True, use SLURM, else run locally
""" """
debug = False # run without SLURM, locally on headnode if with_slurm:
if debug: return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
return Shellslurm(*args)
return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs) log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
else:
return Shellslurm(*args)
...@@ -75,7 +79,7 @@ do ...@@ -75,7 +79,7 @@ do
mv $rundir/rsl.out.0000 $rundir/rsl.out.input mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done done
""" """
s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), s = run_job("ideal", cfg_update={"ntasks": str(exp.n_ens),
"time": "10", "mem": "100G"}) "time": "10", "mem": "100G"})
id = s.run(cmd, depends_on=[depends_on]) id = s.run(cmd, depends_on=[depends_on])
return id return id
...@@ -84,7 +88,7 @@ def wrfinput_insert_wbubble(perturb=True, depends_on=None): ...@@ -84,7 +88,7 @@ def wrfinput_insert_wbubble(perturb=True, depends_on=None):
"""Given that directories with wrfinput files exist, """Given that directories with wrfinput files exist,
update these wrfinput files with warm bubbles update these wrfinput files with warm bubbles
""" """
s = my_Slurm("ins_wbubble", cfg_update={"time": "5"}) s = run_job("ins_wbubble", cfg_update={"time": "5"})
pstr = ' ' pstr = ' '
if perturb: if perturb:
pstr = ' perturb' pstr = ' perturb'
...@@ -107,7 +111,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, ...@@ -107,7 +111,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
# hist_interval = 1 # hist_interval = 1
# radt = 1 # calc CFRAC also in first minute # radt = 1 # calc CFRAC also in first minute
# begin_plus1 = begin+dt.timedelta(minutes=1) # begin_plus1 = begin+dt.timedelta(minutes=1)
# s = my_Slurm("preWRF1", cfg_update=dict(time="2")) # s = run_job("preWRF1", cfg_update=dict(time="2"))
# args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py', # args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py',
# begin.strftime('%Y-%m-%d_%H:%M'), # begin.strftime('%Y-%m-%d_%H:%M'),
# begin_plus1.strftime('%Y-%m-%d_%H:%M'), # begin_plus1.strftime('%Y-%m-%d_%H:%M'),
...@@ -116,13 +120,13 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, ...@@ -116,13 +120,13 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
# '--restart='+restart_flag,] # '--restart='+restart_flag,]
# id = s.run(' '.join(args), depends_on=[id]) # id = s.run(' '.join(args), depends_on=[id])
# s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), # s = run_job("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
# "time": "2", "mem-per-cpu": "2G"}) # "time": "2", "mem-per-cpu": "2G"})
# cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname) # cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
# id = s.run(cmd, depends_on=[id]) # id = s.run(cmd, depends_on=[id])
# # apply forward operator (DART filter without assimilation) # # apply forward operator (DART filter without assimilation)
# s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48)) # s = run_job("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
# id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py ' # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py '
# + begin.strftime('%Y-%m-%d_%H:%M')+' ' # + begin.strftime('%Y-%m-%d_%H:%M')+' '
# + begin_plus1.strftime('%Y-%m-%d_%H:%M'), # + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
...@@ -141,12 +145,12 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, ...@@ -141,12 +145,12 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
if output_restart_interval: if output_restart_interval:
args.append('--restart_interval='+str(int(float(output_restart_interval)))) args.append('--restart_interval='+str(int(float(output_restart_interval))))
s = my_Slurm("preWRF", cfg_update=dict(time="2")) s = run_job("preWRF", cfg_update=dict(time="2"))
id = s.run(' '.join(args), depends_on=[id]) id = s.run(' '.join(args), depends_on=[id])
time_in_simulation_hours = (end-begin).total_seconds()/3600 time_in_simulation_hours = (end-begin).total_seconds()/3600
runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5) # usually below 9 min/hour runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5) # usually below 9 min/hour
s = my_Slurm("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1", s = run_job("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1",
"time": str(runtime_wallclock_mins_expected), "mem": "140G"}) "time": str(runtime_wallclock_mins_expected), "mem": "140G"})
cmd = script_to_str(cluster.run_WRF).replace('<exp.expname>', exp.expname cmd = script_to_str(cluster.run_WRF).replace('<exp.expname>', exp.expname
).replace('<cluster.wrf_rundir_base>', cluster.wrf_rundir_base) ).replace('<cluster.wrf_rundir_base>', cluster.wrf_rundir_base)
...@@ -166,7 +170,7 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, prior_path_exp, ...@@ -166,7 +170,7 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, prior_path_exp,
if not os.path.exists(prior_path_exp): if not os.path.exists(prior_path_exp):
raise IOError('prior_path_exp does not exist: '+prior_path_exp) raise IOError('prior_path_exp does not exist: '+prior_path_exp)
id = my_Slurm("Assim", cfg_update={"ntasks": "12", "time": "60", id = run_job("Assim", cfg_update={"ntasks": "12", "time": "60",
"mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"} "mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"}
).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py ' ).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ') +assim_time.strftime('%Y-%m-%d_%H:%M ')
...@@ -183,7 +187,7 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new ...@@ -183,7 +187,7 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new
else: else:
tnew = '' tnew = ''
id = my_Slurm("IC-prior", cfg_update=dict(time="8") id = run_job("IC-prior", cfg_update=dict(time="8")
).run(cluster.python+' '+cluster.scripts_rundir+'/prep_IC_prior.py ' ).run(cluster.python+' '+cluster.scripts_rundir+'/prep_IC_prior.py '
+prior_path_exp +prior_path_exp
+prior_init_time.strftime(' %Y-%m-%d_%H:%M') +prior_init_time.strftime(' %Y-%m-%d_%H:%M')
...@@ -193,14 +197,14 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new ...@@ -193,14 +197,14 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new
def update_IC_from_DA(assim_time, depends_on=None): def update_IC_from_DA(assim_time, depends_on=None):
id = my_Slurm("IC-update", cfg_update=dict(time="8") id = run_job("IC-update", cfg_update=dict(time="8")
).run(cluster.python+' '+cluster.scripts_rundir+'/update_IC.py ' ).run(cluster.python+' '+cluster.scripts_rundir+'/update_IC.py '
+assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on]) +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])
return id return id
def create_satimages(init_time, depends_on=None): def create_satimages(init_time, depends_on=None):
s = my_Slurm("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"}) s = run_job("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"})
id = s.run(cluster.python_verif+' ~/RTTOV-WRF/run_init.py '+cluster.archivedir id = s.run(cluster.python_verif+' ~/RTTOV-WRF/run_init.py '+cluster.archivedir
+init_time.strftime('/%Y-%m-%d_%H:%M/'), +init_time.strftime('/%Y-%m-%d_%H:%M/'),
depends_on=[depends_on]) depends_on=[depends_on])
...@@ -209,31 +213,31 @@ def create_satimages(init_time, depends_on=None): ...@@ -209,31 +213,31 @@ def create_satimages(init_time, depends_on=None):
def mailme(depends_on=None): def mailme(depends_on=None):
if depends_on: if depends_on:
s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"}) s = run_job("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
s.run('sleep 1', depends_on=[depends_on]) s.run('sleep 1', depends_on=[depends_on])
def gen_obsseq(depends_on=None): def gen_obsseq(depends_on=None):
s = my_Slurm("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"}) s = run_job("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"})
id = s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py', id = s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py',
depends_on=[depends_on]) depends_on=[depends_on])
return id return id
def verify_sat(depends_on=None): def verify_sat(depends_on=None):
s = my_Slurm("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20", s = run_job("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20",
"ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "100G",}) "ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "100G",})
cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node sat verif1d FSS BS' cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node sat verif1d FSS BS'
s.run(cmd, depends_on=[depends_on]) s.run(cmd, depends_on=[depends_on])
def verify_wrf(depends_on=None): def verify_wrf(depends_on=None):
s = my_Slurm("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20", s = run_job("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20",
"ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "250G"}) "ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "250G"})
cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d FSS BS' cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d FSS BS'
s.run(cmd, depends_on=[depends_on]) s.run(cmd, depends_on=[depends_on])
def verify_fast(depends_on=None): def verify_fast(depends_on=None):
s = my_Slurm("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1", s = run_job("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1",
"ntasks-per-node": "1", "ntasks-per-core": "1"}) "ntasks-per-node": "1", "ntasks-per-core": "1"})
cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+exp.expname cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+exp.expname
s.run(cmd, depends_on=[depends_on]) s.run(cmd, depends_on=[depends_on])
...@@ -307,7 +311,7 @@ if __name__ == "__main__": ...@@ -307,7 +311,7 @@ if __name__ == "__main__":
# as we have WRF output, we can use own exp path as prior # as we have WRF output, we can use own exp path as prior
prior_path_exp = cluster.archivedir prior_path_exp = cluster.archivedir
id_sat = create_satimages(time, depends_on=id) id_sat = create_satimages(time, depends_on=id)
# increment time # increment time
......
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment