Skip to content
Snippets Groups Projects
Commit cf0189f7 authored by lkugler's avatar lkugler
Browse files

small things

parent b17a7c30
Branches
Tags
No related merge requests found
#!/usr/bin/python3
"""
high level control script
submitting jobs into SLURM queue
"""
import os, sys, shutil
import datetime as dt
from slurmpy import Slurm
# necessary to find modules in folder, since SLURM runs the script elsewhere
sys.path.append(os.getcwd())
from config.cfg import exp, cluster
from scripts.utils import script_to_str, symlink
# allow scripts to access the configuration
symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config')
log_dir = cluster.archivedir()+'/logs/'
slurm_scripts_dir = cluster.archivedir()+'/slurm-scripts/'
print('logging to', log_dir)
print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
def my_Slurm(*args, cfg_update=dict(), **kwargs):
"""Shortcut to slurmpy's class; keep certain default kwargs
and only update some with kwarg `cfg_update`
see https://github.com/brentp/slurmpy
"""
return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
class Cmdline(object):
def __init__(self, name, cfg_update):
self.name = name
def run(self, cmd, **kwargs):
print('running', self.name, 'without SLURM')
os.system(cmd)
from scheduler import backup_scripts
def prepare_wrfinput():
"""Create WRF/run directories and wrfinput files
"""
s = my_Slurm("prep_wrfinput", cfg_update={"time": "5", "mail-type": "BEGIN"})
id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
cmd = """# run ideal.exe in parallel, then add geodata
export SLURM_STEP_GRES=none
for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do
rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
echo $rundir
cd $rundir
mpirun -np 1 ./ideal.exe &
done
wait
for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do
rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done
"""
s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), "nodes": "1",
"time": "10", "mem-per-cpu": "2G"})
id = s.run(cmd, depends_on=[id])
return id
def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depends_on=None):
"""Given that directories with wrfinput files exist,
update these wrfinput files according to wrfout files
"""
s = my_Slurm("upd_wrfinput", cfg_update={"time": "5"})
# path of initial conditions, <iens> is replaced by member index
IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \
+'*iens*/'+valid_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_wrfout.py '
+IC_path, depends_on=[depends_on])
return id
def run_ENS(begin, end, depends_on=None):
"""Run forecast for 1 minute, save output.
Then run whole timespan with 5 minutes interval.
"""
id = depends_on
# first minute forecast (needed for validating an assimilation)
hist_interval = 1
radt = 1 # calc CFRAC also in first minute
begin_plus1 = begin+dt.timedelta(minutes=1)
s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'),
begin_plus1.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]),
depends_on=[id])
s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"time": "2", "mem-per-cpu": "2G"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
id = s.run(cmd, depends_on=[id])
# apply forward operator (DART filter without assimilation)
s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py '
+ begin.strftime('%Y-%m-%d_%H:%M')+' '
+ begin_plus1.strftime('%Y-%m-%d_%H:%M'),
depends_on=[id])
# whole forecast timespan
hist_interval = 5
radt = 5
s = my_Slurm("preWRF2", cfg_update=dict(time="2"))
id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'),
end.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]),
depends_on=[id])
time_in_simulation_hours = (end-begin).total_seconds()/3600
runtime_wallclock_mins_expected = int(4+time_in_simulation_hours*9) # usually below 8 min/hour
s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
id = s.run(cmd, depends_on=[id])
# not needed, since wrf.exe writes directly to archive folder
#s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10"))
#id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py '
# + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
return id
def assimilate(assim_time, prior_init_time,
prior_path_exp=False, depends_on=None):
"""Creates observations from a nature run and assimilates them.
Args:
assim_time (dt.datetime): timestamp of prior wrfout files
prior_init_time (dt.datetime):
timestamp to find the directory where the prior wrfout files are
prior_path_exp (bool or str):
put a `str` to take the prior from a different experiment
if False: use `archivedir` (defined in config) to get prior state
if str: use this directory to get prior state
"""
if not prior_path_exp:
prior_path_exp = cluster.archivedir()
elif not isinstance(prior_path_exp, str):
raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp)))
# prepare state of nature run, from which observation is sampled
#s = my_Slurm("prepNature", cfg_update=dict(time="2"))
#id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py '
# +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])
# prepare prior model state
s = my_Slurm("preAssim", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[depends_on])
# prepare nature run, generate observations
s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "30",
"mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"})
id = s.run(cluster.python+' '+cluster.scriptsdir+'/assim_synth_obs.py '
+time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
# # actuall assimilation step
# s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G"))
# cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out'
# id = s.run(cmd, depends_on=[id])
# s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
# id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py '
# + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
s = my_Slurm("updateIC", cfg_update=dict(time="8"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[id])
return id
def create_satimages(depends_on=None):
s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "40"})
s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/loop.py '+exp.expname,
depends_on=[depends_on])
def mailme(depends_on=None):
if depends_on:
s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
s.run('sleep 1', depends_on=[depends_on])
################################
print('starting osse')
backup_scripts()
id = None
id = prepare_wrfinput() # create initial conditions
begin = dt.datetime(2008, 7, 30, 6, 0)
end = dt.datetime(2008, 7, 30, 20, 0)
# whole forecast timespan
hist_interval = 5
radt = 5
s = my_Slurm("namelist", cfg_update=dict(time="2"))
id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'),
end.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]),
depends_on=[id])
s = my_Slurm("EnsWRF", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"mem-per-cpu": "2G", "mail-type": "BEGIN,FAIL,END"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
id = s.run(cmd, depends_on=[id])
......@@ -148,7 +148,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
depends_on=[id])
time_in_simulation_hours = (end-begin).total_seconds()/3600
runtime_wallclock_mins_expected = int(4+time_in_simulation_hours*9) # usually below 8 min/hour
runtime_wallclock_mins_expected = int(6+time_in_simulation_hours*10) # usually below 8 min/hour
s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
......@@ -229,41 +229,43 @@ def mailme(depends_on=None):
if __name__ == "__main__":
print('starting osse')
timedelta_integrate = dt.timedelta(minutes=75)
timedelta_btw_assim = dt.timedelta(minutes=60)
timedelta_integrate = dt.timedelta(minutes=45)
timedelta_btw_assim = dt.timedelta(minutes=30)
backup_scripts()
id = None
start_from_existing_state = False
start_from_existing_state = True
is_new_run = not start_from_existing_state
if is_new_run:
id = prepare_wrfinput() # create initial conditions
# spin up the ensemble
init_time = dt.datetime(2008, 7, 30, 6, 0)
integration_end_time = dt.datetime(2008, 7, 30, 9, 0)
init_time = dt.datetime(2008, 7, 30, 6)
integration_end_time = dt.datetime(2008, 7, 30, 9)
id = run_ENS(begin=init_time,
end=integration_end_time,
first_minute=False,
depends_on=id)
time = integration_end_time
prior_path_exp = False # for next assimilation
elif start_from_existing_state:
id = prepare_wrfinput() # create initial conditions
# get initial conditions from archive
init_time = dt.datetime(2008, 7, 30, 6)
integration_end_time = dt.datetime(2008, 7, 30, 10)
exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.13_P0_ps-t2'
integration_end_time = dt.datetime(2008, 7, 30, 9)
exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P0_40mem'
id = update_wrfinput_from_archive(integration_end_time, init_time, exppath_arch, depends_on=id)
prior_path_exp = exppath_arch # for next assimilation
# values for assimilation
time = integration_end_time
assim_time = integration_end_time
prior_init_time = init_time
prior_path_exp = False #exppath_arch
while time <= dt.datetime(2008, 7, 30, 18):
while time <= dt.datetime(2008, 7, 30, 15):
id = assimilate(assim_time,
prior_init_time,
......
......@@ -10,8 +10,10 @@ background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
exppath_firstguess = str(sys.argv[3])
"""
# assumes T = THM (dry potential temperature as prognostic variable)
sets initial condition data (wrfinput file) in run_WRF directories
from a DART output state (set of filter_restart files)
# assumes T = THM (dry potential temperature as prognostic variable)
"""
update_vars = ['Times', 'U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'TSK', 'CLDFRA']
updates = ','.join(update_vars)
......
......@@ -21,6 +21,8 @@ for iens in range(1, exp.n_ens+1):
print('updating', wrfin, 'to state in', wrfout)
assert os.path.isfile(wrfout), wrfout
os.makedirs(os.path.dirname(wrfin), exist_ok=True)
# overwrite variables in wrfinput file
# os.system(cluster.ncks+' -A -v '+vars+' '+wrfout+' '+wrfin)
copy(wrfout, wrfin)
......@@ -249,10 +249,10 @@
# based on your grid size. nlon must be an odd number.
&location_nml
horiz_dist_only = <horiz_dist_only>,
vert_normalization_pressure = 6666666.7,
vert_normalization_pressure = -1,
vert_normalization_height = <vert_norm_hgt>,
vert_normalization_level = 2666.7,
vert_normalization_scale_height = 10.0,
vert_normalization_level = -1,
vert_normalization_scale_height = -1,
approximate_distance = .false.,
nlon = 71,
nlat = 36,
......
......@@ -43,7 +43,7 @@
j_parent_start = 0, 15, 15,
parent_grid_ratio = 1, 3, 3,
parent_time_step_ratio = 1, 3, 3,
eta_levels = 1.0000, 0.9969, 0.9932, 0.9889, 0.9837, 0.9776, 0.9704, 0.9620, 0.9522, 0.9408, 0.9277, 0.9126, 0.8954, 0.8760, 0.8543, 0.8303, 0.8040, 0.7755, 0.7450, 0.7128, 0.6793, 0.6448, 0.6097, 0.5744, 0.5393, 0.5048, 0.4711, 0.4381, 0.4061, 0.3749, 0.3448, 0.3157, 0.2877, 0.2609, 0.2352, 0.2107, 0.1874, 0.1654, 0.1447, 0.1252, 0.1069, 0.0902, 0.0752, 0.0618, 0.0498, 0.0390, 0.0294, 0.0208, 0.0131, 0.0062, 0.0000
eta_levels = 1.0000, 0.9969, 0.9923, 0.9857, 0.9762, 0.9627, 0.9443, 0.9200, 0.8892, 0.8525, 0.8111, 0.7671, 0.7223, 0.6782, 0.6357, 0.5949, 0.5558, 0.5183, 0.4824, 0.4482, 0.4155, 0.3844, 0.3547, 0.3266, 0.2999, 0.2750, 0.2519, 0.2304, 0.2105, 0.1919, 0.1747, 0.1586, 0.1437, 0.1299, 0.1170, 0.1050, 0.0939, 0.0836, 0.0740, 0.0650, 0.0567, 0.0490, 0.0419, 0.0352, 0.0290, 0.0232, 0.0179, 0.0129, 0.0083, 0.0040, 0.0000
/
&physics
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment