Skip to content
Snippets Groups Projects
Commit e0526218 authored by Lukas Kugler's avatar Lukas Kugler
Browse files

small changes for singlescript mode, errormodel

parent 66def4b4
No related branches found
No related tags found
No related merge requests found
......@@ -33,9 +33,10 @@ cluster_defaults = dict(
WRF_exe_template = '/jetfs/home/lkugler/DART-WRF/templates/run_WRF.jet.sh',
WRF_ideal_template = '/jetfs/home/lkugler/DART-WRF/templates/run_WRF_ideal.sh',
slurm_kwargs = {"account": "lkugler", "partition": "all", "time": "00:30:00",
"ntasks": "1", "ntasks-per-core": "1", "mem": "30G",
"mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"},
slurm_kwargs = {"account": "lkugler", "partition": "devel", "time": "30",
"nodes": "1", "ntasks": "1", "ntasks-per-core": "1", "mem": "25G",
#"exclude": "jet07,jet11,jet16,jet18,jet19",
"mail-type": "FAIL,TIME_LIMIT_80", "mail-user": "lukas.kugler@univie.ac.at"},
# WRF file format, will only change if WRF changes
wrfout_format = '/wrfout_d01_%Y-%m-%d_%H:%M:%S',
......
__all__ = []
\ No newline at end of file
#__all__ = []
\ No newline at end of file
......@@ -14,7 +14,7 @@ from dartwrf.obs import obsseq
from dartwrf import dart_nml
def prepare_DART_grid_template():
def prepare_DART_grid_template(cfg):
"""Prepare DART grid template wrfinput_d01 file from a prior file
DART needs a wrfinput file as a template for the grid information.
......@@ -30,7 +30,7 @@ def prepare_DART_grid_template():
else:
pass # what now?
def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp):
def prepare_prior_ensemble(cfg, assim_time, prior_init_time, prior_valid_time, prior_path_exp):
"""Prepares DART files for running filter
i.e.
- links first guess state to DART first guess filenames
......@@ -70,8 +70,8 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
#if cluster.geo_em_forecast:
# wrfout_add_geo.run(cluster.geo_em_forecast, wrfout_dart)
use_linked_files_as_prior()
write_list_of_outputfiles()
use_linked_files_as_prior(cfg)
write_list_of_outputfiles(cfg)
print("removing preassim and filter_restart")
os.system("rm -rf " + cfg.dir_dart_run + "/preassim_*")
......@@ -82,7 +82,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
os.system("rm -rf " + cfg.dir_dart_run + "/obs_seq.fina*")
def use_linked_files_as_prior():
def use_linked_files_as_prior(cfg):
"""Instruct DART to use the prior ensemble as input
"""
files = []
......@@ -91,7 +91,7 @@ def use_linked_files_as_prior():
write_txt(files, cfg.dir_dart_run+'/input_list.txt')
def use_filter_output_as_prior():
def use_filter_output_as_prior(cfg):
"""Use the last posterior as input for DART, e.g. to evaluate the analysis in observation space
"""
files = []
......@@ -108,14 +108,14 @@ def use_filter_output_as_prior():
write_txt(files, cfg.dir_dart_run+'/input_list.txt')
def write_list_of_outputfiles():
def write_list_of_outputfiles(cfg):
files = []
for iens in range(1, cfg.ensemble_size+1):
files.append("./filter_restart_d01." + str(iens).zfill(4))
write_txt(files, cfg.dir_dart_run+'/output_list.txt')
def filter():
def filter(cfg):
"""Calls DART ./filter program
Args:
......@@ -146,7 +146,7 @@ def filter():
"Check log file at " + cfg.dir_dart_run + "/log.filter")
def archive_filteroutput(time):
def archive_filteroutput(cfg, time):
"""Archive filter output files (filter_restart, preassim, postassim, output_mean, output_sd)
"""
# archive diagnostics
......@@ -210,7 +210,7 @@ def get_parametrized_error(obscfg, osf_prior) -> np.ndarray: # type: ignore
obscfg.get("sat_channel"))
def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"):
def set_obserr_assimilate_in_obsseqout(cfg, oso, outfile="./obs_seq.out"):
""""Overwrite existing variance values in obs_seq.out files
Args:
......@@ -231,17 +231,28 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"):
# modify observation error of each kind sequentially
where_oso_iskind = oso.df.kind == kind
if hasattr(obscfg, "error_assimilate"):
if "error_assimilate" in obscfg:
if obscfg["error_assimilate"] == False:
print("error_assimilate is False, will compute dynamic obs-errors")
# get a parametrized error for this observation type
# parametrization is state dependent => need states
use_external_FO = obscfg.get("external_FO", False)
if use_external_FO:
# read prior from obs_seq.out
pass
# modify OE in obs_seq.out
pass
else:
f_osf = cfg.dir_dart_run + "/obs_seq.final"
if not os.path.isfile(f_osf):
evaluate(cfg.time, f_out_pattern=f_osf)
print('computing prior as input for dynamic obs errors')
# generates obs_seq.final
evaluate(cfg, cfg.time, f_out_pattern=f_osf)
# this file was generated by `evaluate()`
# read prior (obs_seq.final)
osf_prior = obsseq.ObsSeq(f_osf)
where_osf_iskind = osf_prior.df.kind == kind
assim_err = get_parametrized_error(
......@@ -256,7 +267,7 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"):
oso.to_dart(outfile)
def reject_small_FGD(time, oso):
def reject_small_FGD(cfg, time, oso):
"""Quality control of observations
We assume that the prior values have been evaluated and are in `run_DART/obs_seq.final`
......@@ -330,7 +341,7 @@ def reject_small_FGD(time, oso):
print('saved', f_out_dart)
def evaluate(assim_time,
def evaluate(cfg, assim_time,
obs_seq_out=False,
prior_is_filter_output=False,
f_out_pattern: str = './obs_seq.final'):
......@@ -354,10 +365,10 @@ def evaluate(assim_time,
if prior_is_filter_output:
print('using filter_restart files in run_DART as prior')
use_filter_output_as_prior()
use_filter_output_as_prior(cfg)
else:
print('using files linked to `run_DART/<exp>/prior_ens*/wrfout_d01` as prior')
use_linked_files_as_prior()
use_linked_files_as_prior(cfg)
# the observations at which to evaluate the prior at
if obs_seq_out:
......@@ -369,11 +380,11 @@ def evaluate(assim_time,
'/obs_seq.out does not exist')
dart_nml.write_namelist(cfg, just_prior_values=True)
filter()
archive_filter_diagnostics(assim_time, f_out_pattern)
filter(cfg)
archive_filter_diagnostics(cfg, assim_time, f_out_pattern)
def archive_filter_diagnostics(time, f_out_pattern):
def archive_filter_diagnostics(cfg, time, f_out_pattern):
"""Copy the filter output txt to the archive
"""
f_archive = time.strftime(f_out_pattern)
......@@ -384,7 +395,7 @@ def archive_filter_diagnostics(time, f_out_pattern):
print(f_archive, "saved.")
def txtlink_to_prior(time, prior_init_time, prior_path_exp):
def txtlink_to_prior(cfg, time, prior_init_time, prior_path_exp):
"""For reproducibility, write the path of the prior to a txt file
"""
os.makedirs(cfg.dir_archive +
......@@ -394,7 +405,7 @@ def txtlink_to_prior(time, prior_init_time, prior_path_exp):
+ cfg.dir_archive + time.strftime('/%Y-%m-%d_%H:%M/')+'link_to_prior.txt')
def prepare_inflation_2(time, prior_init_time):
def prepare_adapt_inflation(cfg, time, prior_init_time):
"""Prepare inflation files (spatially varying)
Recycles inflation files from previous assimilations
......@@ -407,7 +418,6 @@ def prepare_inflation_2(time, prior_init_time):
dir_priorinf = cfg.dir_archive + \
prior_init_time.strftime(cfg.pattern_init_time)
f_default = cfg.dir_archive.replace('<exp>', cfg.name) + "/input_priorinf_mean.nc"
f_prior = dir_priorinf + \
time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
f_new = cfg.dir_dart_run + '/input_priorinf_mean.nc'
......@@ -415,11 +425,13 @@ def prepare_inflation_2(time, prior_init_time):
if os.path.isfile(f_prior):
copy(f_prior, f_new)
print(f_prior, 'copied to', f_new)
else: # no prior inflation file at the first assimilation
else:
# no prior inflation file at the first assimilation
warnings.warn(f_prior + ' does not exist. Using default file instead.')
f_default = cfg.dir_archive+"/../input_priorinf_mean.nc"
copy(f_default, f_new)
f_default = cfg.archive_base + "/input_priorinf_sd.nc"
f_prior = dir_priorinf + \
time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_sd.nc")
f_new = cfg.dir_dart_run + '/input_priorinf_sd.nc'
......@@ -428,11 +440,13 @@ def prepare_inflation_2(time, prior_init_time):
copy(f_prior, f_new)
print(f_prior, 'copied to', f_new)
else:
# no prior inflation file at the first assimilation
warnings.warn(f_prior + ' does not exist. Using default file instead.')
f_default = cfg.dir_archive + "/../input_priorinf_sd.nc"
copy(f_default, f_new)
def archive_inflation_2(time):
def archive_adapt_inflation(cfg, time):
dir_output = cfg.dir_archive + time.strftime(cfg.pattern_init_time)
os.makedirs(dir_output, exist_ok=True)
......@@ -490,7 +504,7 @@ def prepare_run_DART_folder(cfg: Config):
# remove any remains of a previous run
os.makedirs(cfg.dir_dart_run, exist_ok=True)
os.chdir(cfg.dir_dart_run)
os.system("rm -f input.nml obs_seq.in obs_seq.out obs_seq.out-orig obs_seq.final output_* input_*")
os.system("rm -f input.nml obs_seq.in obs_seq.out-orig obs_seq.final")
__link_DART_exe()
......@@ -500,13 +514,13 @@ def prepare_run_DART_folder(cfg: Config):
continue # only need to link RTTOV files once
def get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time, lag=None):
def get_obsseq_out(cfg, time, prior_path_exp, prior_init_time, prior_valid_time):
"""Prepares an obs_seq.out file in the run_DART folder
3 Options:
Options:
1) Use existing obs_seq.out file
2) Use precomputed FO (for cloud fraction)
3) Generate new observations with new observation noise
2) Use precomputed FO (e.g. cloud fraction)
3) Generate new observations from nature run with new noise
Args:
time (datetime): time of assimilation
......@@ -615,38 +629,37 @@ def main(cfg: Config):
nml = dart_nml.write_namelist(cfg)
print(" get observations with specified obs-error")
#lag = dt.timedelta(minutes=15)
oso = get_obsseq_out(time, prior_path_exp, prior_init_time, prior_valid_time)
oso = get_obsseq_out(cfg, time, prior_path_exp, prior_init_time, prior_valid_time)
# prepare for assimilation
prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp)
prepare_DART_grid_template()
prepare_prior_ensemble(cfg, time, prior_init_time, prior_valid_time, prior_path_exp)
prepare_DART_grid_template(cfg)
# additional evaluation of prior (in case some observations are rejected)
if do_reject_smallFGD:
print(" evaluate prior for all observations (incl rejected) ")
evaluate(time, f_out_pattern=cfg.pattern_obs_seq_final+"-evaluate_prior")
evaluate(cfg, time, f_out_pattern=cfg.pattern_obs_seq_final+"-evaluate_prior")
print(" assign observation-errors for assimilation ")
set_obserr_assimilate_in_obsseqout(oso, outfile=cfg.dir_dart_run + "/obs_seq.out")
set_obserr_assimilate_in_obsseqout(cfg, oso, outfile=cfg.dir_dart_run + "/obs_seq.out")
if do_reject_smallFGD:
print(" reject observations? ")
reject_small_FGD(time, oso)
reject_small_FGD(cfg, time, oso)
prior_inflation_type = nml['&filter_nml']['inf_flavor'][0][0]
if prior_inflation_type == '2':
prepare_inflation_2(time, prior_init_time)
if prior_inflation_type != '0':
prepare_adapt_inflation(cfg, time, prior_init_time)
print(" run filter ")
dart_nml.write_namelist(cfg)
filter()
archive_filteroutput(time)
archive_filter_diagnostics(time, cfg.pattern_obs_seq_final)
txtlink_to_prior(time, prior_init_time, prior_path_exp)
filter(cfg)
archive_filteroutput(cfg, time)
archive_filter_diagnostics(cfg, time, cfg.pattern_obs_seq_final)
txtlink_to_prior(cfg, time, prior_init_time, prior_path_exp)
if prior_inflation_type == '2':
archive_inflation_2(time)
if prior_inflation_type != '0':
archive_adapt_inflation(cfg, time)
if 'evaluate_posterior_in_obs_space' in cfg:
if cfg.evaluate_posterior_in_obs_space:
......@@ -658,7 +671,7 @@ def main(cfg: Config):
# evaluate() separately after ./filter is crucial when assimilating cloud variables
# as the obs_seq.final returned by ./filter was produced using un-clamped cloud variables
evaluate(time,
evaluate(cfg, time,
obs_seq_out=f_oso,
prior_is_filter_output=True,
f_out_pattern=cfg.pattern_obs_seq_final+"-evaluate")
......
......@@ -14,7 +14,7 @@ def calc_obserr_WV(channel, Hx_nature, Hx_prior):
"""
if channel not in ['WV62', 'WV73']:
raise NotImplementedError("channel not implemented: " + channel)
debug = False
debug = True
n_obs = len(Hx_nature)
OEs = np.ones(n_obs)
......@@ -29,10 +29,12 @@ def calc_obserr_WV(channel, Hx_nature, Hx_prior):
if channel == 'WV62':
oe_model = _OE_model_harnisch_WV62(mean_CI)
elif channel == 'WV73':
oe_model = _OE_model_harnisch_WV73(mean_CI)
oe_model = _OE_model_new73(mean_CI)
if debug:
print("BT_nature=", bt_y, "=> mean_CI=", mean_CI, "=> OE_assim=", oe_model)
print("bt_y=", bt_y, "bt_x_ens=", bt_x_ens)
print("CIs=", CIs)
print("=> mean_CI=", mean_CI, "=> OE_assim=", oe_model)
OEs[iobs] = oe_model
return OEs
......@@ -44,7 +46,8 @@ def _cloudimpact(channel, bt_mod, bt_obs):
"""
if channel == 'WV73':
biascor_obs = 0
bt_lim = 255 # Kelvin for 7.3 micron WV channel
# bt_lim = 252 # Kelvin for 7.3 micron WV channel
bt_lim = 255.0 # new
elif channel == 'WV62':
biascor_obs = 0
bt_lim = 232.5 # Kelvin for 6.2 micron WV channel
......@@ -71,10 +74,18 @@ def _OE_model_harnisch_WV73(ci):
# Kelvin, fit of Fig 7b, Harnisch 2016
x_ci = [0, 5, 10.5, 13, 16] # average cloud impact [K]
y_oe = [1, 4.5, 10, 12, 13] # adjusted observation error [K]
#y_oe = [1.2, 3, 5, 6, 6.5] # OE for WV62 !!!!
oe_linear = interp1d(x_ci, y_oe, assume_sorted=True)
return oe_linear(ci)
else: # assign highest observation error
return 13.0
def _OE_model_new73(ci):
# based on exp_nat250_WV73_obs6_loc6_oe2_inf3
if ci >= 0 and ci < 25:
x_ci = [0, 5, 10, 15, 25] # average cloud impact [K]
y_oe = [1, 2, 9, 10.5, 7] # adjusted observation error [K]
oe_linear = interp1d(x_ci, y_oe, assume_sorted=True)
return oe_linear(ci)
else:
return 7.0
......@@ -10,10 +10,8 @@ Args:
Returns:
None
"""
import os, sys, shutil
import datetime as dt
from dartwrf.utils import Config, symlink, link_contents, try_remove
import os, sys
from dartwrf.utils import Config, symlink, link_contents
from dartwrf import prepare_namelist
def run(cfg: Config):
......
"""Script to pretty-pring a config file (pickle format)"""
#!/usr/bin/python
"""Script to pretty-pring a config file (pickle format)"
Usage:
python print_config.py <config_file>
"""
import sys, pickle, pprint
f = sys.argv[1]
with open(f, 'rb') as f:
......
......@@ -120,6 +120,7 @@ class Config(object):
self.python = 'python'
self.pattern_obs_seq_out = pattern_obs_seq_out.replace('<archivedir>', self.dir_archive)
self.pattern_obs_seq_final = pattern_obs_seq_final.replace('<archivedir>', self.dir_archive)
self.obs_kind_nrs = dict() # will be filled later
# optional
self.assimilate_existing_obsseq = assimilate_existing_obsseq
......@@ -401,3 +402,37 @@ def obskind_read(dart_srcdir: str) -> dict:
kind_nr = int(data[1].strip())
obskind_nrs[kind_str] = kind_nr
return obskind_nrs
def run_bash_command_in_directory(command, directory):
"""Runs a Bash command in the specified directory.
Args:
command (str): The Bash command to execute.
directory (str): The path to the directory where the command should be run.
Returns:
subprocess.CompletedProcess: An object containing information about the executed command.
"""
try:
result = subprocess.run(
command,
shell=True,
cwd=directory,
check=True, # Raise an exception for non-zero exit codes
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True # Decode stdout and stderr as text
)
print("Command executed successfully.")
print("Stdout:", result.stdout)
if result.stderr:
print("Stderr:", result.stderr)
return result
except subprocess.CalledProcessError as e:
print(f"Error executing command: {e}")
print("Stdout:", e.stdout)
print("Stderr:", e.stderr)
return e
except FileNotFoundError:
print(f"Error: Directory not found: {directory}")
return None
\ No newline at end of file
......@@ -137,7 +137,7 @@ class WorkFlows(object):
cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"ntasks": "20", "time": "30", "mem": "200G", "ntasks-per-node": "20"})
**{"ntasks": "20", "mem": "200G", "ntasks-per-node": "20"})
return id
......@@ -173,7 +173,7 @@ class WorkFlows(object):
).replace('<wrf_rundir_base>', cfg.dir_wrf_run.replace('<ens>', '$IENS')
).replace('<wrf_modules>', cfg.wrf_modules,
)
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30", array="1-"+str(cfg.ensemble_size))
id = self.run_job(cmd, cfg, depends_on=[depends_on], array="1-"+str(cfg.ensemble_size))
return id
def run_WRF(self, cfg, depends_on=None):
......@@ -184,10 +184,11 @@ class WorkFlows(object):
end = cfg.WRF_end
# SLURM configuration for WRF
slurm_kwargs = {"array": "1-"+str(cfg.ensemble_size),
"nodes": "1",
slurm_kwargs = {
"partition": "general",
"array": "1-"+str(cfg.ensemble_size),
"ntasks": str(cfg.max_nproc_for_each_ensemble_member),
"ntasks-per-core": "1", "mem": "90G",
"ntasks-per-core": "1", "mem": "100G",
"ntasks-per-node": str(cfg.max_nproc_for_each_ensemble_member),}
# command from template file
......@@ -203,13 +204,11 @@ class WorkFlows(object):
# run WRF ensemble
time_in_simulation_hours = (end-start).total_seconds()/3600
# runtime_wallclock_mins_expected = int(time_in_simulation_hours*15*1.5 + 15)
runtime_wallclock_mins_expected = int(time_in_simulation_hours*30*1.5 + 15)
# usually max 15 min/hour + 50% margin + 15 min buffer
runtime_wallclock_mins_expected = int(time_in_simulation_hours*.5)*60 + 20
# usually max 15 min/hour + 100% margin + 15 min buffer
slurm_kwargs.update({"time": str(runtime_wallclock_mins_expected)})
if runtime_wallclock_mins_expected > 20:
slurm_kwargs.update({"partition": "amd"})
#cfg_update.update({"exclude": "jet03"})
# if runtime_wallclock_mins_expected > 30: # this means jobs will mostly take < 15 mins
slurm_kwargs.update({"constraint": "zen4"})
id = self.run_job(wrf_cmd, cfg, depends_on=[id], **slurm_kwargs)
return id
......@@ -224,8 +223,9 @@ class WorkFlows(object):
cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"ntasks": "20", "time": "30", "mem": "110G",
"ntasks-per-node": "20", "ntasks-per-core": "1"},
**{"ntasks": str(cfg.max_nproc), "time": "30",
"mem": "110G", "partition": "devel",
"ntasks-per-node": str(cfg.max_nproc), "ntasks-per-core": "1"},
)
return id
......@@ -246,7 +246,7 @@ class WorkFlows(object):
"""
path_to_script = self.dir_dartwrf_run + '/prep_IC_prior.py'
cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="10")
id = self.run_job(cmd, cfg, depends_on=[depends_on])
return id
def update_IC_from_DA(self, cfg, depends_on=None):
......@@ -262,7 +262,7 @@ class WorkFlows(object):
path_to_script = self.dir_dartwrf_run + '/update_IC.py'
cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current])
id = self.run_job(cmd, cfg, depends_on=[depends_on], time="10")
id = self.run_job(cmd, cfg, depends_on=[depends_on])
return id
def run_RTTOV(self, cfg, depends_on=None):
......@@ -275,17 +275,22 @@ class WorkFlows(object):
'$SLURM_ARRAY_TASK_ID'])
id = self.run_job(cmd, cfg, depends_on=[depends_on],
**{"ntasks": "1", "time": "60", "mem": "10G",
"array": "1-"+str(cfg.ensemble_size)})
**{"ntasks": "1", "mem": "15G", "array": "1-"+str(cfg.ensemble_size)})
return id
def verify(self, cfg: Config, depends_on=None):
def verify(self, cfg: Config, init=False,
depends_on=None):
"""Not included in DART-WRF"""
if not init:
cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
cfg.name, cfg.verify_against,
'sat', 'wrf', 'has_node', 'np=10', 'mem=250G'])
else:
cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py',
cfg.name, cfg.verify_against, #cfg.time.strftime('%y%m%d_%H:%M'),
cfg.name, cfg.verify_against, 'init='+cfg.time.strftime('%Y%m%d_%H:%M'), 'force',
'sat', 'wrf', 'has_node', 'np=10', 'mem=250G'])
self.run_job(cmd, cfg, depends_on=[depends_on],
**{"time": "03:00:00", "mail-type": "FAIL,END",
**{"time": "04:00:00", "mail-type": "FAIL", "partition": "general",
"ntasks": "10", "ntasks-per-node": "10",
"ntasks-per-core": "1", "mem": "250G"})
......
......@@ -9,22 +9,32 @@ from dartwrf.utils import Config
from config.jet import cluster_defaults
from config.defaults import dart_nml, CF_config, vis
# test multiple assimilation windows (11-12, 12-13, 13-14, )
timedelta_btw_assim = dt.timedelta(minutes=15)
ensemble_size = 40
dart_nml['&filter_nml'].update(num_output_state_members=ensemble_size,
ens_size=ensemble_size)
# which DART version to use?
assimilate_cloudfractions = True
cf1 = dict(kind='CF192km', loc_horiz_km=9999)
cf2 = dict(kind='CF96km', loc_horiz_km=96)
cf3 = dict(kind='CF48km', loc_horiz_km=48)
cf4 = dict(kind='CF24km', loc_horiz_km=24)
cf5 = dict(kind='CF12km', loc_horiz_km=12)
assimilate_cloudfractions = False
# scale_km = 2
# cf = dict(kind='CF{}km'.format(scale_km), loc_horiz_km=scale_km)
assimilate_these_observations = [cf5]
cf1 = dict(kind='CF192km', loc_horiz_km=9999,
)
cf2 = dict(kind='CF96km', loc_horiz_km=96,
)
cf3 = dict(kind='CF48km', loc_horiz_km=48,
)
cf4 = dict(kind='CF24km', loc_horiz_km=24,
)
cf5 = dict(kind='CF12km', loc_horiz_km=12,
)
cf6 = dict(kind='CF6km', loc_horiz_km=6,
)
assimilate_these_observations = [vis,] #cf3, cf4, cf5,]
if assimilate_cloudfractions:
cluster_defaults.update(
......@@ -38,11 +48,11 @@ else:
)
time0 = dt.datetime(2008, 7, 30, 11)
time_end = dt.datetime(2008, 7, 30, 15)
id = None
cfg = Config(name='exp_nat250_VIS_SO12_loc12_oe2_inf4-0.5',
cfg = Config(name='exp_nat250_VIS_obs12_loc12_oe2_inf4-0.5',
model_dx = 2000,
ensemble_size = ensemble_size,
dart_nml = dart_nml,
......@@ -54,9 +64,9 @@ cfg = Config(name='exp_nat250_VIS_SO12_loc12_oe2_inf4-0.5',
CF_config=CF_config,
assimilate_existing_obsseq = False,
#nature_wrfout_pattern = '/jetfs/home/lkugler/data/sim_archive/nat_250m_1600x1600x100/*/1/wrfout_d01_%Y-%m-%d_%H_%M_%S',
nature_wrfout_pattern = '/jetfs/home/lkugler/data/sim_archive/nat_250m_1600x1600x100/*/1/wrfout_d01_%Y-%m-%d_%H_%M_%S',
#geo_em_nature = '/jetfs/home/lkugler/data/sim_archive/geo_em.d01.nc.2km_200x200',
# geo_em_nature = '/jetfs/home/lkugler/data/sim_archive/geo_em.d01.nc.250m_1600x1600',
geo_em_nature = '/jetfs/home/lkugler/data/sim_archive/geo_em.d01.nc.250m_1600x1600',
update_vars = ['U', 'V', 'W', 'THM', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'QSNOW', 'PSFC'],
#input_profile = '/jetfs/home/lkugler/data/sim_archive/nat_250m_1600x1600x100/2008-07-30_08:00/1/input_sounding',
......@@ -69,29 +79,36 @@ w.prepare_WRFrundir(cfg)
# id = w.run_ideal(cfg, depends_on=id)
# assimilate at these times
assim_times = pd.date_range(time0, time0 + dt.timedelta(hours=4), freq=timedelta_btw_assim)
timedelta_btw_assim = dt.timedelta(minutes=15)
assim_times = pd.date_range(time0, time_end, freq=timedelta_btw_assim)
#assim_times = [dt.datetime(2008, 7, 30, 12), dt.datetime(2008, 7, 30, 13), dt.datetime(2008, 7, 30, 14),]
last_assim_time = assim_times[-1]
# loop over assimilations
for i, t in enumerate(assim_times):
if i == 0:
if t == dt.datetime(2008, 7, 30, 11):
prior_init_time = dt.datetime(2008, 7, 30, 8)
else:
prior_init_time = t - dt.timedelta(minutes=15)
# which scales?
# if t.minute == 0:
# CF_config.update(scales_km=(48, 24, 12),)
# else:
# CF_config.update(scales_km=(12,))
#cfg.update(CF_config=CF_config)
cfg.update(time = t,
prior_init_time = prior_init_time,
if i == 0 and t == dt.datetime(2008, 7, 30, 11):
cfg.update(
time = t,
prior_init_time = dt.datetime(2008, 7, 30, 8),
prior_valid_time = t,
prior_path_exp = '/jetfs/home/lkugler/data/sim_archive/exp_nat250m_noDA/',)
else:
cfg.update(time = t,
prior_init_time = assim_times[i-1],
cfg.update(
time = t,
prior_init_time = t - dt.timedelta(minutes=15),
prior_valid_time = t,
prior_path_exp = cfg.dir_archive,)
id = w.assimilate(cfg, depends_on=id)
# 1) Set posterior = prior
......@@ -119,7 +136,7 @@ for i, t in enumerate(assim_times):
if t.minute == 0 and i != 0:
# full hour but not first one
# make long forecasts without restart files
timedelta_integrate = dt.timedelta(hours=4)
timedelta_integrate = dt.timedelta(hours=.25)
restart_interval = 9999
cfg.update( WRF_start=t,
......@@ -133,4 +150,7 @@ for i, t in enumerate(assim_times):
id = w.run_WRF(cfg, depends_on=id)
id = w.run_RTTOV(cfg, depends_on=id)
w.verify(cfg, init=True, depends_on=id)
# verify the rest
w.verify(cfg, depends_on=id)
\ No newline at end of file
......@@ -31,7 +31,7 @@
max_step_increase_pct = 5, 51, 51,
starting_time_step = 4,
max_time_step = 16,
min_time_step = 4,
min_time_step = 3,
time_step = 8,
time_step_fract_num = 0,
time_step_fract_den = 1,
......
from timer import Timer
with Timer('imports'):
import datetime as dt
import pandas as pd
from dartwrf.workflows import WorkFlows
from dartwrf.utils import Config, run_bash_command_in_directory
# import default config for jet
from config.jet_1node import cluster_defaults
from config.defaults import dart_nml
ensemble_size = 5
dart_nml['&filter_nml'].update(num_output_state_members=ensemble_size,
ens_size=ensemble_size)
t = dict(var_name='Temperature', unit='[K]',
kind='RADIOSONDE_TEMPERATURE',
# n_obs=22500,
n_obs=1, obs_locations=[(45., 0.)],
error_generate=0.2, error_assimilate=0.2,
heights=range(1000, 17001, 2000),
loc_horiz_km=1000, loc_vert_km=4)
assimilate_these_observations = [t,] #cf3, cf4, cf5,]
time0 = dt.datetime(2008, 7, 30, 11)
time_end = dt.datetime(2008, 7, 30, 11)
id = None
with Timer('Config()'):
cfg = Config(name='test-1node',
model_dx = 2000,
ensemble_size = ensemble_size,
dart_nml = dart_nml,
geo_em_forecast = '/jetfs/home/lkugler/data/sim_archive/geo_em.d01.nc.2km_200x200',
time = time0,
assimilate_these_observations = assimilate_these_observations,
assimilate_existing_obsseq = False,
nature_wrfout_pattern = '/jetfs/home/lkugler/data/sim_archive/exp_v1.18_P1_nature+1/*/1/wrfout_d01_%Y-%m-%d_%H:%M:%S',
geo_em_nature = '/jetfs/home/lkugler/data/sim_archive/geo_em.d01.nc.2km_200x200',
update_vars = ['U', 'V', 'W', 'THM', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'QSNOW', 'PSFC'],
#input_profile = '/jetfs/home/lkugler/data/sim_archive/nat_250m_1600x1600x100/2008-07-30_08:00/1/input_sounding',
verify_against = 'nat_250m_blockavg2km',
**cluster_defaults)
with Timer('prepare_WRFrundir'):
w = WorkFlows(cfg)
import dartwrf.prepare_wrfrundir as prepwrf
prepwrf.run(cfg)
# w.prepare_WRFrundir(cfg)
# id = w.run_ideal(cfg, depends_on=id)
# assimilate at these times
timedelta_btw_assim = dt.timedelta(minutes=15)
assim_times = pd.date_range(time0, time_end, freq=timedelta_btw_assim)
#assim_times = [dt.datetime(2008, 7, 30, 12), dt.datetime(2008, 7, 30, 13), dt.datetime(2008, 7, 30, 14),]
last_assim_time = assim_times[-1]
# loop over assimilations
for i, t in enumerate(assim_times):
if i == 0 and t == dt.datetime(2008, 7, 30, 11):
cfg.update(
time = t,
prior_init_time = dt.datetime(2008, 7, 30, 8),
prior_valid_time = t,
prior_path_exp = '/jetfs/home/lkugler/data/sim_archive/exp_nat250m_noDA/')
else:
cfg.update(
time = t,
prior_init_time = t - dt.timedelta(minutes=15),
prior_valid_time = t,
prior_path_exp = cfg.dir_archive,)
with Timer('assimilate'):
#id = w.assimilate(cfg, depends_on=id)
import dartwrf.assimilate as da
da.main(cfg)
with Timer('prepare_IC_from_prior'):
# 1) Set posterior = prior
#id = w.prepare_IC_from_prior(cfg, depends_on=id)
import dartwrf.prep_IC_prior as prep
prep.main(cfg)
with Timer('update_IC_from_DA'):
# 2) Update posterior += updates from assimilation
#id = w.update_IC_from_DA(cfg, depends_on=id)
import dartwrf.update_IC as upd
upd.update_initials_in_WRF_rundir(cfg)
# integrate until next assimilation
timedelta_integrate = dt.timedelta(minutes=15)
restart_interval = timedelta_btw_assim.total_seconds()/60 # in minutes
cfg.update( WRF_start=t,
WRF_end=t+timedelta_integrate,
restart=True,
restart_interval=restart_interval,
hist_interval_s=300,
)
import dartwrf.prepare_namelist as prepnl
prepnl.run(cfg)
# 3) Run WRF ensemble
#id = w.run_WRF(cfg, depends_on=id)
with Timer('run_WRF'):
# Example usage:
cmd = cfg.wrf_modules +'; '
for iens in range(1, cfg.ensemble_size+1):
dir_wrf_run = cfg.dir_wrf_run.replace('<exp>', cfg.name).replace('<ens>', str(iens))
cmd += 'cd ' + dir_wrf_run + '; '
cmd += 'echo "'+dir_wrf_run+'"; '
cmd += 'mpirun -np 4 ./wrf.exe & '
cmd += 'wait; '
cmd += 'echo "WRF run completed."'
# Run the command in the specified directory
run_bash_command_in_directory(cmd, dir_wrf_run)
timer.py 0 → 100644
import time
class Timer:
"""
A context manager for measuring the execution time of a code block.
Prints a message before and after the timed block.
"""
def __init__(self, message="Code block"):
"""
Initializes the Timer with an optional message.
Args:
message (str, optional): The message to print before and after the timed block.
Defaults to "Code block".
"""
self.message = message
def __enter__(self):
"""
Starts the timer and prints the initial message.
"""
print(f"{self.message} started.")
self.start_time = time.perf_counter()
return self # Returns self, so you can access the timer object if needed
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Stops the timer, calculates the elapsed time, and prints the final message
along with the execution time.
Args:
exc_type: The type of exception that occurred, if any.
exc_val: The exception instance, if any.
exc_tb: A traceback object, if an exception occurred.
"""
self.end_time = time.perf_counter()
self.elapsed_time = self.end_time - self.start_time
print(f"{self.message} finished in {self.elapsed_time:.4f} seconds.")
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment