From fe0076781699d3fbe9d40a337524892e5b9ea9cc Mon Sep 17 00:00:00 2001 From: Lukas Kugler <lukas.kugler@univie.ac.at> Date: Thu, 27 Feb 2025 12:27:24 +0100 Subject: [PATCH] docs, configs --- dartwrf/workflows.py | 95 +++++++++++++++++++++----------------------- 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/dartwrf/workflows.py b/dartwrf/workflows.py index 029233a..63d99ae 100644 --- a/dartwrf/workflows.py +++ b/dartwrf/workflows.py @@ -18,22 +18,14 @@ from dartwrf.utils import Config class WorkFlows(object): def __init__(self, cfg: Config): - """Set up the experiment folder in `archivedir`. - - 1. Copy the selected config files - 2. Import configurations - 3. Prepare obskind.py file (dictionary of observation types) - 4. Copy the scripts and configs to `archivedir` - 5. Set python path - 6. Set log path and slurm scripts path - - Args: - exp_config (str): Path to exp config file - server_config (str): Path to the cluster config file + """Create the archive directory, copy scripts to archive Attributes: - cluster (obj): cluster configuration as defined in server_config file - exp (obj): experiment configuration as defined in exp_config file + use_slurm: if True, run jobs with SLURM + dir_log: Logging directory for slurm + dir_slurm: Scripts directory for slurm + dir_dartwrf_run: as defined in Config + python: python command with prepended PYTHONPATH """ print('------------------------------------------------------') print('>>> Experiment name: "'+cfg.name+'"') @@ -42,7 +34,7 @@ class WorkFlows(object): # Copy scripts and config files to `self.archivedir` folder dirs_exist_ok = False if os.path.exists(cfg.dir_archive+'/DART-WRF/'): - if input('The experiment name already exists! Overwrite existing experiment? (Y/n) ') in ['Y', 'y']: + if input('The experiment name already exists! Overwrite existing scripts? (Y/n) ') in ['Y', 'y']: dirs_exist_ok = True shutil.copytree(cfg.dir_dartwrf_dev, @@ -68,10 +60,9 @@ class WorkFlows(object): print('------------------------------------------------------') # use this python path - run_wrf_from_this_folder = cfg.dir_dartwrf_run+'/../' + pythonpath_archive = cfg.dir_dartwrf_run+'/../' self.dir_dartwrf_run = cfg.dir_dartwrf_run - self.python = 'export PYTHONPATH=' +run_wrf_from_this_folder+ '; '+cfg.python - self.cfg = cfg + self.python = 'export PYTHONPATH=' +pythonpath_archive+ '; '+cfg.python def run_job(self, cmd, cfg, depends_on=None, **kwargs): """Run scripts in a shell @@ -92,7 +83,10 @@ class WorkFlows(object): from slurmpy import Slurm # name of calling function path_to_script = inspect.stack()[1].function - jobname = path_to_script.split('/')[-1]+'-'+cfg.f_cfg_current.split('/')[-1].replace('.py','') + if 'time' in cfg: + jobname = path_to_script.split('/')[-1]+'-'+cfg.time.strftime('%H:%M') + else: + jobname = path_to_script.split('/')[-1]+'-'+cfg.name print('> SLURM job:', jobname) slurm_kwargs = cfg.slurm_kwargs.copy() @@ -175,12 +169,11 @@ class WorkFlows(object): Returns: str: job ID of the submitted job """ - cmd = script_to_str(self.cfg.WRF_ideal_template - ).replace('<expname>', cfg.name - ).replace('<wrf_rundir_base>', cfg.dir_wrf_run + cmd = script_to_str(cfg.WRF_ideal_template + ).replace('<wrf_rundir_base>', cfg.dir_wrf_run.replace('<ens>', '$IENS') ).replace('<wrf_modules>', cfg.wrf_modules, ) - id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30") + id = self.run_job(cmd, cfg, depends_on=[depends_on], time="30", array="1-"+str(cfg.ensemble_size)) return id def run_WRF(self, cfg, depends_on=None): @@ -191,31 +184,32 @@ class WorkFlows(object): end = cfg.WRF_end # SLURM configuration for WRF - slurm_kwargs = {"array": "1-"+str(self.cfg.ensemble_size), + slurm_kwargs = {"array": "1-"+str(cfg.ensemble_size), "nodes": "1", - "ntasks": str(self.cfg.max_nproc_for_each_ensemble_member), - "ntasks-per-core": "1", "mem": "90G", } + "ntasks": str(cfg.max_nproc_for_each_ensemble_member), + "ntasks-per-core": "1", "mem": "90G", + "ntasks-per-node": str(cfg.max_nproc_for_each_ensemble_member),} # command from template file - wrf_cmd = script_to_str(self.cfg.WRF_exe_template - ).replace('<dir_wrf_run>', self.cfg.dir_wrf_run.replace('<ens>', '$IENS') - ).replace('<wrf_modules>', self.cfg.wrf_modules, - ).replace('<WRF_number_of_processors>', str(self.cfg.max_nproc_for_each_ensemble_member), + wrf_cmd = script_to_str(cfg.WRF_exe_template + ).replace('<dir_wrf_run>', cfg.dir_wrf_run.replace('<ens>', '$IENS') + ).replace('<wrf_modules>', cfg.wrf_modules, + ).replace('<WRF_number_of_processors>', str(cfg.max_nproc_for_each_ensemble_member), ) # prepare namelist path_to_script = self.dir_dartwrf_run + '/prepare_namelist.py' - cmd = ' '.join([self.python, path_to_script, self.cfg.f_cfg_current]) + cmd = ' '.join([self.python, path_to_script, cfg.f_cfg_current]) id = self.run_job(cmd, cfg, depends_on=[depends_on]) # run WRF ensemble time_in_simulation_hours = (end-start).total_seconds()/3600 - runtime_wallclock_mins_expected = int( - time_in_simulation_hours*30 + 10) # usually <15 min/hour + # runtime_wallclock_mins_expected = int(time_in_simulation_hours*15*1.5 + 15) + runtime_wallclock_mins_expected = int(time_in_simulation_hours*30*1.5 + 15) + # usually max 15 min/hour + 50% margin + 15 min buffer slurm_kwargs.update({"time": str(runtime_wallclock_mins_expected)}) - - if runtime_wallclock_mins_expected > 25: + if runtime_wallclock_mins_expected > 20: slurm_kwargs.update({"partition": "amd"}) - # #cfg_update.update({"exclude": "jet03"}) + #cfg_update.update({"exclude": "jet03"}) id = self.run_job(wrf_cmd, cfg, depends_on=[id], **slurm_kwargs) return id @@ -271,18 +265,30 @@ class WorkFlows(object): id = self.run_job(cmd, cfg, depends_on=[depends_on], time="10") return id - def create_satimages(self, cfg, depends_on=None): + def run_RTTOV(self, cfg, depends_on=None): """Run a job array, one job per ensemble member, to create satellite images""" prefix = 'module purge; module load rttov/v13.2-gcc-8.5.0; python' path_to_script = '~/RTTOV-WRF/run_init.py' - cmd = ' '.join([prefix, path_to_script, cfg.f_cfg_current]) + cmd = ' '.join([prefix, path_to_script, + cfg.dir_archive +cfg.time.strftime('/%Y-%m-%d_%H:%M/'), + '$SLURM_ARRAY_TASK_ID']) id = self.run_job(cmd, cfg, depends_on=[depends_on], **{"ntasks": "1", "time": "60", "mem": "10G", - "array": "1-"+str(self.cfg.ensemble_size)}) + "array": "1-"+str(cfg.ensemble_size)}) return id + def verify(self, cfg: Config, depends_on=None): + """Not included in DART-WRF""" + cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py', + cfg.name, cfg.verify_against, #cfg.time.strftime('%y%m%d_%H:%M'), + 'sat', 'wrf', 'has_node', 'np=10', 'mem=250G']) + self.run_job(cmd, cfg, depends_on=[depends_on], + **{"time": "03:00:00", "mail-type": "FAIL,END", + "ntasks": "10", "ntasks-per-node": "10", + "ntasks-per-core": "1", "mem": "250G"}) + def evaluate_obs_posterior_after_analysis(self, cfg, depends_on=None): path_to_script = self.dir_dartwrf_run + '/evaluate_obs_space.py' @@ -297,13 +303,4 @@ class WorkFlows(object): # id = self.run_job(cmd, 'linpost'+self.cfg.name, cfg_update={"ntasks": "16", "mem": "80G", "ntasks-per-node": "16", "ntasks-per-core": "2", # "time": "15", "mail-type": "FAIL"}, # depends_on=[id]) - return id - - def verify(self, cfg, depends_on=None): - """Not included in DART-WRF""" - cmd = ' '.join(['python /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py', - cfg.f_cfg_current]) - id = self.run_job(cmd, cfg, depends_on=[depends_on], - **{"time": "210", "mail-type": "FAIL,END", - "ntasks": "10", "ntasks-per-node": "10", - "ntasks-per-core": "1", "mem": "250G"}) + return id \ No newline at end of file -- GitLab