Compare revisions: dataassimilation/DART-WRF (2 commits on source)
Changes are shown as if the source revision was being merged into the target revision.
@@ -223,11 +223,10 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"):
osf_prior (ObsSeq): python representation of obs_seq.final (output of filter in evaluate-mode without posterior)
contains prior values; used for parameterized errors
"""
- obs_kind_nrs = obskind_read(cfg.dir_dart_src)
for obscfg in cfg.assimilate_these_observations:
kind_str = obscfg['kind'] # e.g. 'RADIOSONDE_TEMPERATURE'
- kind = obs_kind_nrs[kind_str] # e.g. 263
+ kind = cfg.obs_kind_nrs[kind_str] # e.g. 263
# modify observation error of each kind sequentially
where_oso_iskind = oso.df.kind == kind
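Note on this change: obskind_read() parses the table that maps DART observation type names to integer codes from the DART source tree; the commit reads it once in main() and carries it on the Config object instead of re-reading it here. A minimal sketch of the resulting lookup (the code number 263 is the example from the comments above):

    # read the kind table once, then pass it along on the Config object
    cfg.obs_kind_nrs = obskind_read(cfg.dir_dart_src)   # e.g. {'RADIOSONDE_TEMPERATURE': 263, ...}
    kind = cfg.obs_kind_nrs['RADIOSONDE_TEMPERATURE']   # -> 263, the integer code used in obs_seq files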
@@ -520,13 +519,14 @@ def get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time, lag=
"""
use_ACF = False
if 'assimilate_cloudfractions' in cfg:
-     use_ACF = True
+     if cfg.assimilate_cloudfractions:
+         use_ACF = True
oso = None
- if isinstance(cfg.use_existing_obsseq, str):
+ if isinstance(cfg.assimilate_existing_obsseq, str):
# assume that the user wants to use an existing obs_seq.out file
-     f_obsseq = time.strftime(cfg.use_existing_obsseq)
+     f_obsseq = time.strftime(cfg.assimilate_existing_obsseq)
if os.path.isfile(f_obsseq):
# copy to run_DART folder
copy(f_obsseq, cfg.dir_dart_run+'/obs_seq.out')
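For reference, the time wildcards in assimilate_existing_obsseq resolve through standard datetime.strftime; a minimal sketch using the pattern from the Config docstring (the date is an arbitrary example):

    import datetime as dt

    pattern = '/path/%Y-%m-%d_%H:%M_obs_seq.out'
    time = dt.datetime(2008, 7, 30, 12, 0)
    print(time.strftime(pattern))   # /path/2008-07-30_12:00_obs_seq.out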
@@ -534,21 +534,33 @@ def get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time, lag=
else:
# explain the error if the file does not exist
-     raise IOError('cfg.use_existing_obsseq is not False. \n'
-                   + 'In this case, use_existing_obsseq should be a file path (wildcards %H, %M allowed)!\n'
-                   + 'But there is no file with this pattern: '+str(cfg.use_existing_obsseq))
+     raise IOError('cfg.assimilate_existing_obsseq is not False. \n'
+                   + 'In this case, assimilate_existing_obsseq should be a file path (wildcards %H, %M allowed)!\n'
+                   + 'But there is no file with this pattern: '+str(cfg.assimilate_existing_obsseq))
elif use_ACF:
# prepare observations with precomputed FO
-     pattern_prior = prior_path_exp + prior_init_time.strftime(
-         '/%Y-%m-%d_%H:%M/<iens>/') + prior_valid_time.strftime(cfg.first_guess_pattern)
+     CF_config = cfg.CF_config.copy()
+     f_prior_pattern = CF_config.pop('first_guess_pattern')
+     f_obs_pattern = time.strftime(CF_config.pop('f_obs_pattern'))
+     f_obs = glob.glob(f_obs_pattern)
+     if len(f_obs) == 0:
+         raise FileNotFoundError(f_obs_pattern + ' not found')
+     f_obs = f_obs[0]
+     pattern_prior = '/'.join([prior_path_exp,
+                               prior_init_time.strftime('/%Y-%m-%d_%H:%M/'),
+                               '<iens>/',
+                               prior_valid_time.strftime(f_prior_pattern),
+                               ])
from CloudFractionDA import obsseqout as cfoso
-     cfoso.write_obsseq(time, pattern_prior,
-                        time.strftime(cfg.obs_file_pattern),
-                        cfg.cloudfraction_variable,
-                        path_output=cfg.dir_dart_run + "/obs_seq.out",
-                        ) #**ACF_config)
+     cfoso.write_obsseq(time, pattern_prior, f_obs,
+                        cfg.obs_kind_nrs,
+                        path_output=cfg.dir_dart_run + "/obs_seq.out",
+                        **CF_config,
+                        )
else:
# do NOT use an existing obs_seq.out file
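A note on the copy/pop idiom introduced in this hunk: the keys handled locally are popped off a copy of cfg.CF_config, and whatever remains is forwarded to write_obsseq as keyword arguments. A self-contained sketch; the dict contents and the 'threshold' option are hypothetical:

    def write_obsseq(path_output, threshold=0.1):
        print(path_output, threshold)

    cfg_CF_config = {'first_guess_pattern': '/wrfout_d01_%Y-%m-%d_%H:%M:%S',
                     'f_obs_pattern': '/obs/%H:%M/*.nc',
                     'threshold': 0.5}
    CF_config = cfg_CF_config.copy()                        # copy so pop() does not mutate the original
    f_prior_pattern = CF_config.pop('first_guess_pattern')  # consumed locally
    f_obs_pattern = CF_config.pop('f_obs_pattern')          # consumed locally
    write_obsseq('obs_seq.out', **CF_config)                # only {'threshold': 0.5} is forwarded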
@@ -595,6 +607,8 @@ def main(cfg: Config):
prior_valid_time = cfg.prior_valid_time
prior_path_exp = cfg.prior_path_exp
+ cfg.obs_kind_nrs = obskind_read(cfg.dir_dart_src)
# do_reject_smallFGD: triggers additional evaluations of prior & posterior
do_reject_smallFGD = getattr(cfg, "do_reject_smallFGD", False)
prepare_run_DART_folder(cfg)
......
@@ -11,10 +11,10 @@ from dartwrf.utils import Config, copy, try_remove
def run(cfg: Config) -> None:
# defaults
hist_interval_s = 300
- restart = False
restart_interval = 9999 # dummy
# overwrite with config
+ restart = True
if 'restart' in cfg:
restart = cfg.restart
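The `'restart' in cfg` test assumes that Config supports membership checks over its attributes; a minimal sketch of how that could be implemented (the real implementation lives in dartwrf.utils and is not shown in this diff):

    class Config:
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

        def __contains__(self, key):
            # "key in cfg" is True if the attribute was set on this Config
            return key in self.__dict__

With this, `'restart' in Config(restart=True)` evaluates to True.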
@@ -32,39 +32,44 @@ def run(cfg: Config) -> None:
if 'hist_interval_s' in cfg:
hist_interval_s = cfg.hist_interval_s
# replace these variables in the namelist
rst_flag = '.true.' if restart else '.false.'
- replace_dict = {
-     'time_control': {
-         'start_year': start.strftime('%Y'),
-         'start_month': start.strftime('%m'),
-         'start_day': start.strftime('%d'),
-         'start_hour': start.strftime('%H'),
-         'start_minute': start.strftime('%M'),
-         'start_second': start.strftime('%S'),
-         'end_year': end.strftime('%Y'),
-         'end_month': end.strftime('%m'),
-         'end_day': end.strftime('%d'),
-         'end_hour': end.strftime('%H'),
-         'end_minute': end.strftime('%M'),
-         'end_second': end.strftime('%S'),
-         'history_interval_s': str(int(hist_interval_s)),
-         'restart_interval': str(int(restart_interval)),
-         'restart': rst_flag,
-     },
-     'domains': {
-         'dx': str(cfg.model_dx),
-     }}
- if 'c_s' in cfg:
-     replace_dict['dynamics'] = {'c_s': str(cfg.c_s)}
print('prepare namelists from', start, 'to', end,
', restart=', restart, 'restart_interval=', restart_interval)
for iens in range(1, cfg.ensemble_size+1):
+     replace_dict = {
+         'time_control': {
+             'start_year': start.strftime('%Y'),
+             'start_month': start.strftime('%m'),
+             'start_day': start.strftime('%d'),
+             'start_hour': start.strftime('%H'),
+             'start_minute': start.strftime('%M'),
+             'start_second': start.strftime('%S'),
+             'end_year': end.strftime('%Y'),
+             'end_month': end.strftime('%m'),
+             'end_day': end.strftime('%d'),
+             'end_hour': end.strftime('%H'),
+             'end_minute': end.strftime('%M'),
+             'end_second': end.strftime('%S'),
+             'history_interval_s': str(int(hist_interval_s)),
+             'restart_interval': str(int(restart_interval)),
+             'restart': rst_flag,
+             'history_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfout_d<domain>_<date>'",
+             'rst_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfrst_d<domain>_<date>'",
+         },
+         'domains': {
+             'dx': str(cfg.model_dx),
+         },
+     }
-     replace_dict['time_control'].update({
-         'history_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfout_d<domain>_<date>'",
-         'rst_outname': "'"+cfg.dir_archive+'/'+start.strftime('%Y-%m-%d_%H:%M')+"/"+str(iens)+"/wrfrst_d<domain>_<date>'",
-     })
# define defaults from Config
nml = WRF_namelist()
nml.read(cfg.WRF_namelist_template)
@@ -73,13 +78,12 @@ def run(cfg: Config) -> None:
for section, section_dict in replace_dict.items():
for key, value in section_dict.items():
nml.namelist[section][key] = value
f_out = cfg.dir_wrf_run.replace('<ens>', str(iens)
)+'/namelist.input'
try_remove(f_out)
nml.write(f_out)
#print('saved', f_out)
# copy to archive
init = start.strftime('/%Y-%m-%d_%H:%M/')
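The section-wise replace loop above overwrites only the listed namelist keys and leaves the rest of the template untouched; a minimal sketch of the same pattern on plain dicts, assuming WRF_namelist.namelist is a dict of sections as the loop implies:

    namelist = {'time_control': {'start_year': '2000', 'restart': '.false.'},
                'domains': {'dx': '10000'}}
    replace_dict = {'time_control': {'restart': '.true.'},
                    'domains': {'dx': '2000'}}

    for section, section_dict in replace_dict.items():
        for key, value in section_dict.items():
            namelist[section][key] = value   # overwrite listed keys, keep all others
    # namelist['time_control']['start_year'] is still '2000'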
......
"""Script to pretty-pring a config file (pickle format)"""
import sys, pickle, pprint
fname = sys.argv[1]
with open(fname, 'rb') as f:
    cfg = pickle.load(f)
pprint.pprint(cfg)
\ No newline at end of file
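Usage of the pretty-print script above, assuming it is saved as print_config.py (the diff does not show the file name):

    python print_config.py archive/configs/cfg_A1B2.pkl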
@@ -20,6 +20,7 @@ import importlib.util
import random
import string
import pickle
+ import yaml
userhome = os.path.expanduser('~')
@@ -56,7 +57,7 @@ class Config(object):
`loc_horiz_km`: float of horizontal localization half-width in km;
`loc_vert_km`: float of vertical localization half-width in km;
- use_existing_obsseq (str, False): Path to existing obs_seq.out file (False: generate new one);
+ assimilate_existing_obsseq (str, False): Path to existing obs_seq.out file (False: generate new one);
time string is replaced by actual time: /path/%Y-%m-%d_%H:%M_obs_seq.out
dart_nml (dict): updates to the default input.nml of DART (in dart_srcdir)
@@ -88,7 +89,7 @@ class Config(object):
# optional
update_vars: list=[],
dart_nml: dict={},
- use_existing_obsseq: bool | str = False,
+ assimilate_existing_obsseq: bool | str = False,
nature_wrfout_pattern: bool | str = False,
# others
@@ -121,7 +122,7 @@
self.pattern_obs_seq_final = pattern_obs_seq_final.replace('<archivedir>', self.dir_archive)
# optional
- self.use_existing_obsseq = use_existing_obsseq
+ self.assimilate_existing_obsseq = assimilate_existing_obsseq
self.nature_wrfout_pattern = nature_wrfout_pattern
# user defined
@@ -135,12 +136,12 @@
if not dart_nml:
warnings.warn('No `dart_nml` defined, using default DART namelist!')
- if not isinstance(use_existing_obsseq, str):
-     if use_existing_obsseq != False:
-         raise ValueError('`use_existing_obsseq` must be a string or False, but is', use_existing_obsseq)
+ if not isinstance(assimilate_existing_obsseq, str):
+     if assimilate_existing_obsseq != False:
+         raise ValueError('`assimilate_existing_obsseq` must be a string or False, but is', assimilate_existing_obsseq)
- if isinstance(use_existing_obsseq, str):
-     print('Using existing observation sequence', use_existing_obsseq)
+ if isinstance(assimilate_existing_obsseq, str):
+     print('Using existing observation sequence', assimilate_existing_obsseq)
# required attributes, derived from others
self.dir_archive = self.dir_archive.replace('<exp>', self.name)
@@ -152,6 +153,7 @@
self.f_cfg_base = self.dir_archive + '/configs/'
# write config to file
+ self.use_pickle = True
self.f_cfg_current = self.generate_name()
self.to_file(self.f_cfg_current)
@@ -167,7 +169,7 @@
def generate_name(self):
random_str = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
- return self.f_cfg_base+'/cfg_'+random_str+'.pkl'
+ return self.f_cfg_base+'/cfg_'+random_str+'.yaml'
def update(self, **kwargs):
"""Update the configuration with new values
@@ -187,7 +189,10 @@
@staticmethod
def from_file(fname: str) -> 'Config':
"""Read a configuration from a file"""
- d = read_Config_as_dict(fname)
+ # pickle is the default writer (use_pickle = True); fall back to yaml otherwise
+ try:
+     d = read_pickle(fname)
+ except pickle.UnpicklingError:
+     d = read_yaml(fname)
return Config(**d)
def to_file(self, filename: str):
@@ -195,24 +200,30 @@
os.makedirs(os.path.dirname(filename), exist_ok=True)
d = self.__dict__
- with open(filename, 'wb') as handle:
-     pickle.dump(d, handle, protocol=pickle.HIGHEST_PROTOCOL)
+ if self.use_pickle:
+     with open(filename, 'wb') as handle:
+         pickle.dump(d, handle, protocol=pickle.HIGHEST_PROTOCOL)
+ else:
+     with open(filename, 'w') as f:
+         yaml.dump(d, f)
if self.debug:
print('Wrote config to', filename)
- def read_Config_as_dict(filename: str) -> dict:
+ def read_pickle(filename: str) -> dict:
      """Read a config dictionary from a pickle file"""
      with open(filename, 'rb') as handle:
-         d = pickle.load(handle)
-     print('read config', filename)
-     return d
+         return pickle.load(handle)
+ def read_yaml(filename: str) -> dict:
+     with open(filename, 'r') as f:
+         return yaml.load(f, Loader=yaml.FullLoader)
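Why pickle remains the default (use_pickle = True): plain scalars, lists, dicts, and datetimes round-trip through yaml cleanly, but arbitrary Python objects are dumped as !!python/object tags that yaml.FullLoader refuses to construct on load. A minimal round-trip sketch:

    import datetime as dt
    import yaml

    d = {'name': 'exp1', 'time': dt.datetime(2008, 7, 30, 12, 0)}
    s = yaml.dump(d)                              # plain values and datetimes dump fine
    print(yaml.load(s, Loader=yaml.FullLoader))   # round-trips to an equal dict
    # yaml.dump(object()) would emit a !!python/object tag, which FullLoader
    # rejects on load; pickle avoids that limitation for complex configs.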
def display_config(filename: str) -> None:
- d = read_Config_as_dict(filename)
+ d = read_pickle(filename)
pprint(d)
def shell(args, pythonpath=None):
@@ -244,10 +255,9 @@ def copy(src, dst, remove_if_exists=True):
def try_remove(f):
try:
os.remove(f)
- except:
+ except FileNotFoundError:
pass
def mkdir(path):
os.system('mkdir -p '+path)
......
@@ -18,22 +18,14 @@ from dartwrf.utils import Config
class WorkFlows(object):
def __init__(self, cfg: Config):
"""Set up the experiment folder in `archivedir`.
1. Copy the selected config files
2. Import configurations
3. Prepare obskind.py file (dictionary of observation types)
4. Copy the scripts and configs to `archivedir`
5. Set python path
6. Set log path and slurm scripts path
Args:
exp_config (str): Path to exp config file
server_config (str): Path to the cluster config file
"""Create the archive directory, copy scripts to archive
Attributes:
cluster (obj): cluster configuration as defined in server_config file
exp (obj): experiment configuration as defined in exp_config file
use_slurm: if True, run jobs with SLURM
dir_log: Logging directory for slurm
dir_slurm: Scripts directory for slurm
dir_dartwrf_run: as defined in Config
python: python command with prepended PYTHONPATH
"""
print('------------------------------------------------------')
print('>>> Experiment name: "'+cfg.name+'"')
@@ -42,7 +34,7 @@
# Copy scripts and config files to `self.archivedir` folder
dirs_exist_ok = False
if os.path.exists(cfg.dir_archive+'/DART-WRF/'):
- if input('The experiment name already exists! Overwrite existing experiment? (Y/n) ') in ['Y', 'y']:
+ if input('The experiment name already exists! Overwrite existing scripts? (Y/n) ') in ['Y', 'y']:
dirs_exist_ok = True
shutil.copytree(cfg.dir_dartwrf_dev,
@@ -68,10 +60,9 @@
print('------------------------------------------------------')
# use this python path
- run_wrf_from_this_folder = cfg.dir_dartwrf_run+'/../'
+ pythonpath_archive = cfg.dir_dartwrf_run+'/../'
  self.dir_dartwrf_run = cfg.dir_dartwrf_run
- self.python = 'export PYTHONPATH=' +run_wrf_from_this_folder+ '; '+cfg.python
- self.cfg = cfg
+ self.python = 'export PYTHONPATH=' +pythonpath_archive+ '; '+cfg.python
def run_job(self, cmd, cfg, depends_on=None, **kwargs):
"""Run scripts in a shell
@@ -92,7 +83,10 @@
from slurmpy import Slurm
# name of calling function
path_to_script = inspect.stack()[1].function
- jobname = path_to_script.split('/')[-1]+'-'+cfg.f_cfg_current.split('/')[-1].replace('.py','')
+ if 'time' in cfg:
+     jobname = path_to_script.split('/')[-1]+'-'+cfg.time.strftime('%H:%M')
+ else:
+     jobname = path_to_script.split('/')[-1]+'-'+cfg.name
print('> SLURM job:', jobname)
slurm_kwargs = cfg.slurm_kwargs.copy()
@@ -175,12 +169,11 @@
Returns:
str: job ID of the submitted job
"""
- cmd = script_to_str(self.cfg.WRF_ideal_template
-                     ).replace('<expname>', cfg.name
-                     ).replace('<wrf_rundir_base>', cfg.dir_wrf_run
+ cmd = script_to_str(cfg.WRF_ideal_template
+                     ).replace('<wrf_rundir_base>', cfg.dir_wrf_run.replace('<ens>', '$IENS')
                      ).replace('<wrf_modules>', cfg.wrf_modules,
                      )
- id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30")
+ id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30", array="1-"+str(cfg.ensemble_size))
return id
def run_WRF(self, cfg, depends_on=None):
@@ -191,31 +184,32 @@
end = cfg.WRF_end
# SLURM configuration for WRF
- slurm_kwargs = {"array": "1-"+str(self.cfg.ensemble_size),
+ slurm_kwargs = {"array": "1-"+str(cfg.ensemble_size),
                  "nodes": "1",
-                 "ntasks": str(self.cfg.max_nproc_for_each_ensemble_member),
-                 "ntasks-per-core": "1", "mem": "90G", }
+                 "ntasks": str(cfg.max_nproc_for_each_ensemble_member),
+                 "ntasks-per-core": "1", "mem": "90G",
+                 "ntasks-per-node": str(cfg.max_nproc_for_each_ensemble_member),}
# command from template file
- wrf_cmd = script_to_str(self.cfg.WRF_exe_template
-                         ).replace('<dir_wrf_run>', self.cfg.dir_wrf_run.replace('<ens>', '$IENS')
-                         ).replace('<wrf_modules>', self.cfg.wrf_modules,
-                         ).replace('<WRF_number_of_processors>', str(self.cfg.max_nproc_for_each_ensemble_member),
+ wrf_cmd = script_to_str(cfg.WRF_exe_template
+                         ).replace('<dir_wrf_run>', cfg.dir_wrf_run.replace('<ens>', '$IENS')
+                         ).replace('<wrf_modules>', cfg.wrf_modules,
+                         ).replace('<WRF_number_of_processors>', str(cfg.max_nproc_for_each_ensemble_member),
)
# prepare namelist
path_to_script = self.dir_dartwrf_run + '/prepare_namelist.py'
- cmd = ' '.join([self.python, path_to_script, self.cfg.f_cfg_current])
+ cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on])
# run WRF ensemble
time_in_simulation_hours = (end-start).total_seconds()/3600
- runtime_wallclock_mins_expected = int(
-     time_in_simulation_hours*30 + 10)  # usually <15 min/hour
- # runtime_wallclock_mins_expected = int(time_in_simulation_hours*15*1.5 + 15)
+ runtime_wallclock_mins_expected = int(time_in_simulation_hours*30*1.5 + 15)
+ # usually max 15 min/hour + 50% margin + 15 min buffer
slurm_kwargs.update({"time": str(runtime_wallclock_mins_expected)})
- if runtime_wallclock_mins_expected > 25:
+ if runtime_wallclock_mins_expected > 20:
slurm_kwargs.update({"partition": "amd"})
- # #cfg_update.update({"exclude": "jet03"})
+ #cfg_update.update({"exclude": "jet03"})
id = self.run_job(wrf_cmd, cfg, depends_on=[id], **slurm_kwargs)
return id
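Worked example of the new walltime estimate: a 4 h forecast now requests int(4*30*1.5 + 15) = 195 min, compared to int(4*30 + 10) = 130 min with the old formula:

    time_in_simulation_hours = 4.0                     # example value
    print(int(time_in_simulation_hours*30*1.5 + 15))   # 195 (new formula)
    print(int(time_in_simulation_hours*30 + 10))       # 130 (old formula)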
@@ -271,18 +265,30 @@
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="10")
return id
- def create_satimages(self, cfg, depends_on=None):
+ def run_RTTOV(self, cfg, depends_on=None):
"""Run a job array, one job per ensemble member, to create satellite images"""
prefix = 'module purge; module load rttov/v13.2-gcc-8.5.0; python'
path_to_script = '~/RTTOV-WRF/run_init.py'
- cmd = ' '.join([prefix, path_to_script, cfg.f_cfg_current])
+ cmd = ' '.join([prefix, path_to_script,
+                 cfg.dir_archive + cfg.time.strftime('/%Y-%m-%d_%H:%M/'),
+                 '$SLURM_ARRAY_TASK_ID'])
id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"ntasks": "1", "time": "60", "mem": "10G",
"array": "1-"+str(self.cfg.ensemble_size)})
"array": "1-"+str(cfg.ensemble_size)})
return id
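For reference, a sketch of the command each SLURM array task ends up running after this change; member index 3 stands in for $SLURM_ARRAY_TASK_ID and the archive path is illustrative:

    prefix = 'module purge; module load rttov/v13.2-gcc-8.5.0; python'
    path_to_script = '~/RTTOV-WRF/run_init.py'
    iens = 3   # value of $SLURM_ARRAY_TASK_ID in this array task
    print(' '.join([prefix, path_to_script, '<archivedir>/2008-07-30_12:00/', str(iens)]))
    # module purge; module load rttov/v13.2-gcc-8.5.0; python ~/RTTOV-WRF/run_init.py <archivedir>/2008-07-30_12:00/ 3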
+ def verify(self, cfg: Config, depends_on=None):
+     """Not included in DART-WRF"""
+     cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
+                     cfg.name, cfg.verify_against, #cfg.time.strftime('%y%m%d_%H:%M'),
+                     'sat', 'wrf', 'has_node', 'np=10', 'mem=250G'])
+     self.run_job(cmd, cfg, depends_on=[depends_on],
+                  **{"time": "03:00:00", "mail-type": "FAIL,END",
+                     "ntasks": "10", "ntasks-per-node": "10",
+                     "ntasks-per-core": "1", "mem": "250G"})
def evaluate_obs_posterior_after_analysis(self, cfg, depends_on=None):
path_to_script = self.dir_dartwrf_run + '/evaluate_obs_space.py'
@@ -297,13 +303,4 @@
# id = self.run_job(cmd, 'linpost'+self.cfg.name, cfg_update={"ntasks": "16", "mem": "80G", "ntasks-per-node": "16", "ntasks-per-core": "2",
# "time": "15", "mail-type": "FAIL"},
# depends_on=[id])
return id
- def verify(self, cfg, depends_on=None):
-     """Not included in DART-WRF"""
-     cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
-                     cfg.f_cfg_current])
-     id = self.run_job(cmd, cfg, depends_on=[depends_on],
-                       **{"time": "210", "mail-type": "FAIL,END",
-                          "ntasks": "10", "ntasks-per-node": "10",
-                          "ntasks-per-core": "1", "mem": "250G"})
-     return id
\ No newline at end of file
@@ -2,11 +2,8 @@
export SLURM_STEP_GRES=none
echo "SLURM_ARRAY_TASK_ID:"$SLURM_ARRAY_TASK_ID
- EXPNAME=<expname>
- MAINDIR=<wrf_rundir_base>
  IENS=$SLURM_ARRAY_TASK_ID
- RUNDIR=$MAINDIR/$EXPNAME/$IENS
+ RUNDIR=<wrf_rundir_base>
echo "ENSEMBLE NR: "$IENS" in "$RUNDIR
cd $RUNDIR
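Note how the simplified template works: run_WRF_ideal/run_WRF substitute <ens> with the literal string $IENS in Python before writing the job script, and the shell then expands $IENS per array task. A minimal sketch with a hypothetical run directory:

    dir_wrf_run = '/path/to/run_WRF/exp1/<ens>'      # hypothetical cfg.dir_wrf_run
    print(dir_wrf_run.replace('<ens>', '$IENS'))     # /path/to/run_WRF/exp1/$IENS
    # in the job script, IENS=$SLURM_ARRAY_TASK_ID selects the member's directory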
......