diff --git a/analysis_only.py b/analysis_only.py new file mode 100755 index 0000000000000000000000000000000000000000..fa735f959e3e1634741ecd3ccca5a6cf0196fe6c --- /dev/null +++ b/analysis_only.py @@ -0,0 +1,37 @@ +#!/usr/bin/python3 +""" +running the forecast model without assimilation +""" +import os, sys, shutil +import datetime as dt + +from dartwrf import utils +from config.cfg import exp, cluster + +log_dir = cluster.archivedir+'/logs/' +slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/' +print('logging to', log_dir) +print('scripts, which are submitted to SLURM:', slurm_scripts_dir) + + +############################### +utils.backup_scripts() + + +prior_path_exp = '/mnt/jetfs/scratch/lkugler/data/sim_archive/exp_v1.19_P3_wbub7_noDA' +prior_init_time = dt.datetime(2008,7,30,12) +prior_valid_time = dt.datetime(2008,7,30,12,30) +assim_time = prior_valid_time + + +os.system( + cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py ' + +assim_time.strftime('%Y-%m-%d_%H:%M ') + +prior_init_time.strftime('%Y-%m-%d_%H:%M ') + +prior_valid_time.strftime('%Y-%m-%d_%H:%M ') + +prior_path_exp + ) + + + +# id_sat = create_satimages(time, depends_on=id) \ No newline at end of file diff --git a/config/cfg.py b/config/cfg.py index c7272f426d3ac540e6aa40c87897c64df753e97e..af998f1e96fd83c0b475dc5a944f7baacdd05f10 100755 --- a/config/cfg.py +++ b/config/cfg.py @@ -1,15 +1,11 @@ -from config import clusters +from dartwrf import utils +from config import clusters # from . = problem in archivedir cluster = clusters.jet # change cluster configuration here -class ExperimentConfiguration(object): - def __init__(self): - pass - -exp = ExperimentConfiguration() -exp.expname = "exp_v1.22_P2_rr_WV62_obs10_loc20_oe2" +exp = utils.ExperimentConfiguration() +exp.expname = "test_srvx1" #"exp_v1.22_P3_wbub7_WV62_obs10_loc20_oe1" exp.model_dx = 2000 exp.n_ens = 40 -exp.size_jobarray = 40 exp.filter_kind = 1 exp.inflation = True @@ -99,7 +95,7 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[Pa]', cov_loc_radius_km=32) -exp.observations = [wv62] +exp.observations = [t] exp.update_vars = ['U', 'V', 'W', 'THM', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'PSFC'] #exp.update_vars = ['U', 'V', 'W', 'T', 'PH', 'MU', 'QVAPOR', 'PSFC'] diff --git a/config/clusters.py b/config/clusters.py index 0c7981f08b19d50e4e736b67d8d0d0ca92a8bc6b..d2ae6f8c9137f43b28b329b93bc2f8151b1d46ca 100755 --- a/config/clusters.py +++ b/config/clusters.py @@ -1,5 +1,6 @@ import os, sys import datetime as dt +from dartwrf import utils """Configuration name docs @@ -41,29 +42,13 @@ slurm_cfg python dictionary, containing options of SLURM """ -class ClusterConfig(object): - """Helper class, contains useful abbreviations to use in code later on""" - def __init__(self): - pass - @property - def archivedir(self): - return self.archive_base+'/'+self.expname - def wrf_rundir(self, iens): - return self.wrf_rundir_base+'/'+self.expname+'/'+str(iens) - - @property - def scripts_rundir(self): - return self.archivedir+'/DART-WRF/' - - @property - def dartrundir(self): - return self.dart_rundir_base+'/'+self.expname+'/' - - -vsc = ClusterConfig() +vsc = utils.ClusterConfig() vsc.name = 'vsc' +vsc.max_nproc = 20 +vsc.size_jobarray = 10 # 10 jobs with each 4 WRF processes per node +vsc.use_slurm = True # binaries vsc.python = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/python' @@ -92,8 +77,11 @@ vsc.slurm_cfg = {"account": "p71386", "partition": "skylake_0384", "qos": "p7138 "nodes": "1", "ntasks": "1", "ntasks-per-node": "48", "ntasks-per-core": "1", "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"} -jet = ClusterConfig() +jet = utils.ClusterConfig() jet.name = 'jet' +jet.max_nproc = 12 +jet.use_slurm = True +jet.size_jobarray = 40 # binaries jet.python = '/jetfs/home/lkugler/miniconda3/envs/DART/bin/python' @@ -123,3 +111,38 @@ jet.run_WRF = '/jetfs/home/lkugler/DART-WRF/dartwrf/run_ens.jet.sh' jet.slurm_cfg = {"account": "lkugler", "partition": "compute", #"nodelist": "jet07", "ntasks": "1", "ntasks-per-core": "1", "mem": "50G", "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"} + + +srvx1 = utils.ClusterConfig() +srvx1.name = 'srvx1' +srvx1.max_nproc = 6 +srvx1.size_jobarray = 40 +srvx1.use_slurm = False + +# binaries +srvx1.python = '/mnt/jetfs/home/lkugler/miniconda3/envs/DART/bin/python' +srvx1.python_verif = '/jetfs/home/lkugler/miniconda3/envs/enstools/bin/python' +srvx1.ncks = '/jetfs/spack/opt/spack/linux-rhel8-skylake_avx512/intel-20.0.2/nco-4.9.3-dhlqiyog7howjmaleyfhm6lkt7ra37xf/bin/ncks' +srvx1.ideal = '/jetfs/home/lkugler/bin/ideal-v4.3_v1.22.exe' +srvx1.wrfexe = '/jetfs/home/lkugler/bin/wrf-v4.3_v1.22.exe' +srvx1.container = '' + +# paths for data output +srvx1.wrf_rundir_base = '/jetfs/home/lkugler/data/run_WRF/' # path for temporary files +srvx1.dart_rundir_base = '/jetfs/home/lkugler/data/run_DART/' # path for temporary files +srvx1.archive_base = '/mnt/jetfs/scratch/lkugler/data/sim_archive/' + +# paths used as input +srvx1.srcdir = '/users/staff/lkugler/AdvDA23/DART/WRF-4.3/run' +srvx1.dart_srcdir = '/users/staff/lkugler/AdvDA23/DART/models/wrf/work' +srvx1.rttov_srcdir = '/users/staff/lkugler/AdvDA23/RTTOV13/rtcoef_rttov13/' +srvx1.scriptsdir = '/jetfs/home/lkugler/DART-WRF/dartwrf/' +srvx1.geo_em = '/mnt/jetfs/scratch/lkugler/data/geo_em.d01.nc' + +# templates/run scripts +srvx1.namelist = srvx1.scriptsdir+'/../templates/namelist.input' +srvx1.run_WRF = srvx1.scriptsdir+'/run_ens.jet.sh' + +srvx1.slurm_cfg = {"account": "lkugler", "partition": "compute", + "ntasks": "1", "ntasks-per-core": "1", "mem": "50G", + "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"} diff --git a/scheduler.py b/cycled_exp.py similarity index 79% rename from scheduler.py rename to cycled_exp.py index 31c4bddb567df958037bd27e86fea1cbdb63a6b6..4e30e9930dcba29d7d50f59dbb9857a7aba17ef1 100755 --- a/scheduler.py +++ b/cycled_exp.py @@ -5,58 +5,15 @@ submitting jobs into SLURM queue """ import os, sys, shutil, glob, warnings import datetime as dt -from slurmpy import Slurm +from dartwrf.utils import script_to_str, symlink, copy, create_job from config.cfg import exp, cluster -from dartwrf.utils import script_to_str, symlink, copy log_dir = cluster.archivedir+'/logs/' slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/' print('logging to', log_dir) print('scripts, which are submitted to SLURM:', slurm_scripts_dir) -class Shellslurm(): - def __init__(self, *args, **kwargs): - pass - def run(self, *args, **kwargs): - print(args[0]) - os.system(args[0]) - -def my_Slurm(*args, cfg_update=dict(), **kwargs): - """Shortcut to slurmpy's class; keep certain default kwargs - and only update some with kwarg `cfg_update` - see https://github.com/brentp/slurmpy - """ - debug = False # run without SLURM, locally on headnode - if debug: - return Shellslurm(*args) - return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), - log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs) - - - -def backup_scripts(): - os.makedirs(cluster.archivedir, exist_ok=True) - - try: - shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir) - except FileExistsError: - pass - except: - raise - try: - copy(os.path.basename(__file__), cluster.scripts_rundir+'/') - except Exception as e: - warnings.warn(str(e)) - -def prepare_WRFrundir(init_time): - """Create WRF/run directories and wrfinput files - """ - cmd = cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfrundir.py '+init_time.strftime('%Y-%m-%d_%H:%M') - print(cmd) - os.system(cmd) - # return id - def run_ideal(depends_on=None): """Run ideal for every ensemble member""" cmd = """# run ideal.exe in parallel, then add geodata @@ -75,7 +32,7 @@ do mv $rundir/rsl.out.0000 $rundir/rsl.out.input done """ - s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), + s = create_job("ideal", cfg_update={"ntasks": str(exp.n_ens), "time": "10", "mem": "100G"}) id = s.run(cmd, depends_on=[depends_on]) return id @@ -84,7 +41,7 @@ def wrfinput_insert_wbubble(perturb=True, depends_on=None): """Given that directories with wrfinput files exist, update these wrfinput files with warm bubbles """ - s = my_Slurm("ins_wbubble", cfg_update={"time": "5"}) + s = create_job("ins_wbubble", cfg_update={"time": "5"}) pstr = ' ' if perturb: pstr = ' perturb' @@ -107,7 +64,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, # hist_interval = 1 # radt = 1 # calc CFRAC also in first minute # begin_plus1 = begin+dt.timedelta(minutes=1) - # s = my_Slurm("preWRF1", cfg_update=dict(time="2")) + # s = create_job("preWRF1", cfg_update=dict(time="2")) # args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py', # begin.strftime('%Y-%m-%d_%H:%M'), # begin_plus1.strftime('%Y-%m-%d_%H:%M'), @@ -116,13 +73,13 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, # '--restart='+restart_flag,] # id = s.run(' '.join(args), depends_on=[id]) - # s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), + # s = create_job("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), # "time": "2", "mem-per-cpu": "2G"}) # cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname) # id = s.run(cmd, depends_on=[id]) # # apply forward operator (DART filter without assimilation) - # s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48)) + # s = create_job("fwOP-1m", cfg_update=dict(time="10", ntasks=48)) # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py ' # + begin.strftime('%Y-%m-%d_%H:%M')+' ' # + begin_plus1.strftime('%Y-%m-%d_%H:%M'), @@ -141,12 +98,12 @@ def run_ENS(begin, end, depends_on=None, first_minute=True, if output_restart_interval: args.append('--restart_interval='+str(int(float(output_restart_interval)))) - s = my_Slurm("preWRF", cfg_update=dict(time="2")) + s = create_job("preWRF", cfg_update=dict(time="2")) id = s.run(' '.join(args), depends_on=[id]) time_in_simulation_hours = (end-begin).total_seconds()/3600 runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5) # usually below 9 min/hour - s = my_Slurm("WRF", cfg_update={"array": "1-"+str(exp.n_nodes), "ntasks": "10", "nodes": "1", + s = create_job("WRF", cfg_update={"array": "1-"+str(cluster.size_jobarray), "ntasks": "10", "nodes": "1", "time": str(runtime_wallclock_mins_expected), "mem": "140G"}) cmd = script_to_str(cluster.run_WRF).replace('<exp.expname>', exp.expname ).replace('<cluster.wrf_rundir_base>', cluster.wrf_rundir_base) @@ -166,7 +123,7 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, prior_path_exp, if not os.path.exists(prior_path_exp): raise IOError('prior_path_exp does not exist: '+prior_path_exp) - id = my_Slurm("Assim", cfg_update={"ntasks": "12", "time": "60", + id = create_job("Assim", cfg_update={"ntasks": "12", "time": "60", "mem": "200G", "ntasks-per-node": "12", "ntasks-per-core": "2"} ).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py ' +assim_time.strftime('%Y-%m-%d_%H:%M ') @@ -183,7 +140,7 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new else: tnew = '' - id = my_Slurm("IC-prior", cfg_update=dict(time="8") + id = create_job("IC-prior", cfg_update=dict(time="8") ).run(cluster.python+' '+cluster.scripts_rundir+'/prep_IC_prior.py ' +prior_path_exp +prior_init_time.strftime(' %Y-%m-%d_%H:%M') @@ -193,14 +150,14 @@ def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, new def update_IC_from_DA(assim_time, depends_on=None): - id = my_Slurm("IC-update", cfg_update=dict(time="8") + id = create_job("IC-update", cfg_update=dict(time="8") ).run(cluster.python+' '+cluster.scripts_rundir+'/update_IC.py ' +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on]) return id def create_satimages(init_time, depends_on=None): - s = my_Slurm("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"}) + s = create_job("RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"}) id = s.run(cluster.python_verif+' ~/RTTOV-WRF/run_init.py '+cluster.archivedir +init_time.strftime('/%Y-%m-%d_%H:%M/'), depends_on=[depends_on]) @@ -209,31 +166,31 @@ def create_satimages(init_time, depends_on=None): def mailme(depends_on=None): if depends_on: - s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"}) + s = create_job("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"}) s.run('sleep 1', depends_on=[depends_on]) def gen_obsseq(depends_on=None): - s = my_Slurm("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"}) + s = create_job("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"}) id = s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py', depends_on=[depends_on]) return id def verify_sat(depends_on=None): - s = my_Slurm("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20", + s = create_job("verif-SAT-"+exp.expname, cfg_update={"time": "60", "mail-type": "FAIL,END", "ntasks": "20", "ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "100G",}) cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node sat verif1d FSS BS' s.run(cmd, depends_on=[depends_on]) def verify_wrf(depends_on=None): - s = my_Slurm("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20", + s = create_job("verif-WRF-"+exp.expname, cfg_update={"time": "120", "mail-type": "FAIL,END", "ntasks": "20", "ntasks-per-node": "20", "ntasks-per-core": "1", "mem": "250G"}) - cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d FSS BS' + cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_from_raw/analyze_fc.py '+exp.expname+' has_node wrf verif1d verif3d FSS BS' s.run(cmd, depends_on=[depends_on]) def verify_fast(depends_on=None): - s = my_Slurm("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1", + s = create_job("verif-fast-"+exp.expname, cfg_update={"time": "10", "mail-type": "FAIL", "ntasks": "1", "ntasks-per-node": "1", "ntasks-per-core": "1"}) cmd = cluster.python_verif+' /jetfs/home/lkugler/osse_analysis/plot_fast/plot_single_exp.py '+exp.expname s.run(cmd, depends_on=[depends_on]) @@ -307,7 +264,7 @@ if __name__ == "__main__": # as we have WRF output, we can use own exp path as prior prior_path_exp = cluster.archivedir - + id_sat = create_satimages(time, depends_on=id) # increment time @@ -316,7 +273,6 @@ if __name__ == "__main__": # update time variables prior_init_time = time - timedelta_btw_assim - #id = gen_obsseq(id) verify_sat(id_sat) verify_wrf(id) verify_fast(id) diff --git a/dartwrf/assim_synth_obs.py b/dartwrf/assim_synth_obs.py index b4b6dedaa96c0debe105c4074c5eac5b8a61870b..ce777c3c1f8db76b74f81358a4b7c63b9b94f074 100755 --- a/dartwrf/assim_synth_obs.py +++ b/dartwrf/assim_synth_obs.py @@ -472,7 +472,7 @@ if __name__ == "__main__": options = [] if len(sys.argv) >4: options = sys.argv[5:] - nproc = 6 if 'headnode' in options else 12 + nproc = cluster.max_nproc archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/") os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in diff --git a/dartwrf/utils.py b/dartwrf/utils.py index 4185b5a5c819e89923b4addf3fad3e947b3a35ce..d4a5560068550891dcb418bad7aedab6a6672e4e 100755 --- a/dartwrf/utils.py +++ b/dartwrf/utils.py @@ -1,7 +1,78 @@ -import os, sys, shutil, glob +import os, sys, shutil, glob, warnings import builtins as __builtin__ -#copy = shutil.copy import subprocess +import datetime as dt +from slurmpy import Slurm +from config.cfg import cluster + +class ExperimentConfiguration(object): + """Collection of variables to use in code later on""" + def __init__(self): + pass + +class ClusterConfig(object): + """Collection of variables to use in code later on""" + def __init__(self): + pass + + @property + def archivedir(self): + return self.archive_base+'/'+self.expname + + def wrf_rundir(self, iens): + return self.wrf_rundir_base+'/'+self.expname+'/'+str(iens) + + @property + def scripts_rundir(self): + return self.archivedir+'/DART-WRF/' + + @property + def dartrundir(self): + return self.dart_rundir_base+'/'+self.expname+'/' + +class Shellslurm(): + """Like Slurmpy class, but runs locally""" + def __init__(self, *args, **kwargs): + pass + def run(self, *args, **kwargs): + print(args[0]) + os.system(args[0]) + +def create_job(*args, cfg_update=dict(), **kwargs): + """Shortcut to slurmpy's class; keep certain default kwargs + and only update some with kwarg `cfg_update` + see https://github.com/brentp/slurmpy + + with_slurm (bool) : if True, use SLURM, else run locally + + """ + if cluster.use_slurm: + return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), + log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs) + else: + return Shellslurm(*args) + +def backup_scripts(): + """Copies scripts and configuration to archive dir output folder""" + os.makedirs(cluster.archivedir, exist_ok=True) + + try: + shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir) + except FileExistsError: + pass + except: + raise + try: + copy(os.path.basename(__file__), cluster.scripts_rundir+'/') + except Exception as e: + warnings.warn(str(e)) + +def prepare_WRFrundir(init_time): + """Create WRF/run directories and wrfinput files + """ + cmd = cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfrundir.py '+init_time.strftime('%Y-%m-%d_%H:%M') + print(cmd) + os.system(cmd) def shell(args): print(args) diff --git a/generate_free.py b/generate_free.py index b1f3cb6e16f116fc829069fd78b6616635aaa82e..acb1fc0fbc95aa87ac818a239bd4c566ad37f775 100755 --- a/generate_free.py +++ b/generate_free.py @@ -8,15 +8,14 @@ import pandas as pd from slurmpy import Slurm from config.cfg import exp, cluster -from dartwrf.utils import script_to_str, symlink +from dartwrf.utils import script_to_str, symlink, backup_scripts +from cycled_exp import * log_dir = cluster.archivedir+'/logs/' slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/' print('logging to', log_dir) print('scripts, which are submitted to SLURM:', slurm_scripts_dir) -from scheduler import * - ################################ print('starting osse')