Skip to content
Snippets Groups Projects
Commit 5868fa84 authored by lkugler's avatar lkugler
Browse files

merge

parents 823eaee9 6004d35b
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
"""
running the forecast model without assimilation
"""
import os, sys, shutil
import datetime as dt
from dartwrf import utils
from config.cfg import exp, cluster
# Locations below the experiment archive that are reported to the user.
log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir)
print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
###############################
# Copy scripts/configuration into the archive before anything is run.
utils.backup_scripts()
# Experiment that provides the prior state, and the times identifying it.
prior_path_exp = '/mnt/jetfs/scratch/lkugler/data/sim_archive/exp_v1.19_P3_wbub7_noDA'
prior_init_time = dt.datetime(2008,7,30,12)
prior_valid_time = dt.datetime(2008,7,30,12,30)
# The assimilation time equals the valid time of the prior.
assim_time = prior_valid_time
# Run assim_synth_obs.py directly via the shell (not through a job object).
# Each strftime format ends with a blank, which separates the arguments.
# The exit status of os.system is not checked.
# NOTE(review): the module docstring says "without assimilation", yet this
# call invokes assim_synth_obs.py -- confirm which description is intended.
os.system(
cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp
)
# id_sat = create_satimages(time, depends_on=id)
\ No newline at end of file
from config import clusters
from dartwrf import utils
from config import clusters # from . = problem in archivedir
cluster = clusters.jet # change cluster configuration here
class ExperimentConfiguration(object):
    """Empty attribute container for experiment settings.

    Nothing is predefined; settings are attached to an instance after it
    has been created (e.g. ``exp.expname = ...``).
    """

    def __init__(self):
        """Create an instance without any attributes."""
exp.expname = "exp_v1.22_P2_rr_WV62_obs10_loc20_oe2"
exp = utils.ExperimentConfiguration()
exp.expname = "test_srvx1" #"exp_v1.22_P3_wbub7_WV62_obs10_loc20_oe1"
exp.model_dx = 2000
exp.n_ens = 40
exp.size_jobarray = 40
exp.filter_kind = 1
exp.inflation = True
......@@ -99,7 +95,7 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[Pa]',
cov_loc_radius_km=32)
# NOTE(review): two consecutive assignments; only the last one ([t]) is in
# effect after this point.
exp.observations = [wv62]
exp.observations = [t]
# Names of the model variables listed for the update.
exp.update_vars = ['U', 'V', 'W', 'THM', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'PSFC']
#exp.update_vars = ['U', 'V', 'W', 'T', 'PH', 'MU', 'QVAPOR', 'PSFC']
......
import os, sys
import datetime as dt
from dartwrf import utils
"""Configuration name docs
......@@ -41,29 +42,13 @@ slurm_cfg python dictionary, containing options of SLURM
"""
class ClusterConfig(object):
    """Container for cluster settings with derived, experiment-specific paths.

    The attributes ``archive_base``, ``wrf_rundir_base``,
    ``dart_rundir_base`` and ``expname`` are not set by this class; they
    must be assigned on the instance before the path helpers are used.
    """

    def __init__(self):
        """Create an instance without any attributes."""

    @property
    def archivedir(self):
        """Return ``<archive_base>/<expname>``."""
        return '/'.join((self.archive_base, self.expname))

    def wrf_rundir(self, iens):
        """Return ``<wrf_rundir_base>/<expname>/<iens>`` (``iens`` via ``str``)."""
        return '/'.join((self.wrf_rundir_base, self.expname, str(iens)))

    @property
    def scripts_rundir(self):
        """Return ``<archivedir>/DART-WRF/``."""
        return self.archivedir + '/DART-WRF/'

    @property
    def dartrundir(self):
        """Return ``<dart_rundir_base>/<expname>/`` (with trailing slash)."""
        return '/'.join((self.dart_rundir_base, self.expname, ''))
# NOTE(review): `vsc` is assigned twice; the second object
# (utils.ClusterConfig) is the one that receives the settings below.
vsc = ClusterConfig()
vsc = utils.ClusterConfig()
vsc.name = 'vsc'
# read as `nproc = cluster.max_nproc` by the assimilation script
vsc.max_nproc = 20
vsc.size_jobarray = 10 # 10 jobs with each 4 WRF processes per node
# create_job() submits through slurmpy when this is True, else runs locally
vsc.use_slurm = True
# binaries
vsc.python = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/python'
......@@ -92,8 +77,11 @@ vsc.slurm_cfg = {"account": "p71386", "partition": "skylake_0384", "qos": "p7138
"nodes": "1", "ntasks": "1", "ntasks-per-node": "48", "ntasks-per-core": "1",
"mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
# NOTE(review): `jet` is assigned twice; the second object
# (utils.ClusterConfig) is the one that receives the settings below.
jet = ClusterConfig()
jet = utils.ClusterConfig()
jet.name = 'jet'
# read as `nproc = cluster.max_nproc` by the assimilation script
jet.max_nproc = 12
# create_job() submits through slurmpy when this is True, else runs locally
jet.use_slurm = True
# upper bound of the SLURM job array ("1-<size_jobarray>") of the WRF job
jet.size_jobarray = 40
# binaries
jet.python = '/jetfs/home/lkugler/miniconda3/envs/DART/bin/python'
......@@ -123,3 +111,38 @@ jet.run_WRF = '/jetfs/home/lkugler/DART-WRF/dartwrf/run_ens.jet.sh'
jet.slurm_cfg = {"account": "lkugler", "partition": "compute", #"nodelist": "jet07",
"ntasks": "1", "ntasks-per-core": "1", "mem": "50G",
"mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
# Settings for the machine named 'srvx1'.
srvx1 = utils.ClusterConfig()
srvx1.name = 'srvx1'
# read as `nproc = cluster.max_nproc` by the assimilation script
srvx1.max_nproc = 6
srvx1.size_jobarray = 40
# False: create_job() returns a Shellslurm object, i.e. commands are run
# locally through os.system instead of being submitted to SLURM.
srvx1.use_slurm = False
# binaries
# NOTE(review): some paths below start with '/mnt/jetfs', others with
# '/jetfs' -- confirm that both prefixes exist on this machine.
srvx1.python = '/mnt/jetfs/home/lkugler/miniconda3/envs/DART/bin/python'
srvx1.python_verif = '/jetfs/home/lkugler/miniconda3/envs/enstools/bin/python'
srvx1.ncks = '/jetfs/spack/opt/spack/linux-rhel8-skylake_avx512/intel-20.0.2/nco-4.9.3-dhlqiyog7howjmaleyfhm6lkt7ra37xf/bin/ncks'
srvx1.ideal = '/jetfs/home/lkugler/bin/ideal-v4.3_v1.22.exe'
srvx1.wrfexe = '/jetfs/home/lkugler/bin/wrf-v4.3_v1.22.exe'
srvx1.container = ''
# paths for data output
srvx1.wrf_rundir_base = '/jetfs/home/lkugler/data/run_WRF/' # path for temporary files
srvx1.dart_rundir_base = '/jetfs/home/lkugler/data/run_DART/' # path for temporary files
srvx1.archive_base = '/mnt/jetfs/scratch/lkugler/data/sim_archive/'
# paths used as input
srvx1.srcdir = '/users/staff/lkugler/AdvDA23/DART/WRF-4.3/run'
srvx1.dart_srcdir = '/users/staff/lkugler/AdvDA23/DART/models/wrf/work'
srvx1.rttov_srcdir = '/users/staff/lkugler/AdvDA23/RTTOV13/rtcoef_rttov13/'
# directory that backup_scripts() copies into the archive
srvx1.scriptsdir = '/jetfs/home/lkugler/DART-WRF/dartwrf/'
srvx1.geo_em = '/mnt/jetfs/scratch/lkugler/data/geo_em.d01.nc'
# templates/run scripts
srvx1.namelist = srvx1.scriptsdir+'/../templates/namelist.input'
# NOTE(review): reuses the run script named after the 'jet' cluster -- confirm.
srvx1.run_WRF = srvx1.scriptsdir+'/run_ens.jet.sh'
# Only consulted by create_job() when use_slurm is True.
srvx1.slurm_cfg = {"account": "lkugler", "partition": "compute",
"ntasks": "1", "ntasks-per-core": "1", "mem": "50G",
"mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
......@@ -5,58 +5,15 @@ submitting jobs into SLURM queue
"""
import os, sys, shutil, glob, warnings
import datetime as dt
from slurmpy import Slurm
from dartwrf.utils import script_to_str, symlink, copy, create_job
from config.cfg import exp, cluster
from dartwrf.utils import script_to_str, symlink, copy
log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir)
print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
class Shellslurm():
    """Local stand-in for slurmpy's ``Slurm``: runs the command in a shell.

    Constructor arguments are accepted and ignored so that the object can be
    created with the same call as a ``Slurm`` job.
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore all arguments."""

    def run(self, *args, **kwargs):
        """Print ``args[0]`` and execute it through ``os.system``.

        All further positional and keyword arguments (e.g. ``depends_on``)
        are ignored. Returns ``None``. A non-zero status reported by
        ``os.system`` is no longer dropped silently: a warning is issued,
        execution continues.
        """
        import warnings  # local import keeps this block self-contained

        cmd = args[0]
        print(cmd)
        status = os.system(cmd)
        if status != 0:
            warnings.warn('command returned non-zero status '
                          + str(status) + ': ' + str(cmd))
def my_Slurm(*args, cfg_update=None, **kwargs):
    """Shortcut to slurmpy's ``Slurm`` class with the cluster's default options.

    see https://github.com/brentp/slurmpy

    Args:
        *args: forwarded to ``Slurm`` (or ``Shellslurm`` in debug mode).
        cfg_update (dict, optional): SLURM options that override or extend
            ``cluster.slurm_cfg`` for this job. ``None`` means no overrides.
        **kwargs: forwarded to ``Slurm``.

    Returns:
        A ``Slurm`` object, or a ``Shellslurm`` object if ``debug`` is set.
    """
    debug = False  # run without SLURM, locally on headnode
    if debug:
        return Shellslurm(*args)
    # `None` replaces the former mutable default `dict()`; the defaults in
    # cluster.slurm_cfg are copied, never modified.
    slurm_kwargs = dict(cluster.slurm_cfg, **(cfg_update or {}))
    return Slurm(*args, slurm_kwargs=slurm_kwargs,
                 log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
def backup_scripts():
    """Copy the script directory and this file into the experiment archive.

    Creates ``cluster.archivedir`` if necessary, then copies
    ``cluster.scriptsdir`` to ``cluster.scripts_rundir``. An already existing
    target directory is left untouched; any other error propagates.
    Copying this file is best-effort: a failure only produces a warning.
    """
    os.makedirs(cluster.archivedir, exist_ok=True)

    try:
        shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir)
    except FileExistsError:
        # a copy from an earlier call exists already; keep it as it is
        pass

    try:
        # only the file name is used, i.e. it is looked up relative to the
        # current working directory
        this_file = os.path.basename(__file__)
        copy(this_file, cluster.scripts_rundir+'/')
    except Exception as err:
        warnings.warn(str(err))
def prepare_WRFrundir(init_time):
    """Run ``prepare_wrfrundir.py`` for ``init_time`` (WRF run dirs, wrfinput files).

    The command line is printed and then executed through ``os.system``;
    its exit status is not inspected and nothing is returned.
    """
    python = cluster.python
    script = cluster.scripts_rundir+'/prepare_wrfrundir.py'
    timestamp = init_time.strftime('%Y-%m-%d_%H:%M')
    cmd = ' '.join((python, script, timestamp))
    print(cmd)
    os.system(cmd)
    # return id
def run_ideal(depends_on=None):
"""Run ideal for every ensemble member"""
cmd = """# run ideal.exe in parallel, then add geodata
......@@ -75,7 +32,7 @@ do
mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done
"""
s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens),
s = create_job("ideal", cfg_update={"ntasks": str(exp.n_ens),
"time": "10", "mem": "100G"})
id = s.run(cmd, depends_on=[depends_on])
return id
......@@ -84,7 +41,7 @@ def wrfinput_insert_wbubble(perturb=True, depends_on=None):
"""Given that directories with wrfinput files exist,
update these wrfinput files with warm bubbles
"""
s = my_Slurm("ins_wbubble", cfg_update={"time": "5"})
s = create_job("ins_wbubble", cfg_update={"time": "5"})
pstr = ' '
if perturb:
pstr = ' perturb'
......@@ -107,7 +64,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
# hist_interval = 1
# radt = 1 # calc CFRAC also in first minute
# begin_plus1 = begin+dt.timedelta(minutes=1)
# s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
# s = create_job("preWRF1", cfg_update=dict(time="2"))
# args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py',
# begin.strftime('%Y-%m-%d_%H:%M'),
# begin_plus1.strftime('%Y-%m-%d_%H:%M'),
......@@ -116,13 +73,13 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
# '--restart='+restart_flag,]
# id = s.run(' '.join(args), depends_on=[id])
# s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
# s = create_job("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
# "time": "2", "mem-per-cpu": "2G"})
# cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
# id = s.run(cmd, depends_on=[id])
# # apply forward operator (DART filter without assimilation)
# s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
# s = create_job("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
# id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py '
# + begin.strftime('%Y-%m-%d_%H:%M')+' '
# + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
......@@ -141,12 +98,12 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
if output_restart_interval:
args.append('--restart_interval='+str(int(float(output_restart_interval))))
s = my_Slurm("preWRF", cfg_update=dict(time="2"))
s = create_job("preWRF", cfg_update=dict(time="2"))
id = s.run(' '.join(args), depends_on=[id])
time_in_simulation_hours = (end-begin).total_seconds()/3600
runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5) # usually below 9 min/hour
s = my_Slurm("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1",
s = create_job("WRF", cfg_update={"array": "1-"+str(cluster.size_jobarray), "ntasks": "10", "nodes": "1",
"time": str(runtime_wallclock_mins_expected), "mem": "140G"})
cmd = script_to_str(cluster.run_WRF).replace('<exp.expname>', exp.expname
).replace('<cluster.wrf_rundir_base>', cluster.wrf_rundir_base)
......@@ -166,7 +123,7 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, prior_path_exp,
if not os.path.exists(prior_path_exp):
raise IOError('prior_path_exp does not exist: '+prior_path_exp)
id = my_Slurm("Assim", cfg_update={"ntasks": "12", "time": "60",
id = create_job("Assim", cfg_update={"ntasks": "12", "time": "60",
"mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"}
).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ')
......@@ -183,7 +140,7 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new
else:
tnew = ''
id = my_Slurm("IC-prior", cfg_update=dict(time="8")
id = create_job("IC-prior", cfg_update=dict(time="8")
).run(cluster.python+' '+cluster.scripts_rundir+'/prep_IC_prior.py '
+prior_path_exp
+prior_init_time.strftime(' %Y-%m-%d_%H:%M')
......@@ -193,14 +150,14 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new
def update_IC_from_DA(assim_time, depends_on=None):
    """Run ``update_IC.py`` for ``assim_time`` as job "IC-update".

    Args:
        assim_time (datetime): passed to the script as ``%Y-%m-%d_%H:%M``.
        depends_on: forwarded to the job's ``run`` as ``depends_on=[depends_on]``.

    Returns:
        Whatever the job's ``run`` method returns.
    """
    # The stale `my_Slurm(` line that preceded this call was dropped; only
    # the `create_job` variant is kept.
    job = create_job("IC-update", cfg_update=dict(time="8"))
    cmd = (cluster.python+' '+cluster.scripts_rundir+'/update_IC.py '
           + assim_time.strftime('%Y-%m-%d_%H:%M'))
    return job.run(cmd, depends_on=[depends_on])
def create_satimages(init_time, depends_on=None):
s = my_Slurm("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"})
s = create_job("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"})
id = s.run(cluster.python_verif+' ~/RTTOV-WRF/run_init.py '+cluster.archivedir
+init_time.strftime('/%Y-%m-%d_%H:%M/'),
depends_on=[depends_on])
......@@ -209,31 +166,31 @@ def create_satimages(init_time, depends_on=None):
def mailme(depends_on=None):
    """Queue the job "AllFinished" (mail-type BEGIN) behind ``depends_on``.

    The job itself only runs ``sleep 1``. Nothing happens if ``depends_on``
    is falsy. Returns ``None``.
    """
    if not depends_on:
        return
    # The stale `my_Slurm(` line was dropped in favour of `create_job`.
    job = create_job("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
    job.run('sleep 1', depends_on=[depends_on])
def gen_obsseq(depends_on=None):
    """Run ``obsseq_to_netcdf.py`` as job "obsseq_netcdf".

    Args:
        depends_on: forwarded to the job's ``run`` as ``depends_on=[depends_on]``.

    Returns:
        Whatever the job's ``run`` method returns.
    """
    # The stale `my_Slurm(` line was dropped in favour of `create_job`.
    job = create_job("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"})
    cmd = cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py'
    return job.run(cmd, depends_on=[depends_on])
def verify_sat(depends_on=None):
    """Run ``analyze_fc.py`` with the ``sat`` option for this experiment.

    The job is named ``verif-SAT-<expname>`` and the command is run with
    ``cluster.python_verif``. Returns ``None``.

    Args:
        depends_on: forwarded to the job's ``run`` as ``depends_on=[depends_on]``.
    """
    # The stale `my_Slurm(` line was dropped in favour of `create_job`.
    s = create_job("verif-SAT-"+exp.expname,
                   cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20",
                               "ntasks-per-node": "20", "ntasks-per-core": "1",
                               "mem": "100G"})
    cmd = (cluster.python_verif
           + ' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '
           + exp.expname+' has_node sat verif1d FSS BS')
    s.run(cmd, depends_on=[depends_on])
def verify_wrf(depends_on=None):
    """Run ``analyze_fc.py`` with the ``wrf`` option for this experiment.

    The job is named ``verif-WRF-<expname>`` and the command is run with
    ``cluster.python_verif``. Returns ``None``.

    Args:
        depends_on: forwarded to the job's ``run`` as ``depends_on=[depends_on]``.
    """
    # The stale `my_Slurm(` line was dropped in favour of `create_job`.
    s = create_job("verif-WRF-"+exp.expname,
                   cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20",
                               "ntasks-per-node": "20", "ntasks-per-core": "1",
                               "mem": "250G"})
    # Of the two former `cmd` assignments only the later one (which adds
    # `verif3d`) had any effect; the overwritten one was removed.
    cmd = (cluster.python_verif
           + ' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '
           + exp.expname+' has_node wrf verif1d verif3d FSS BS')
    s.run(cmd, depends_on=[depends_on])
def verify_fast(depends_on=None):
    """Run ``plot_single_exp.py`` for this experiment.

    The job is named ``verif-fast-<expname>`` and the command is run with
    ``cluster.python_verif``. Returns ``None``.

    Args:
        depends_on: forwarded to the job's ``run`` as ``depends_on=[depends_on]``.
    """
    # The stale `my_Slurm(` line was dropped in favour of `create_job`.
    s = create_job("verif-fast-"+exp.expname,
                   cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1",
                               "ntasks-per-node": "1", "ntasks-per-core": "1"})
    cmd = (cluster.python_verif
           + ' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '
           + exp.expname)
    s.run(cmd, depends_on=[depends_on])
......@@ -316,7 +273,6 @@ if __name__ == "__main__":
# update time variables
prior_init_time = time - timedelta_btw_assim
#id = gen_obsseq(id)
verify_sat(id_sat)
verify_wrf(id)
verify_fast(id)
......@@ -472,7 +472,7 @@ if __name__ == "__main__":
options = []
if len(sys.argv) >4:
options = sys.argv[5:]
nproc = 6 if 'headnode' in options else 12
nproc = cluster.max_nproc
archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/")
os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in
......
import os, sys, shutil, glob
import os, sys, shutil, glob, warnings
import builtins as __builtin__
#copy = shutil.copy
import subprocess
import datetime as dt
from slurmpy import Slurm
from config.cfg import cluster
class ExperimentConfiguration(object):
    """Plain namespace for experiment settings.

    No attribute is predefined; the configuration assigns them on the
    instance after construction.
    """

    def __init__(self):
        """Create an instance without any attributes."""
class ClusterConfig(object):
    """Plain namespace for cluster settings plus derived path helpers.

    ``archive_base``, ``wrf_rundir_base``, ``dart_rundir_base`` and
    ``expname`` are read by the helpers but not set here; assign them on
    the instance first.
    """

    def __init__(self):
        """Create an instance without any attributes."""

    @property
    def archivedir(self):
        """Archive directory of the experiment: ``<archive_base>/<expname>``."""
        parts = (self.archive_base, self.expname)
        return '/'.join(parts)

    def wrf_rundir(self, iens):
        """WRF run directory ``<wrf_rundir_base>/<expname>/<iens>``."""
        parts = (self.wrf_rundir_base, self.expname, str(iens))
        return '/'.join(parts)

    @property
    def scripts_rundir(self):
        """Directory of the archived scripts: ``<archivedir>/DART-WRF/``."""
        return self.archivedir + '/DART-WRF/'

    @property
    def dartrundir(self):
        """DART run directory ``<dart_rundir_base>/<expname>/``."""
        # the empty last element produces the trailing slash
        parts = (self.dart_rundir_base, self.expname, '')
        return '/'.join(parts)
class Shellslurm():
    """Like Slurmpy class, but runs locally.

    Constructor arguments are accepted and ignored, so the object can be
    created with the same call as a slurmpy ``Slurm`` job.
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore all arguments."""

    def run(self, *args, **kwargs):
        """Print the command ``args[0]`` and execute it via ``os.system``.

        Remaining positional and keyword arguments (such as ``depends_on``)
        are ignored. Always returns ``None``. If ``os.system`` reports a
        non-zero status a warning is issued instead of ignoring the failure
        silently; execution continues in either case.
        """
        import warnings  # local import keeps this block self-contained

        command = args[0]
        print(command)
        status = os.system(command)
        if status != 0:
            warnings.warn('command returned non-zero status '
                          + str(status) + ': ' + str(command))
def create_job(*args, cfg_update=None, **kwargs):
    """Create a job object: slurmpy ``Slurm`` or the local ``Shellslurm``.

    Which one is returned is decided by ``cluster.use_slurm``.
    see https://github.com/brentp/slurmpy

    Args:
        *args: forwarded to ``Slurm`` / ``Shellslurm``.
        cfg_update (dict, optional): SLURM options that override or extend
            ``cluster.slurm_cfg`` for this job. ``None`` means no overrides.
        **kwargs: forwarded to ``Slurm`` only.

    Returns:
        ``Slurm`` if ``cluster.use_slurm`` is true, else ``Shellslurm``.
    """
    if not cluster.use_slurm:
        return Shellslurm(*args)

    # `None` replaces the former mutable default `dict()`; the defaults in
    # cluster.slurm_cfg are copied, never modified.
    slurm_kwargs = dict(cluster.slurm_cfg, **(cfg_update or {}))
    # The log/script directories are derived here because no module-level
    # `log_dir` / `slurm_scripts_dir` is visible in this module; the values
    # match the definitions used by the calling scripts.
    return Slurm(*args, slurm_kwargs=slurm_kwargs,
                 log_dir=cluster.archivedir+'/logs/',
                 scripts_dir=cluster.archivedir+'/slurm-scripts/',
                 **kwargs)
def backup_scripts():
    """Copies scripts and configuration to archive dir output folder.

    Creates ``cluster.archivedir`` if necessary, then copies
    ``cluster.scriptsdir`` to ``cluster.scripts_rundir``. An existing target
    directory is kept unchanged; any other error propagates. Copying this
    file afterwards is best-effort: a failure only produces a warning.

    NOTE(review): ``__file__`` names the module containing this function,
    and only its base name (relative to the working directory) is copied --
    confirm that this is the file that is meant to be archived.
    """
    os.makedirs(cluster.archivedir, exist_ok=True)

    try:
        shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir)
    except FileExistsError:
        # a copy from an earlier call exists already; keep it as it is
        pass

    try:
        file_name = os.path.basename(__file__)
        copy(file_name, cluster.scripts_rundir+'/')
    except Exception as err:
        warnings.warn(str(err))
def prepare_WRFrundir(init_time):
    """Create WRF/run directories and wrfinput files.

    Builds the command line for ``prepare_wrfrundir.py`` (interpreter
    ``cluster.python``, time formatted as ``%Y-%m-%d_%H:%M``), prints it and
    runs it through ``os.system``. The exit status is not inspected and
    nothing is returned.
    """
    interpreter = cluster.python
    script_path = cluster.scripts_rundir+'/prepare_wrfrundir.py'
    time_arg = init_time.strftime('%Y-%m-%d_%H:%M')
    cmd = interpreter+' '+script_path+' '+time_arg
    print(cmd)
    os.system(cmd)
def shell(args):
print(args)
......
......@@ -8,15 +8,14 @@ import pandas as pd
from slurmpy import Slurm
from config.cfg import exp, cluster
from dartwrf.utils import script_to_str, symlink
from dartwrf.utils import script_to_str, symlink, backup_scripts
from cycled_exp import *
log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir)
print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
from scheduler import *
################################
print('starting osse')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment