Skip to content
Snippets Groups Projects
Commit fe007678 authored by Lukas Kugler's avatar Lukas Kugler
Browse files

docs, configs

parent 57ff4b1f
No related branches found
No related tags found
No related merge requests found
......@@ -18,22 +18,14 @@ from dartwrf.utils import Config
class WorkFlows(object):
def __init__(self, cfg: Config):
"""Set up the experiment folder in `archivedir`.
1. Copy the selected config files
2. Import configurations
3. Prepare obskind.py file (dictionary of observation types)
4. Copy the scripts and configs to `archivedir`
5. Set python path
6. Set log path and slurm scripts path
Args:
exp_config (str): Path to exp config file
server_config (str): Path to the cluster config file
"""Create the archive directory, copy scripts to archive
Attributes:
cluster (obj): cluster configuration as defined in server_config file
exp (obj): experiment configuration as defined in exp_config file
use_slurm: if True, run jobs with SLURM
dir_log: Logging directory for slurm
dir_slurm: Scripts directory for slurm
dir_dartwrf_run: as defined in Config
python: python command with prepended PYTHONPATH
"""
print('------------------------------------------------------')
print('>>> Experiment name: "'+cfg.name+'"')
......@@ -42,7 +34,7 @@ class WorkFlows(object):
# Copy scripts and config files to `self.archivedir` folder
dirs_exist_ok = False
if os.path.exists(cfg.dir_archive+'/DART-WRF/'):
if input('The experiment name already exists! Overwrite existing experiment? (Y/n) ') in ['Y', 'y']:
if input('The experiment name already exists! Overwrite existing scripts? (Y/n) ') in ['Y', 'y']:
dirs_exist_ok = True
shutil.copytree(cfg.dir_dartwrf_dev,
......@@ -68,10 +60,9 @@ class WorkFlows(object):
print('------------------------------------------------------')
# use this python path
run_wrf_from_this_folder = cfg.dir_dartwrf_run+'/../'
pythonpath_archive = cfg.dir_dartwrf_run+'/../'
self.dir_dartwrf_run = cfg.dir_dartwrf_run
self.python = 'export PYTHONPATH=' +run_wrf_from_this_folder+ '; '+cfg.python
self.cfg = cfg
self.python = 'export PYTHONPATH=' +pythonpath_archive+ '; '+cfg.python
def run_job(self, cmd, cfg, depends_on=None, **kwargs):
"""Run scripts in a shell
......@@ -92,7 +83,10 @@ class WorkFlows(object):
from slurmpy import Slurm
# name of calling function
path_to_script = inspect.stack()[1].function
jobname = path_to_script.split('/')[-1]+'-'+cfg.f_cfg_current.split('/')[-1].replace('.py','')
if 'time' in cfg:
jobname = path_to_script.split('/')[-1]+'-'+cfg.time.strftime('%H:%M')
else:
jobname = path_to_script.split('/')[-1]+'-'+cfg.name
print('> SLURM job:', jobname)
slurm_kwargs = cfg.slurm_kwargs.copy()
......@@ -175,12 +169,11 @@ class WorkFlows(object):
Returns:
str: job ID of the submitted job
"""
cmd = script_to_str(self.cfg.WRF_ideal_template
).replace('<expname>', cfg.name
).replace('<wrf_rundir_base>', cfg.dir_wrf_run
cmd = script_to_str(cfg.WRF_ideal_template
).replace('<wrf_rundir_base>', cfg.dir_wrf_run.replace('<ens>', '$IENS')
).replace('<wrf_modules>', cfg.wrf_modules,
)
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30")
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30", array="1-"+str(cfg.ensemble_size))
return id
def run_WRF(self, cfg, depends_on=None):
......@@ -191,31 +184,32 @@ class WorkFlows(object):
end = cfg.WRF_end
# SLURM configuration for WRF
slurm_kwargs = {"array": "1-"+str(self.cfg.ensemble_size),
slurm_kwargs = {"array": "1-"+str(cfg.ensemble_size),
"nodes": "1",
"ntasks": str(self.cfg.max_nproc_for_each_ensemble_member),
"ntasks-per-core": "1", "mem": "90G", }
"ntasks": str(cfg.max_nproc_for_each_ensemble_member),
"ntasks-per-core": "1", "mem": "90G",
"ntasks-per-node": str(cfg.max_nproc_for_each_ensemble_member),}
# command from template file
wrf_cmd = script_to_str(self.cfg.WRF_exe_template
).replace('<dir_wrf_run>', self.cfg.dir_wrf_run.replace('<ens>', '$IENS')
).replace('<wrf_modules>', self.cfg.wrf_modules,
).replace('<WRF_number_of_processors>', str(self.cfg.max_nproc_for_each_ensemble_member),
wrf_cmd = script_to_str(cfg.WRF_exe_template
).replace('<dir_wrf_run>', cfg.dir_wrf_run.replace('<ens>', '$IENS')
).replace('<wrf_modules>', cfg.wrf_modules,
).replace('<WRF_number_of_processors>', str(cfg.max_nproc_for_each_ensemble_member),
)
# prepare namelist
path_to_script = self.dir_dartwrf_run + '/prepare_namelist.py'
cmd = ' '.join([self.python, path_to_script, self.cfg.f_cfg_current])
cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on])
# run WRF ensemble
time_in_simulation_hours = (end-start).total_seconds()/3600
runtime_wallclock_mins_expected = int(
time_in_simulation_hours*30 + 10) # usually <15 min/hour
# runtime_wallclock_mins_expected = int(time_in_simulation_hours*15*1.5 + 15)
runtime_wallclock_mins_expected = int(time_in_simulation_hours*30*1.5 + 15)
# usually max 15 min/hour + 50% margin + 15 min buffer
slurm_kwargs.update({"time": str(runtime_wallclock_mins_expected)})
if runtime_wallclock_mins_expected > 25:
if runtime_wallclock_mins_expected > 20:
slurm_kwargs.update({"partition": "amd"})
# #cfg_update.update({"exclude": "jet03"})
#cfg_update.update({"exclude": "jet03"})
id = self.run_job(wrf_cmd, cfg, depends_on=[id], **slurm_kwargs)
return id
......@@ -271,18 +265,30 @@ class WorkFlows(object):
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="10")
return id
def create_satimages(self, cfg, depends_on=None):
def run_RTTOV(self, cfg, depends_on=None):
"""Run a job array, one job per ensemble member, to create satellite images"""
prefix = 'module purge; module load rttov/v13.2-gcc-8.5.0; python'
path_to_script = '~/RTTOV-WRF/run_init.py'
cmd = ' '.join([prefix, path_to_script, cfg.f_cfg_current])
cmd = ' '.join([prefix, path_to_script,
cfg.dir_archive +cfg.time.strftime('/%Y-%m-%d_%H:%M/'),
'$SLURM_ARRAY_TASK_ID'])
id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"ntasks": "1", "time": "60", "mem": "10G",
"array": "1-"+str(self.cfg.ensemble_size)})
"array": "1-"+str(cfg.ensemble_size)})
return id
def verify(self, cfg: Config, depends_on=None):
"""Not included in DART-WRF"""
cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
cfg.name, cfg.verify_against, #cfg.time.strftime('%y%m%d_%H:%M'),
'sat', 'wrf', 'has_node', 'np=10', 'mem=250G'])
self.run_job(cmd, cfg, depends_on=[depends_on],
**{"time": "03:00:00", "mail-type": "FAIL,END",
"ntasks": "10", "ntasks-per-node": "10",
"ntasks-per-core": "1", "mem": "250G"})
def evaluate_obs_posterior_after_analysis(self, cfg, depends_on=None):
path_to_script = self.dir_dartwrf_run + '/evaluate_obs_space.py'
......@@ -297,13 +303,4 @@ class WorkFlows(object):
# id = self.run_job(cmd, 'linpost'+self.cfg.name, cfg_update={"ntasks": "16", "mem": "80G", "ntasks-per-node": "16", "ntasks-per-core": "2",
# "time": "15", "mail-type": "FAIL"},
# depends_on=[id])
return id
def verify(self, cfg, depends_on=None):
"""Not included in DART-WRF"""
cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"time": "210", "mail-type": "FAIL,END",
"ntasks": "10", "ntasks-per-node": "10",
"ntasks-per-core": "1", "mem": "250G"})
return id
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment