Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • dev-lkugler
  • master
  • old_config_2023-05
  • teaching-2024
  • v2024.2.20
  • v2024.6
  • v2025.2
7 results

Target

Select target project
  • dataassimilation/DART-WRF
1 result
Select Git revision
  • dev-lkugler
  • master
  • old_config_2023-05
  • teaching-2024
  • v2024.2.20
  • v2024.6
  • v2025.2
7 results
Show changes
Commits on Source (2)
...@@ -223,11 +223,10 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"): ...@@ -223,11 +223,10 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"):
osf_prior (ObsSeq): python representation of obs_seq.final (output of filter in evaluate-mode without posterior) osf_prior (ObsSeq): python representation of obs_seq.final (output of filter in evaluate-mode without posterior)
contains prior values; used for parameterized errors contains prior values; used for parameterized errors
""" """
obs_kind_nrs = obskind_read(cfg.dir_dart_src)
for obscfg in cfg.assimilate_these_observations: for obscfg in cfg.assimilate_these_observations:
kind_str = obscfg['kind'] # e.g. 'RADIOSONDE_TEMPERATURE' kind_str = obscfg['kind'] # e.g. 'RADIOSONDE_TEMPERATURE'
kind = obs_kind_nrs[kind_str] # e.g. 263 kind = cfg.obs_kind_nrs[kind_str] # e.g. 263
# modify observation error of each kind sequentially # modify observation error of each kind sequentially
where_oso_iskind = oso.df.kind == kind where_oso_iskind = oso.df.kind == kind
...@@ -520,13 +519,14 @@ def get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time, lag= ...@@ -520,13 +519,14 @@ def get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time, lag=
""" """
use_ACF = False use_ACF = False
if 'assimilate_cloudfractions' in cfg: if 'assimilate_cloudfractions' in cfg:
use_ACF = True if cfg.assimilate_cloudfractions:
use_ACF = True
oso = None oso = None
if isinstance(cfg.use_existing_obsseq, str): if isinstance(cfg.assimilate_existing_obsseq, str):
# assume that the user wants to use an existing obs_seq.out file # assume that the user wants to use an existing obs_seq.out file
f_obsseq = time.strftime(cfg.use_existing_obsseq) f_obsseq = time.strftime(cfg.assimilate_existing_obsseq)
if os.path.isfile(f_obsseq): if os.path.isfile(f_obsseq):
# copy to run_DART folder # copy to run_DART folder
copy(f_obsseq, cfg.dir_dart_run+'/obs_seq.out') copy(f_obsseq, cfg.dir_dart_run+'/obs_seq.out')
...@@ -534,21 +534,33 @@ def get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time, lag= ...@@ -534,21 +534,33 @@ def get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time, lag=
else: else:
# explain the error if the file does not exist # explain the error if the file does not exist
raise IOError('cfg.use_existing_obsseq is not False. \n' raise IOError('cfg.assimilate_existing_obsseq is not False. \n'
+ 'In this case, use_existing_obsseq should be a file path (wildcards %H, %M allowed)!\n' + 'In this case, assimilate_existing_obsseq should be a file path (wildcards %H, %M allowed)!\n'
+ 'But there is no file with this pattern: '+str(cfg.use_existing_obsseq)) + 'But there is no file with this pattern: '+str(cfg.assimilate_existing_obsseq))
elif use_ACF: elif use_ACF:
# prepare observations with precomputed FO # prepare observations with precomputed FO
pattern_prior = prior_path_exp + prior_init_time.strftime( CF_config = cfg.CF_config.copy()
'/%Y-%m-%d_%H:%M/<iens>/') + prior_valid_time.strftime(cfg.first_guess_pattern) f_prior_pattern = CF_config.pop('first_guess_pattern')
f_obs_pattern = time.strftime(CF_config.pop('f_obs_pattern'))
f_obs = glob.glob(f_obs_pattern)
if len(f_obs) == 0:
raise FileNotFoundError(f_obs_pattern + ' not found')
f_obs = f_obs[0]
pattern_prior = '/'.join([prior_path_exp,
prior_init_time.strftime('/%Y-%m-%d_%H:%M/'),
'<iens>/',
prior_valid_time.strftime(f_prior_pattern),
])
from CloudFractionDA import obsseqout as cfoso from CloudFractionDA import obsseqout as cfoso
cfoso.write_obsseq(time, pattern_prior, cfoso.write_obsseq(time, pattern_prior, f_obs,
time.strftime(cfg.obs_file_pattern), cfg.obs_kind_nrs,
cfg.cloudfraction_variable, path_output = cfg.dir_dart_run + "/obs_seq.out",
path_output=cfg.dir_dart_run + "/obs_seq.out", **CF_config,
) #**ACF_config)
)
else: else:
# do NOT use an existing obs_seq.out file # do NOT use an existing obs_seq.out file
...@@ -595,6 +607,8 @@ def main(cfg: Config): ...@@ -595,6 +607,8 @@ def main(cfg: Config):
prior_valid_time = cfg.prior_valid_time prior_valid_time = cfg.prior_valid_time
prior_path_exp = cfg.prior_path_exp prior_path_exp = cfg.prior_path_exp
cfg.obs_kind_nrs = obskind_read(cfg.dir_dart_src)
# do_reject_smallFGD: triggers additional evaluations of prior & posterior # do_reject_smallFGD: triggers additional evaluations of prior & posterior
do_reject_smallFGD = getattr(cfg, "do_reject_smallFGD", False) do_reject_smallFGD = getattr(cfg, "do_reject_smallFGD", False)
prepare_run_DART_folder(cfg) prepare_run_DART_folder(cfg)
......
...@@ -11,10 +11,10 @@ from dartwrf.utils import Config, copy, try_remove ...@@ -11,10 +11,10 @@ from dartwrf.utils import Config, copy, try_remove
def run(cfg: Config) -> None: def run(cfg: Config) -> None:
# defaults # defaults
hist_interval_s = 300 hist_interval_s = 300
restart = False
restart_interval = 9999 # dummy restart_interval = 9999 # dummy
# overwrite with config # overwrite with config
restart = True
if 'restart' in cfg: if 'restart' in cfg:
restart = cfg.restart restart = cfg.restart
...@@ -32,39 +32,44 @@ def run(cfg: Config) -> None: ...@@ -32,39 +32,44 @@ def run(cfg: Config) -> None:
if 'hist_interval_s' in cfg: if 'hist_interval_s' in cfg:
hist_interval_s = cfg.hist_interval_s hist_interval_s = cfg.hist_interval_s
# replace these variables in the namelist # replace these variables in the namelist
rst_flag = '.true.' if restart else '.false.' rst_flag = '.true.' if restart else '.false.'
replace_dict = {
'time_control': {
'start_year': start.strftime('%Y'),
'start_month': start.strftime('%m'),
'start_day': start.strftime('%d'),
'start_hour': start.strftime('%H'),
'start_minute': start.strftime('%M'),
'start_second': start.strftime('%S'),
'end_year': end.strftime('%Y'),
'end_month': end.strftime('%m'),
'end_day': end.strftime('%d'),
'end_hour': end.strftime('%H'),
'end_minute': end.strftime('%M'),
'end_second': end.strftime('%S'),
'history_interval_s': str(int(hist_interval_s)),
'restart_interval': str(int(restart_interval)),
'restart': rst_flag,
},
'domains': {
'dx': str(cfg.model_dx),
}}
if 'c_s' in cfg:
replace_dict['dynamics'] = {'c_s': str(cfg.c_s)}
print('prepare namelists from', start, 'to', end, print('prepare namelists from', start, 'to', end,
', restart=', restart, 'restart_interval=', restart_interval) ', restart=', restart, 'restart_interval=', restart_interval)
for iens in range(1, cfg.ensemble_size+1): for iens in range(1, cfg.ensemble_size+1):
replace_dict = { replace_dict['time_control'].update({
'time_control': { 'history_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfout_d<domain>_<date>'",
'start_year': start.strftime('%Y'), 'rst_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfrst_d<domain>_<date>'",
'start_month': start.strftime('%m'), })
'start_day': start.strftime('%d'),
'start_hour': start.strftime('%H'),
'start_minute': start.strftime('%M'),
'start_second': start.strftime('%S'),
'end_year': end.strftime('%Y'),
'end_month': end.strftime('%m'),
'end_day': end.strftime('%d'),
'end_hour': end.strftime('%H'),
'end_minute': end.strftime('%M'),
'end_second': end.strftime('%S'),
'history_interval_s': str(int(hist_interval_s)),
'restart_interval': str(int(restart_interval)),
'restart': rst_flag,
'history_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfout_d<domain>_<date>'",
'rst_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfrst_d<domain>_<date>'",
},
'domains': {
'dx': str(cfg.model_dx),
},
}
# define defaults from Config # define defaults from Config
nml = WRF_namelist() nml = WRF_namelist()
nml.read(cfg.WRF_namelist_template) nml.read(cfg.WRF_namelist_template)
...@@ -73,13 +78,12 @@ def run(cfg: Config) -> None: ...@@ -73,13 +78,12 @@ def run(cfg: Config) -> None:
for section, section_dict in replace_dict.items(): for section, section_dict in replace_dict.items():
for key, value in section_dict.items(): for key, value in section_dict.items():
nml.namelist[section][key] = value nml.namelist[section][key] = value
f_out = cfg.dir_wrf_run.replace('<ens>', str(iens) f_out = cfg.dir_wrf_run.replace('<ens>', str(iens)
)+'/namelist.input' )+'/namelist.input'
try_remove(f_out) try_remove(f_out)
nml.write(f_out) nml.write(f_out)
#print('saved', f_out)
# copy to archive # copy to archive
init = start.strftime('/%Y-%m-%d_%H:%M/') init = start.strftime('/%Y-%m-%d_%H:%M/')
......
"""Script to pretty-pring a config file (pickle format)"""
import sys, pickle, pprint
f = sys.argv[1]
with open(f, 'rb') as f:
cfg = pickle.load(f)
pprint.pprint(cfg)
\ No newline at end of file
...@@ -20,6 +20,7 @@ import importlib.util ...@@ -20,6 +20,7 @@ import importlib.util
import random import random
import string import string
import pickle import pickle
import yaml
userhome = os.path.expanduser('~') userhome = os.path.expanduser('~')
...@@ -56,7 +57,7 @@ class Config(object): ...@@ -56,7 +57,7 @@ class Config(object):
`loc_horiz_km`: float of horizontal localization half-width in km; `loc_horiz_km`: float of horizontal localization half-width in km;
`loc_vert_km`: float of vertical localization half-width in km; `loc_vert_km`: float of vertical localization half-width in km;
use_existing_obsseq (str, False): Path to existing obs_seq.out file (False: generate new one); assimilate_existing_obsseq (str, False): Path to existing obs_seq.out file (False: generate new one);
time string is replaced by actual time: /path/%Y-%m-%d_%H:%M_obs_seq.out time string is replaced by actual time: /path/%Y-%m-%d_%H:%M_obs_seq.out
dart_nml (dict): updates to the default input.nml of DART (in dart_srcdir) dart_nml (dict): updates to the default input.nml of DART (in dart_srcdir)
...@@ -88,7 +89,7 @@ class Config(object): ...@@ -88,7 +89,7 @@ class Config(object):
# optional # optional
update_vars: list=[], update_vars: list=[],
dart_nml: dict={}, dart_nml: dict={},
use_existing_obsseq: bool | str = False, assimilate_existing_obsseq: bool | str = False,
nature_wrfout_pattern: bool | str = False, nature_wrfout_pattern: bool | str = False,
# others # others
...@@ -121,7 +122,7 @@ class Config(object): ...@@ -121,7 +122,7 @@ class Config(object):
self.pattern_obs_seq_final = pattern_obs_seq_final.replace('<archivedir>', self.dir_archive) self.pattern_obs_seq_final = pattern_obs_seq_final.replace('<archivedir>', self.dir_archive)
# optional # optional
self.use_existing_obsseq = use_existing_obsseq self.assimilate_existing_obsseq = assimilate_existing_obsseq
self.nature_wrfout_pattern = nature_wrfout_pattern self.nature_wrfout_pattern = nature_wrfout_pattern
# user defined # user defined
...@@ -135,12 +136,12 @@ class Config(object): ...@@ -135,12 +136,12 @@ class Config(object):
if not dart_nml: if not dart_nml:
warnings.warn('No `dart_nml` defined, using default DART namelist!') warnings.warn('No `dart_nml` defined, using default DART namelist!')
if not isinstance(use_existing_obsseq, str): if not isinstance(assimilate_existing_obsseq, str):
if use_existing_obsseq != False: if assimilate_existing_obsseq != False:
raise ValueError('`use_existing_obsseq` must be a string or False, but is', use_existing_obsseq) raise ValueError('`assimilate_existing_obsseq` must be a string or False, but is', assimilate_existing_obsseq)
if isinstance(use_existing_obsseq, str): if isinstance(assimilate_existing_obsseq, str):
print('Using existing observation sequence', use_existing_obsseq) print('Using existing observation sequence', assimilate_existing_obsseq)
# required attributes, derived from others # required attributes, derived from others
self.dir_archive = self.dir_archive.replace('<exp>', self.name) self.dir_archive = self.dir_archive.replace('<exp>', self.name)
...@@ -152,6 +153,7 @@ class Config(object): ...@@ -152,6 +153,7 @@ class Config(object):
self.f_cfg_base = self.dir_archive + '/configs/' self.f_cfg_base = self.dir_archive + '/configs/'
# write config to file # write config to file
self.use_pickle = True
self.f_cfg_current = self.generate_name() self.f_cfg_current = self.generate_name()
self.to_file(self.f_cfg_current) self.to_file(self.f_cfg_current)
...@@ -167,7 +169,7 @@ class Config(object): ...@@ -167,7 +169,7 @@ class Config(object):
def generate_name(self): def generate_name(self):
random_str = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) random_str = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
return self.f_cfg_base+'/cfg_'+random_str+'.pkl' return self.f_cfg_base+'/cfg_'+random_str+'.yaml'
def update(self, **kwargs): def update(self, **kwargs):
"""Update the configuration with new values """Update the configuration with new values
...@@ -187,7 +189,10 @@ class Config(object): ...@@ -187,7 +189,10 @@ class Config(object):
@staticmethod @staticmethod
def from_file(fname: str) -> 'Config': def from_file(fname: str) -> 'Config':
"""Read a configuration from a file""" """Read a configuration from a file"""
d = read_Config_as_dict(fname) if True:
d = read_pickle(fname)
else:
d = read_yaml(fname)
return Config(**d) return Config(**d)
def to_file(self, filename: str): def to_file(self, filename: str):
...@@ -195,24 +200,30 @@ class Config(object): ...@@ -195,24 +200,30 @@ class Config(object):
os.makedirs(os.path.dirname(filename), exist_ok=True) os.makedirs(os.path.dirname(filename), exist_ok=True)
d = self.__dict__ d = self.__dict__
with open(filename, 'wb') as handle: if self.use_pickle:
pickle.dump(d, handle, protocol=pickle.HIGHEST_PROTOCOL) with open(filename, 'wb') as handle:
pickle.dump(d, handle, protocol=pickle.HIGHEST_PROTOCOL)
else:
with open(filename, 'w') as f:
yaml.dump(d, f)
if self.debug: if self.debug:
print('Wrote config to', filename) print('Wrote config to', filename)
def read_Config_as_dict(filename: str) -> dict: def read_pickle(filename: str) -> dict:
"""Read a dictionary from a python file, """Read a dictionary from a python file,
return as Config object return as Config object
""" """
with open(filename, 'rb') as handle: with open(filename, 'rb') as handle:
d = pickle.load(handle) return pickle.load(handle)
print('read config', filename)
return d def read_yaml(filename: str) -> dict:
with open(filename, 'r') as f:
return yaml.load(f, Loader=yaml.FullLoader)
def display_config(filename: str) -> None: def display_config(filename: str) -> None:
d = read_Config_as_dict(filename) d = read_pickle(filename)
pprint(d) pprint(d)
def shell(args, pythonpath=None): def shell(args, pythonpath=None):
...@@ -244,10 +255,9 @@ def copy(src, dst, remove_if_exists=True): ...@@ -244,10 +255,9 @@ def copy(src, dst, remove_if_exists=True):
def try_remove(f): def try_remove(f):
try: try:
os.remove(f) os.remove(f)
except: except FileNotFoundError:
pass pass
def mkdir(path): def mkdir(path):
os.system('mkdir -p '+path) os.system('mkdir -p '+path)
......
...@@ -18,22 +18,14 @@ from dartwrf.utils import Config ...@@ -18,22 +18,14 @@ from dartwrf.utils import Config
class WorkFlows(object): class WorkFlows(object):
def __init__(self, cfg: Config): def __init__(self, cfg: Config):
"""Set up the experiment folder in `archivedir`. """Create the archive directory, copy scripts to archive
1. Copy the selected config files
2. Import configurations
3. Prepare obskind.py file (dictionary of observation types)
4. Copy the scripts and configs to `archivedir`
5. Set python path
6. Set log path and slurm scripts path
Args:
exp_config (str): Path to exp config file
server_config (str): Path to the cluster config file
Attributes: Attributes:
cluster (obj): cluster configuration as defined in server_config file use_slurm: if True, run jobs with SLURM
exp (obj): experiment configuration as defined in exp_config file dir_log: Logging directory for slurm
dir_slurm: Scripts directory for slurm
dir_dartwrf_run: as defined in Config
python: python command with prepended PYTHONPATH
""" """
print('------------------------------------------------------') print('------------------------------------------------------')
print('>>> Experiment name: "'+cfg.name+'"') print('>>> Experiment name: "'+cfg.name+'"')
...@@ -42,7 +34,7 @@ class WorkFlows(object): ...@@ -42,7 +34,7 @@ class WorkFlows(object):
# Copy scripts and config files to `self.archivedir` folder # Copy scripts and config files to `self.archivedir` folder
dirs_exist_ok = False dirs_exist_ok = False
if os.path.exists(cfg.dir_archive+'/DART-WRF/'): if os.path.exists(cfg.dir_archive+'/DART-WRF/'):
if input('The experiment name already exists! Overwrite existing experiment? (Y/n) ') in ['Y', 'y']: if input('The experiment name already exists! Overwrite existing scripts? (Y/n) ') in ['Y', 'y']:
dirs_exist_ok = True dirs_exist_ok = True
shutil.copytree(cfg.dir_dartwrf_dev, shutil.copytree(cfg.dir_dartwrf_dev,
...@@ -68,10 +60,9 @@ class WorkFlows(object): ...@@ -68,10 +60,9 @@ class WorkFlows(object):
print('------------------------------------------------------') print('------------------------------------------------------')
# use this python path # use this python path
run_wrf_from_this_folder = cfg.dir_dartwrf_run+'/../' pythonpath_archive = cfg.dir_dartwrf_run+'/../'
self.dir_dartwrf_run = cfg.dir_dartwrf_run self.dir_dartwrf_run = cfg.dir_dartwrf_run
self.python = 'export PYTHONPATH=' +run_wrf_from_this_folder+ '; '+cfg.python self.python = 'export PYTHONPATH=' +pythonpath_archive+ '; '+cfg.python
self.cfg = cfg
def run_job(self, cmd, cfg, depends_on=None, **kwargs): def run_job(self, cmd, cfg, depends_on=None, **kwargs):
"""Run scripts in a shell """Run scripts in a shell
...@@ -92,7 +83,10 @@ class WorkFlows(object): ...@@ -92,7 +83,10 @@ class WorkFlows(object):
from slurmpy import Slurm from slurmpy import Slurm
# name of calling function # name of calling function
path_to_script = inspect.stack()[1].function path_to_script = inspect.stack()[1].function
jobname = path_to_script.split('/')[-1]+'-'+cfg.f_cfg_current.split('/')[-1].replace('.py','') if 'time' in cfg:
jobname = path_to_script.split('/')[-1]+'-'+cfg.time.strftime('%H:%M')
else:
jobname = path_to_script.split('/')[-1]+'-'+cfg.name
print('> SLURM job:', jobname) print('> SLURM job:', jobname)
slurm_kwargs = cfg.slurm_kwargs.copy() slurm_kwargs = cfg.slurm_kwargs.copy()
...@@ -175,12 +169,11 @@ class WorkFlows(object): ...@@ -175,12 +169,11 @@ class WorkFlows(object):
Returns: Returns:
str: job ID of the submitted job str: job ID of the submitted job
""" """
cmd = script_to_str(self.cfg.WRF_ideal_template cmd = script_to_str(cfg.WRF_ideal_template
).replace('<expname>', cfg.name ).replace('<wrf_rundir_base>', cfg.dir_wrf_run.replace('<ens>', '$IENS')
).replace('<wrf_rundir_base>', cfg.dir_wrf_run
).replace('<wrf_modules>', cfg.wrf_modules, ).replace('<wrf_modules>', cfg.wrf_modules,
) )
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30") id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30", array="1-"+str(cfg.ensemble_size))
return id return id
def run_WRF(self, cfg, depends_on=None): def run_WRF(self, cfg, depends_on=None):
...@@ -191,31 +184,32 @@ class WorkFlows(object): ...@@ -191,31 +184,32 @@ class WorkFlows(object):
end = cfg.WRF_end end = cfg.WRF_end
# SLURM configuration for WRF # SLURM configuration for WRF
slurm_kwargs = {"array": "1-"+str(self.cfg.ensemble_size), slurm_kwargs = {"array": "1-"+str(cfg.ensemble_size),
"nodes": "1", "nodes": "1",
"ntasks": str(self.cfg.max_nproc_for_each_ensemble_member), "ntasks": str(cfg.max_nproc_for_each_ensemble_member),
"ntasks-per-core": "1", "mem": "90G", } "ntasks-per-core": "1", "mem": "90G",
"ntasks-per-node": str(cfg.max_nproc_for_each_ensemble_member),}
# command from template file # command from template file
wrf_cmd = script_to_str(self.cfg.WRF_exe_template wrf_cmd = script_to_str(cfg.WRF_exe_template
).replace('<dir_wrf_run>', self.cfg.dir_wrf_run.replace('<ens>', '$IENS') ).replace('<dir_wrf_run>', cfg.dir_wrf_run.replace('<ens>', '$IENS')
).replace('<wrf_modules>', self.cfg.wrf_modules, ).replace('<wrf_modules>', cfg.wrf_modules,
).replace('<WRF_number_of_processors>', str(self.cfg.max_nproc_for_each_ensemble_member), ).replace('<WRF_number_of_processors>', str(cfg.max_nproc_for_each_ensemble_member),
) )
# prepare namelist # prepare namelist
path_to_script = self.dir_dartwrf_run + '/prepare_namelist.py' path_to_script = self.dir_dartwrf_run + '/prepare_namelist.py'
cmd = ' '.join([self.python, path_to_script, self.cfg.f_cfg_current]) cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on]) id = self.run_job(cmd, cfg, depends_on=[depends_on])
# run WRF ensemble # run WRF ensemble
time_in_simulation_hours = (end-start).total_seconds()/3600 time_in_simulation_hours = (end-start).total_seconds()/3600
runtime_wallclock_mins_expected = int( # runtime_wallclock_mins_expected = int(time_in_simulation_hours*15*1.5 + 15)
time_in_simulation_hours*30 + 10) # usually <15 min/hour runtime_wallclock_mins_expected = int(time_in_simulation_hours*30*1.5 + 15)
# usually max 15 min/hour + 50% margin + 15 min buffer
slurm_kwargs.update({"time": str(runtime_wallclock_mins_expected)}) slurm_kwargs.update({"time": str(runtime_wallclock_mins_expected)})
if runtime_wallclock_mins_expected > 20:
if runtime_wallclock_mins_expected > 25:
slurm_kwargs.update({"partition": "amd"}) slurm_kwargs.update({"partition": "amd"})
# #cfg_update.update({"exclude": "jet03"}) #cfg_update.update({"exclude": "jet03"})
id = self.run_job(wrf_cmd, cfg, depends_on=[id], **slurm_kwargs) id = self.run_job(wrf_cmd, cfg, depends_on=[id], **slurm_kwargs)
return id return id
...@@ -271,18 +265,30 @@ class WorkFlows(object): ...@@ -271,18 +265,30 @@ class WorkFlows(object):
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="10") id = self.run_job(cmd, cfg, depends_on=[depends_on], time="10")
return id return id
def create_satimages(self, cfg, depends_on=None): def run_RTTOV(self, cfg, depends_on=None):
"""Run a job array, one job per ensemble member, to create satellite images""" """Run a job array, one job per ensemble member, to create satellite images"""
prefix = 'module purge; module load rttov/v13.2-gcc-8.5.0; python' prefix = 'module purge; module load rttov/v13.2-gcc-8.5.0; python'
path_to_script = '~/RTTOV-WRF/run_init.py' path_to_script = '~/RTTOV-WRF/run_init.py'
cmd = ' '.join([prefix, path_to_script, cfg.f_cfg_current]) cmd = ' '.join([prefix, path_to_script,
cfg.dir_archive +cfg.time.strftime('/%Y-%m-%d_%H:%M/'),
'$SLURM_ARRAY_TASK_ID'])
id = self.run_job(cmd, cfg, depends_on=[depends_on], id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"ntasks": "1", "time": "60", "mem": "10G", **{"ntasks": "1", "time": "60", "mem": "10G",
"array": "1-"+str(self.cfg.ensemble_size)}) "array": "1-"+str(cfg.ensemble_size)})
return id return id
def verify(self, cfg: Config, depends_on=None):
"""Not included in DART-WRF"""
cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
cfg.name, cfg.verify_against, #cfg.time.strftime('%y%m%d_%H:%M'),
'sat', 'wrf', 'has_node', 'np=10', 'mem=250G'])
self.run_job(cmd, cfg, depends_on=[depends_on],
**{"time": "03:00:00", "mail-type": "FAIL,END",
"ntasks": "10", "ntasks-per-node": "10",
"ntasks-per-core": "1", "mem": "250G"})
def evaluate_obs_posterior_after_analysis(self, cfg, depends_on=None): def evaluate_obs_posterior_after_analysis(self, cfg, depends_on=None):
path_to_script = self.dir_dartwrf_run + '/evaluate_obs_space.py' path_to_script = self.dir_dartwrf_run + '/evaluate_obs_space.py'
...@@ -297,13 +303,4 @@ class WorkFlows(object): ...@@ -297,13 +303,4 @@ class WorkFlows(object):
# id = self.run_job(cmd, 'linpost'+self.cfg.name, cfg_update={"ntasks": "16", "mem": "80G", "ntasks-per-node": "16", "ntasks-per-core": "2", # id = self.run_job(cmd, 'linpost'+self.cfg.name, cfg_update={"ntasks": "16", "mem": "80G", "ntasks-per-node": "16", "ntasks-per-core": "2",
# "time": "15", "mail-type": "FAIL"}, # "time": "15", "mail-type": "FAIL"},
# depends_on=[id]) # depends_on=[id])
return id return id
\ No newline at end of file
def verify(self, cfg, depends_on=None):
"""Not included in DART-WRF"""
cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"time": "210", "mail-type": "FAIL,END",
"ntasks": "10", "ntasks-per-node": "10",
"ntasks-per-core": "1", "mem": "250G"})
...@@ -2,11 +2,8 @@ ...@@ -2,11 +2,8 @@
export SLURM_STEP_GRES=none export SLURM_STEP_GRES=none
echo "SLURM_ARRAY_TASK_ID:"$SLURM_ARRAY_TASK_ID echo "SLURM_ARRAY_TASK_ID:"$SLURM_ARRAY_TASK_ID
EXPNAME=<expname>
MAINDIR=<wrf_rundir_base>
IENS=$SLURM_ARRAY_TASK_ID IENS=$SLURM_ARRAY_TASK_ID
RUNDIR=$MAINDIR/$EXPNAME/$IENS RUNDIR=<wrf_rundir_base>
echo "ENSEMBLE NR: "$IENS" in "$RUNDIR echo "ENSEMBLE NR: "$IENS" in "$RUNDIR
cd $RUNDIR cd $RUNDIR
......