diff --git a/dartwrf/workflows.py b/dartwrf/workflows.py
index b4f82e420b562a28a6bfacf87739b2b0d9c726bc..f534bf5b30b77586ce1a2bfa730e70baf2f23d22 100644
--- a/dartwrf/workflows.py
+++ b/dartwrf/workflows.py
@@ -12,15 +12,6 @@ import importlib
 from dartwrf.utils import script_to_str
 from config.cfg import exp
 
-def dict_to_py(d, outfile):
-    with open(outfile, 'w') as f:
-        txt = '# this file is autogenerated \nobs_kind_nrs = {'
-        for k,v in d.items():
-            txt += '"'+k+'": '+str(v)+', \n'
-        txt += '}'
-        f.write(txt)
-
-
 class WorkFlows(object):
     def __init__(self, exp_config='cfg.py', server_config='server.py'):
         """Set up the experiment folder in `archivedir`.
@@ -53,7 +44,7 @@ class WorkFlows(object):
         # file needs to exist within package so sphinx can read it
         def _obskind_read():
             """Read dictionary of observation types + ID numbers ("kind")
-            from DART f90 script
+            from DART f90 script and return it as a Python dictionary
             """
             definitionfile = self.cluster.dart_srcdir+'/../../../assimilation_code/modules/observations/obs_kind_mod.f90'
             with open(definitionfile, 'r') as f:
@@ -77,7 +68,25 @@ class WorkFlows(object):
                     kind_nr = int(data[1].strip())
                     obskind_nrs[kind_str] = kind_nr
             return obskind_nrs
-        dict_to_py(_obskind_read(), self.cluster.scriptsdir+'/../config/obskind.py')
+
+        def _dict_to_py(d, outfile):
+            """Write a Python dictionary to a .py file
+
+            Args:
+                d (dict): dictionary to write
+                outfile (str): path to output file
+
+            Returns:
+                None
+            """
+            with open(outfile, 'w') as f:
+                txt = '# this file is autogenerated \nobs_kind_nrs = {'
+                for k,v in d.items():
+                    txt += '"'+k+'": '+str(v)+', \n'
+                txt += '}'
+                f.write(txt)
+
+        _dict_to_py(_obskind_read(), self.cluster.scriptsdir+'/../config/obskind.py')
 
         # Copy scripts to self.cluster.archivedir folder
         os.makedirs(self.cluster.archivedir, exist_ok=True)
@@ -101,15 +110,32 @@ class WorkFlows(object):
         print('--------------------------------------------')
 
     def prepare_WRFrundir(self, init_time):
-        """Create WRF/run directories and wrfinput files
+        """Prepare WRF run directories for all ensemble members
+
+        Note:
+            Optionally copy input sounding profiles to WRF run directories
+            if defined in cfg.py
+
+        Args:
+            init_time (datetime): WRF initialization time
+
+        Returns:
+            None
         """
         cmd = self.cluster.python+' '+self.cluster.scripts_rundir+'/prepare_wrfrundir.py '+init_time.strftime('%Y-%m-%d_%H:%M')
         print(cmd)
         os.system(cmd)
 
     def run_ideal(self, depends_on=None):
-        """Run ideal for every ensemble member"""
-        cmd = """# run ideal.exe in parallel, then add geodata
+        """Run WRF's ideal.exe for every ensemble member
+
+        Args:
+            depends_on (str, optional): job ID of a previous job after which to run this job
+
+        Returns:
+            str: job ID of the submitted job
+        """
+        cmd = """# run ideal.exe in parallel
 export SLURM_STEP_GRES=none
 for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
 do
@@ -143,11 +169,19 @@ class WorkFlows(object):
         return id
 
     def run_ENS(self, begin, end, depends_on=None, first_minute=True,
-                input_is_restart=True, restart_path=False, output_restart_interval=720):
-        """Run forecast for 1 minute, save output.
-        Then run whole timespan with 5 minutes interval.
+                input_is_restart=True, output_restart_interval=720):
+        """Run the forecast ensemble
 
-        if input_is_restart:  # start WRF in restart mode
+        Args:
+            begin (datetime): start time of the forecast
+            end (datetime): end time of the forecast
+            depends_on (str, optional): job ID of a previous job after which to run this job
+            first_minute (bool, optional): if True, run the first minute of the forecast
+            input_is_restart (bool, optional): if True, start WRF from a WRFrst file (restart mode)
+            output_restart_interval (int, optional): interval in minutes between outputs of WRFrst files
+
+        Returns:
+            str: job ID of the submitted job
         """
         restart_flag = '.false.' if not input_is_restart else '.true.'
 
@@ -199,7 +233,7 @@ class WorkFlows(object):
         runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5)  # usually below 9 min/hour
         id = self.cluster.run_job(cmd, "WRF", cfg_update={"array": "1-"+str(self.cluster.size_jobarray), "ntasks": "10", "nodes": "1",
-                    "time": str(runtime_wallclock_mins_expected), "mem": "100G"}, depends_on=[id])
+                    "time": str(runtime_wallclock_mins_expected), "mem": "30G"}, depends_on=[id])
         return id
 
@@ -211,6 +245,9 @@ class WorkFlows(object):
             assim_time (dt.datetime): timestamp of prior wrfout files
             prior_init_time (dt.datetime): timestamp to find the directory where the prior wrfout files are
             prior_path_exp (str): use this directory to get prior state (i.e. self.cluster.archivedir)
+
+        Returns:
+            str: job ID of the submitted job
         """
         if not os.path.exists(prior_path_exp):
            raise IOError('prior_path_exp does not exist: '+prior_path_exp)
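For orientation, here is a minimal driver sketch of how the WorkFlows methods documented in this patch might be chained together. Only the constructor and method signatures are taken from the diff; the import path, the chosen times, and the particular job chaining are illustrative assumptions, not part of the patch.

# Minimal driver sketch (illustrative only): chains the WorkFlows methods
# whose signatures appear in the diff above. Times and argument values are
# hypothetical; the actual experiment setup lives in config/cfg.py.
import datetime as dt

from dartwrf.workflows import WorkFlows

w = WorkFlows(exp_config='cfg.py', server_config='server.py')

init_time = dt.datetime(2008, 7, 30, 6)   # hypothetical WRF initialization time
begin = init_time
end = init_time + dt.timedelta(hours=1)

w.prepare_WRFrundir(init_time)             # runs synchronously, returns None
id_ideal = w.run_ideal()                   # submits ideal.exe, returns the job ID as a string
id_wrf = w.run_ENS(begin=begin, end=end,
                   depends_on=id_ideal,    # queue the forecast after ideal.exe has finished
                   input_is_restart=False,
                   output_restart_interval=60)

Each run_* method returns the submitted job ID, so the returned string can be passed as depends_on to the next call to express job dependencies on the cluster.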