Commit 709d4390 authored by lkugler

restructure

parent 10b37368
#!/usr/bin/python3
"""
running the forecast model without assimilation
"""
import os, sys, shutil
import datetime as dt
from config.cfg import exp, cluster
log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir)
print('scripts submitted to SLURM:', slurm_scripts_dir)
from utils import create_job, backup_scripts
###############################
backup_scripts()
prior_path_exp = '/mnt/jetfs/scratch/lkugler/data/sim_archive/exp_v1.19_P3_wbub7_noDA'
prior_init_time = dt.datetime(2008,7,30,12)
prior_valid_time = dt.datetime(2008,7,30,12,30)
assim_time = prior_valid_time
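# arguments to assim_synth_obs.py, in order: assimilation time, prior init time, prior valid time, prior experiment path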
create_job('assim').run(
    cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
    +assim_time.strftime('%Y-%m-%d_%H:%M ')
    +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
    +prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
    +prior_path_exp
)
# id_sat = create_satimages(time, depends_on=id)
\ No newline at end of file
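Side note: create_job (moved into the new utils.py below) also accepts with_slurm=False, which executes the same command string in the local shell instead of submitting it to SLURM. A minimal sketch, not part of this commit, for testing the above call locally:

create_job('assim', with_slurm=False).run(
    cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
    +assim_time.strftime('%Y-%m-%d_%H:%M ')
    +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
    +prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
    +prior_path_exp)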
......@@ -62,6 +62,7 @@ class ClusterConfig(object):
vsc = ClusterConfig()
vsc.name = 'vsc'
vsc.max_nproc = 20
# binaries
vsc.python = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/python'
......@@ -92,6 +93,7 @@ vsc.slurm_cfg = {"account": "p71386", "partition": "skylake_0384", "qos": "p7138
jet = ClusterConfig()
jet.name = 'jet'
jet.max_nproc = 12
# binaries
jet.python = '/jetfs/home/lkugler/miniconda3/envs/DART/bin/python'
......@@ -120,3 +122,36 @@ jet.run_WRF = '/jetfs/home/lkugler/DART-WRF/dartwrf/run_ens.jet.sh'
jet.slurm_cfg = {"account": "lkugler", "partition": "compute",
"ntasks": "1", "ntasks-per-core": "1", "mem": "50G",
"mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
srvx1 = ClusterConfig()
srvx1.name = 'srvx1'
srvx1.max_nproc = 6
# binaries
srvx1.python = '/mnt/jetfs/home/lkugler/miniconda3/envs/DART/bin/python'
srvx1.python_verif = '/jetfs/home/lkugler/miniconda3/envs/enstools/bin/python'
srvx1.ncks = '/jetfs/spack/opt/spack/linux-rhel8-skylake_avx512/intel-20.0.2/nco-4.9.3-dhlqiyog7howjmaleyfhm6lkt7ra37xf/bin/ncks'
srvx1.ideal = '/jetfs/home/lkugler/bin/ideal-v4.3_v1.22.exe'
srvx1.wrfexe = '/jetfs/home/lkugler/bin/wrf-v4.3_v1.22.exe'
srvx1.container = ''
# paths for data output
srvx1.wrf_rundir_base = '/jetfs/home/lkugler/data/run_WRF/' # path for temporary files
srvx1.dart_rundir_base = '/jetfs/home/lkugler/data/run_DART/' # path for temporary files
srvx1.archive_base = '/mnt/jetfs/scratch/lkugler/data/sim_archive/'
# paths used as input
srvx1.srcdir = '/users/staff/lkugler/AdvDA23/DART/WRF-4.3/run'
srvx1.dart_srcdir = '/users/staff/lkugler/AdvDA23/DART/models/wrf/work'
srvx1.rttov_srcdir = '/users/staff/lkugler/AdvDA23/RTTOV13/rtcoef_rttov13/'
srvx1.scriptsdir = '/jetfs/home/lkugler/DART-WRF/dartwrf/'
srvx1.geo_em = '/mnt/jetfs/scratch/lkugler/data/geo_em.d01.nc'
# templates/run scripts
srvx1.namelist = srvx1.scriptsdir+'/../templates/namelist.input'
srvx1.run_WRF = srvx1.scriptsdir+'/run_ens.jet.sh'
srvx1.slurm_cfg = {"account": "lkugler", "partition": "compute",
                   "ntasks": "1", "ntasks-per-core": "1", "mem": "50G",
                   "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
......@@ -5,63 +5,16 @@ submitting jobs into SLURM queue
"""
import os, sys, shutil, glob, warnings
import datetime as dt
from slurmpy import Slurm
from config.cfg import exp, cluster
from dartwrf.utils import script_to_str, symlink, copy
from utils import create_job
log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir)
print('scripts submitted to SLURM:', slurm_scripts_dir)
class Shellslurm():
    """Like Slurm class, but runs locally"""
    def __init__(self, *args, **kwargs):
        pass
    def run(self, *args, **kwargs):
        print(args[0])
        os.system(args[0])

def run_job(*args, cfg_update=dict(), with_slurm=True, **kwargs):
    """Shortcut to slurmpy's class; keep certain default kwargs
    and only update some with kwarg `cfg_update`
    see https://github.com/brentp/slurmpy
    with_slurm (bool) : if True, use SLURM, else run locally
    """
    if with_slurm:
        return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
                     log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
    else:
        return Shellslurm(*args)

def backup_scripts():
    os.makedirs(cluster.archivedir, exist_ok=True)
    try:
        shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir)
    except FileExistsError:
        pass
    except:
        raise
    try:
        copy(os.path.basename(__file__), cluster.scripts_rundir+'/')
    except Exception as e:
        warnings.warn(str(e))

def prepare_WRFrundir(init_time):
    """Create WRF/run directories and wrfinput files
    """
    cmd = cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfrundir.py '+init_time.strftime('%Y-%m-%d_%H:%M')
    print(cmd)
    os.system(cmd)
    # return id

def run_ideal(depends_on=None):
    """Run ideal for every ensemble member"""
    cmd = """# run ideal.exe in parallel, then add geodata
......@@ -80,7 +33,7 @@ do
mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done
"""
s = run_job("ideal", cfg_update={"ntasks": str(exp.n_ens),
s = create_job("ideal", cfg_update={"ntasks": str(exp.n_ens),
"time": "10", "mem": "100G"})
id = s.run(cmd, depends_on=[depends_on])
return id
......@@ -89,7 +42,7 @@ def wrfinput_insert_wbubble(perturb=True, depends_on=None):
"""Given that directories with wrfinput files exist,
update these wrfinput files with warm bubbles
"""
s = run_job("ins_wbubble", cfg_update={"time": "5"})
s = create_job("ins_wbubble", cfg_update={"time": "5"})
pstr = ' '
if perturb:
pstr = ' perturb'
......@@ -112,7 +65,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
    # hist_interval = 1
    # radt = 1  # calc CFRAC also in first minute
    # begin_plus1 = begin+dt.timedelta(minutes=1)
    # s = run_job("preWRF1", cfg_update=dict(time="2"))
    # s = create_job("preWRF1", cfg_update=dict(time="2"))
    # args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py',
    #         begin.strftime('%Y-%m-%d_%H:%M'),
    #         begin_plus1.strftime('%Y-%m-%d_%H:%M'),
......@@ -121,13 +74,13 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
    #         '--restart='+restart_flag,]
    # id = s.run(' '.join(args), depends_on=[id])
    # s = run_job("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
    # s = create_job("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
    #               "time": "2", "mem-per-cpu": "2G"})
    # cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
    # id = s.run(cmd, depends_on=[id])
    # # apply forward operator (DART filter without assimilation)
    # s = run_job("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
    # s = create_job("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
    # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py '
    #            + begin.strftime('%Y-%m-%d_%H:%M')+' '
    #            + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
......@@ -146,12 +99,12 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
    if output_restart_interval:
        args.append('--restart_interval='+str(int(float(output_restart_interval))))
    s = run_job("preWRF", cfg_update=dict(time="2"))
    s = create_job("preWRF", cfg_update=dict(time="2"))
    id = s.run(' '.join(args), depends_on=[id])
    time_in_simulation_hours = (end-begin).total_seconds()/3600
    runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5)  # usually below 9 min/hour
    s = run_job("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1",
    s = create_job("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1",
                   "time": str(runtime_wallclock_mins_expected), "mem": "140G"})
    cmd = script_to_str(cluster.run_WRF).replace('<exp.expname>', exp.expname
                       ).replace('<cluster.wrf_rundir_base>', cluster.wrf_rundir_base)
......@@ -171,7 +124,7 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, prior_path_exp,
    if not os.path.exists(prior_path_exp):
        raise IOError('prior_path_exp does not exist: '+prior_path_exp)
    id = run_job("Assim", cfg_update={"ntasks": "12", "time": "60",
    id = create_job("Assim", cfg_update={"ntasks": "12", "time": "60",
                   "mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"}
        ).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
              +assim_time.strftime('%Y-%m-%d_%H:%M ')
......@@ -188,7 +141,7 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new
    else:
        tnew = ''
    id = run_job("IC-prior", cfg_update=dict(time="8")
    id = create_job("IC-prior", cfg_update=dict(time="8")
        ).run(cluster.python+' '+cluster.scripts_rundir+'/prep_IC_prior.py '
              +prior_path_exp
              +prior_init_time.strftime(' %Y-%m-%d_%H:%M')
......@@ -198,14 +151,14 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new
def update_IC_from_DA(assim_time, depends_on=None):
id = run_job("IC-update", cfg_update=dict(time="8")
id = create_job("IC-update", cfg_update=dict(time="8")
).run(cluster.python+' '+cluster.scripts_rundir+'/update_IC.py '
+assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])
return id
def create_satimages(init_time, depends_on=None):
s = run_job("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"})
s = create_job("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"})
id = s.run(cluster.python_verif+' ~/RTTOV-WRF/run_init.py '+cluster.archivedir
+init_time.strftime('/%Y-%m-%d_%H:%M/'),
depends_on=[depends_on])
......@@ -214,31 +167,31 @@ def create_satimages(init_time, depends_on=None):
def mailme(depends_on=None):
    if depends_on:
        s = run_job("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
        s = create_job("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
        s.run('sleep 1', depends_on=[depends_on])
def gen_obsseq(depends_on=None):
s = run_job("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"})
s = create_job("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"})
id = s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py',
depends_on=[depends_on])
return id
def verify_sat(depends_on=None):
s = run_job("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20",
s = create_job("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20",
"ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "100G",})
cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node sat verif1d FSS BS'
s.run(cmd, depends_on=[depends_on])
def verify_wrf(depends_on=None):
s = run_job("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20",
s = create_job("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20",
"ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "250G"})
cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d FSS BS'
s.run(cmd, depends_on=[depends_on])
def verify_fast(depends_on=None):
s = run_job("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1",
s = create_job("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1",
"ntasks-per-node": "1", "ntasks-per-core": "1"})
cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+exp.expname
s.run(cmd, depends_on=[depends_on])
......
......@@ -466,7 +466,7 @@ if __name__ == "__main__":
    options = []
    if len(sys.argv) > 4:
        options = sys.argv[5:]
    nproc = 6 if 'headnode' in options else 12
    nproc = cluster.max_nproc
    archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/")
    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
......
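This hunk replaces the hard-coded core count (6 on a head node, 12 otherwise) with the per-cluster max_nproc attribute added above. A hypothetical sketch of how such a limit could cap an MPI launch; the mpirun call and filter binary name are assumptions for illustration, not from this commit:

import subprocess

def run_filter(cluster):
    # cap the number of MPI tasks at the cluster's configured limit
    subprocess.run(['mpirun', '-np', str(cluster.max_nproc), './filter'],
                   check=True, cwd=cluster.dartrundir)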
utils.py 0 → 100644
import os, sys, shutil, glob, warnings
import datetime as dt
from slurmpy import Slurm
from config.cfg import exp, cluster
from dartwrf.utils import script_to_str, symlink, copy

# job scripts and logs go into the experiment archive; create_job below uses these
log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
class Shellslurm():
    """Like the Slurm class, but runs commands locally (no SLURM)"""
    def __init__(self, *args, **kwargs):
        pass
    def run(self, *args, **kwargs):
        print(args[0])
        os.system(args[0])

def create_job(*args, cfg_update=dict(), with_slurm=True, **kwargs):
    """Shortcut to slurmpy's Slurm class: keep the cluster's default SLURM
    kwargs and override selected ones via `cfg_update`.
    See https://github.com/brentp/slurmpy
    with_slurm (bool) : if True, submit via SLURM; else run locally
    """
    if with_slurm:
        # merge cluster-wide defaults with job-specific settings (cfg_update wins)
        return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
                     log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
    else:
        return Shellslurm(*args)
def backup_scripts():
    """Copy scripts and configuration to the archive directory"""
    os.makedirs(cluster.archivedir, exist_ok=True)
    try:
        shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir)
    except FileExistsError:
        pass  # scripts were already backed up
    try:
        copy(os.path.basename(__file__), cluster.scripts_rundir+'/')
    except Exception as e:
        warnings.warn(str(e))
def prepare_WRFrundir(init_time):
    """Create WRF/run directories and wrfinput files"""
    cmd = cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfrundir.py '+init_time.strftime('%Y-%m-%d_%H:%M')
    print(cmd)
    os.system(cmd)
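A usage sketch for the new utils.py helpers (job names and commands illustrative): prepare_WRFrundir blocks until done, while create_job(...).run(...) returns a SLURM job id that later jobs can wait on via depends_on, the pattern used throughout cluster_scripts:

import datetime as dt

prepare_WRFrundir(dt.datetime(2008, 7, 30, 12))    # runs locally, synchronously
id1 = create_job("step1", cfg_update={"time": "5"}).run('echo first')
id2 = create_job("step2", cfg_update={"time": "5"}).run('echo second', depends_on=[id1])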