Commit cf2a0754 authored by lkugler

merge towards working docs

parents b673bd55 2f44f8dd
@@ -42,35 +42,38 @@ slurm_cfg python dictionary, containing options of SLURM
 'cfg_update = {"nodes": "2"}'
 """
 cluster = utils.ClusterConfig(exp)
-cluster.name = 'srvx1'
-cluster.max_nproc = 6
-cluster.use_slurm = False
+cluster.name = 'jet'
+cluster.max_nproc = 12
+cluster.use_slurm = True
+cluster.size_jobarray = 40
 
 # binaries
-cluster.python = '/users/staff/lkugler/miniconda3/bin/python'
-cluster.python_verif = '/users/staff/lkugler/miniconda3/bin/python'
-cluster.ncks = '/home/swd/spack/opt/spack/linux-rhel8-skylake_avx512/gcc-8.5.0/nco-5.0.1-ntu44aoxlvwtr2tsrobfr4lht7cpvccf/bin/ncks'
-cluster.ideal = '' #/jetfs/home/lkugler/bin/ideal-v4.3_v1.22.exe'
-cluster.wrfexe = '' #/jetfs/home/lkugler/bin/wrf-v4.3_v1.22.exe'
+cluster.python = '/jetfs/home/lkugler/miniconda3/envs/DART/bin/python'
+cluster.python_verif = '/jetfs/home/lkugler/miniconda3/envs/enstools/bin/python'
+cluster.ncks = '/jetfs/spack/opt/spack/linux-rhel8-skylake_avx512/intel-20.0.2/nco-4.9.3-dhlqiyog7howjmaleyfhm6lkt7ra37xf/bin/ncks'
+cluster.ideal = '/jetfs/home/lkugler/bin/ideal-v4.3_v1.22.exe'
+cluster.wrfexe = '/jetfs/home/lkugler/bin/wrf-v4.3_v1.22.exe'
 cluster.container = ''
 
 # paths for data output
-cluster.wrf_rundir_base = '/users/staff/lkugler/AdvDA23/run_WRF/'  # path for temporary files
-cluster.dart_rundir_base = '/users/staff/lkugler/AdvDA23/run_DART/'  # path for temporary files
-cluster.archive_base = '/mnt/jetfs/scratch/lkugler/data/sim_archive/'
+cluster.wrf_rundir_base = '/jetfs/home/lkugler/data/run_WRF/'  # path for temporary files
+cluster.dart_rundir_base = '/jetfs/home/lkugler/data/run_DART/'  # path for temporary files
+cluster.archive_base = '/jetfs/home/lkugler/data/sim_archive/'
 
 # paths used as input
-cluster.srcdir = '/users/staff/lkugler/AdvDA23/DART/WRF-4.3/run'
-cluster.dart_srcdir = '/users/staff/lkugler/AdvDA23/DART/models/wrf/work'
-cluster.rttov_srcdir = '/users/staff/lkugler/AdvDA23/RTTOV13/rtcoef_rttov13/'
-cluster.scriptsdir = '/users/staff/lkugler/AdvDA23/DART-WRF/dartwrf/'
-cluster.geo_em = '/mnt/jetfs/scratch/lkugler/data/geo_em.d01.nc'
+cluster.srcdir = '/jetfs/home/lkugler/data/compile/WRF-4.3/run'
+cluster.dart_srcdir = '/jetfs/home/lkugler/data/compile/DART/DART-10.5.3/models/wrf/work'
+cluster.rttov_srcdir = '/jetfs/home/lkugler/data/compile/RTTOV13/rtcoef_rttov13/'
+cluster.scriptsdir = '/jetfs/home/lkugler/DART-WRF/dartwrf/'
 
-# templates/run scripts
+# other inputs
+cluster.geo_em = '/jetfs/home/lkugler/data/geo_em.d01.nc'
+cluster.obs_impact_filename = cluster.scriptsdir+'/../templates/impactfactor_T.txt'
 cluster.namelist = cluster.scriptsdir+'/../templates/namelist.input'
-cluster.run_WRF = cluster.scriptsdir+'/run_ens.jet.sh'
+cluster.run_WRF = '/jetfs/home/lkugler/DART-WRF/dartwrf/run_ens.jet.sh'
 
-cluster.slurm_cfg = {"account": "lkugler", "partition": "compute",
+cluster.slurm_cfg = {"account": "lkugler", "partition": "compute",  #"nodelist": "jet07",
                      "ntasks": "1", "ntasks-per-core": "1", "mem": "50G",
                      "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
@@ -3,13 +3,15 @@ import os, sys, shutil, glob, warnings
 import datetime as dt
 from dartwrf.utils import script_to_str
-from dartwrf.workflows import WorkFlows
+from config.cfg import exp
+from config.clusters import cluster
+from dartwrf.workflows import *
 
 if __name__ == "__main__":
     """
     Run a cycled OSSE with WRF and DART.
     """
-    w = WorkFlows(exp_config='cfg.py', server_config='jet.py')
+    cluster.setup()
 
     timedelta_integrate = dt.timedelta(minutes=15)
     timedelta_btw_assim = dt.timedelta(minutes=15)
@@ -24,20 +26,20 @@ if __name__ == "__main__":
         last_assim_time = dt.datetime(2008, 7, 30, 13,30)
         forecast_until = dt.datetime(2008, 7, 30, 18)
 
-        w.prepare_WRFrundir(init_time)
-        # id = w.run_ideal(depends_on=id)
-        # id = w.wrfinput_insert_wbubble(depends_on=id)
+        prepare_WRFrundir(init_time)
+        # id = run_ideal(depends_on=id)
+        # id = wrfinput_insert_wbubble(depends_on=id)
 
     if True:  # random
         prior_path_exp = '/jetfs/home/lkugler/data/sim_archive/exp_v1.19_P2_noDA'
-        init_time = dt.datetime(2008, 7, 30, 12)
-        time = dt.datetime(2008, 7, 30, 13)
+        init_time = dt.datetime(2008, 7, 30, 13)
+        time = dt.datetime(2008, 7, 30, 14)
         last_assim_time = dt.datetime(2008, 7, 30, 14)
-        forecast_until = dt.datetime(2008, 7, 30, 14, 18)
+        forecast_until = dt.datetime(2008, 7, 30, 14, 15)
 
-        w.prepare_WRFrundir(init_time)
-        # id = w.run_ideal(depends_on=id)
+        prepare_WRFrundir(init_time)
+        # id = run_ideal(depends_on=id)
 
     # prior_path_exp = cluster.archivedir
     # prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.19_P5+su_noDA'
@@ -52,13 +54,13 @@ if __name__ == "__main__":
         # i.e. 13z as a prior to assimilate 12z observations
         prior_valid_time = time
 
-        id = w.assimilate(time, prior_init_time, prior_valid_time, prior_path_exp, depends_on=id)
+        id = assimilate(time, prior_init_time, prior_valid_time, prior_path_exp, depends_on=id)
 
         # 1) Set posterior = prior
-        id = w.prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, depends_on=id)
+        id = prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, depends_on=id)
 
         # 2) Update posterior += updates from assimilation
-        id = w.update_IC_from_DA(time, depends_on=id)
+        id = update_IC_from_DA(time, depends_on=id)
 
         # How long shall we integrate?
         timedelta_integrate = timedelta_btw_assim
@@ -68,15 +70,15 @@ if __name__ == "__main__":
             output_restart_interval = 9999  # no restart file after last assim
 
         # 3) Run WRF ensemble
-        id = w.run_ENS(begin=time,  # start integration from here
+        id = run_ENS(begin=time,  # start integration from here
                      end=time + timedelta_integrate,  # integrate until here
                      output_restart_interval=output_restart_interval,
                      depends_on=id)
 
         # as we have WRF output, we can use own exp path as prior
-        prior_path_exp = cluster.archivedir
+        prior_path_exp = w.cluster.archivedir
 
-        id_sat = w.create_satimages(time, depends_on=id)
+        id_sat = create_satimages(time, depends_on=id)
 
         # increment time
         time += timedelta_btw_assim
@@ -84,6 +86,6 @@ if __name__ == "__main__":
         # update time variables
         prior_init_time = time - timedelta_btw_assim
 
-    w.verify_sat(id_sat)
-    w.verify_wrf(id)
-    w.verify_fast(id)
+    verify_sat(id_sat)
+    verify_wrf(id)
+    verify_fast(id)
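Condensed, the cycling pattern above chains dependent jobs and then reuses the experiment's own archive as the next prior. A hedged sketch of the loop body (assumes the star-imported workflow functions and the time variables defined above; not a verbatim excerpt of the script):

    id = None
    while time <= last_assim_time:
        id = assimilate(time, prior_init_time, prior_valid_time,
                        prior_path_exp, depends_on=id)
        id = prepare_IC_from_prior(prior_path_exp, prior_init_time,
                                   prior_valid_time, depends_on=id)
        id = update_IC_from_DA(time, depends_on=id)
        id = run_ENS(begin=time, end=time + timedelta_integrate, depends_on=id)
        prior_path_exp = cluster.archivedir   # own output becomes next prior
        time += timedelta_btw_assim
        prior_init_time = time - timedelta_btw_assim
        prior_valid_time = time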
@@ -3,7 +3,7 @@ import time as time_module
 import datetime as dt
 import numpy as np
 
-from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print, shell
+from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print, shell, write_txt
 from dartwrf.obs import error_models as err
 import dartwrf.create_obsseq as osq
 from dartwrf import wrfout_add_geo
@@ -141,13 +141,6 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
     os.system("rm -rf " + cluster.dartrundir + "/perfect_output_*")
     os.system("rm -rf " + cluster.dartrundir + "/obs_seq.fina*")
 
-def write_txt(lines, fpath):
-    try_remove(fpath)
-    with open(fpath, "w") as file:
-        for line in lines:
-            file.write(line+'\n')
-
 def write_list_of_inputfiles_prior():
     files = []
     for iens in range(1, exp.n_ens+1):
@@ -236,13 +229,13 @@ def archive_filteroutput(time):
 def get_parametrized_error(obscfg, osf_prior):
     """Calculate the parametrized error for an ObsConfig (one obs type)
 
-    Args
-        obscfg (object): configuration of observations
-        osf_prior (obsseq.ObsRecord): contains truth and prior values from obs_seq.final
+    Args:
+        obscfg (object): Configuration of observations
+        osf_prior (obsseq.ObsRecord): Contains truth and prior values from obs_seq.final
             (output of ./final in evaluate-mode (no posterior))
 
-    Returns
-        np.array observation error std-dev for assimilation
+    Returns:
+        np.array: observation error std-dev for assimilation
     """
     Hx_prior = osf_prior.get_prior_Hx().T
     Hx_truth = osf_prior.get_truth_Hx()
@@ -263,8 +256,7 @@ def set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile="./obs_seq.out"):
     """"Overwrite existing variance values in obs_seq.out files
 
     Args:
-        oso (ObsSeq) : python representation of obs_seq.out file,
-            will be modified and written to file
+        oso (ObsSeq): python representation of obs_seq.out file, will be modified and written to file
         osf_prior (ObsSeq): python representation of obs_seq.final (output of filter in evaluate-mode without posterior)
             contains prior values; used for parameterized errors
...
@@ -180,15 +180,15 @@ def create_obs_seq_in(time_dt, list_obscfg,
     Args:
         time_dt (dt.datetime): time of observation
         list_obscfg (list of dict): configuration for observation types
-            must have keys:
+
+    Note:
+        `list_obscfg` must have these keys:
             - n_obs (int) : number of observations (must be a square of an integer: 4, 9, 1000, ...)
             - obs_locations (str or tuple) in ['square_array_from_domaincenter', 'square_array_evenly_on_grid', ]
                 or list of (lat, lon) coordinate tuples, in degrees north/east
-            - error_generate (float)
-            - error_assimilate (float or False) : False -> parameterized
+            - error_generate (np.array)
+            - error_assimilate (np.array or False) : False -> parameterized
             - cov_loc_radius_km (float)
-        obs_errors (np.array): contains observation errors, one for each observation
     """
     print('creating obs_seq.in:')
     time_dt = add_timezone_UTC(time_dt)
...
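For illustration, a hypothetical single-type `list_obscfg` entry carrying the keys documented above (values invented for this sketch, not taken from the commit):

    import numpy as np

    refl = dict(
        n_obs=961,                                   # must be a square number
        obs_locations='square_array_evenly_on_grid',
        error_generate=np.full(961, 2.5),            # one error per observation
        error_assimilate=False,                      # False -> parameterized error
        cov_loc_radius_km=20,
    )
    # create_obs_seq_in(time_dt, [refl])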
 import os
 
 from config.cfg import exp
 from config.cluster import cluster
-from dartwrf.utils import symlink, copy_scp_srvx8, copy, sed_inplace
+from dartwrf.utils import symlink, copy, sed_inplace
 
 joinp = os.path.join
...
"""Create obs_seq.out files with collapsed vertical dimension """Create obs_seq.out files with collapsed vertical dimension
Specifically, one observation per column which is the maximum of the column Specifically, one observation per column which is the maximum of the column
Use this script before running the OSSE workflow, to prepare obs_seq.out files.
Note:
path_3d_obsseq = '/path/exp_obs10_loc20/obs_seq_out/2008-07-30_%H:%M_obs_seq.out'
Note:
Only works in case there is 1 observation type!
Example:
python obsseq_2dim.py exp_v1.22_P2_rr_REFL_obs10_loc20_oe2.5 2008-07-30_13:00
""" """
from copy import copy from copy import copy
...@@ -8,41 +19,46 @@ import time as time_module ...@@ -8,41 +19,46 @@ import time as time_module
import datetime as dt import datetime as dt
import numpy as np import numpy as np
from config.cfg import exp
from config.cluster import cluster from config.cluster import cluster
from dartwrf import utils
from dartwrf import assim_synth_obs as aso from dartwrf import assim_synth_obs as aso
from dartwrf import obsseq from dartwrf import obsseq
def _get_n_obs_per_layer(oso):
"""Get number of observations per layer"""
height_all = np.array([a[2] for a in oso.df.loc3d])
if __name__ == "__main__": height_first = height_all[0]
assim_time = dt.datetime.strptime(sys.argv[1], "%Y-%m-%d_%H:%M") # count how often this height appears
n_obs_per_layer = int(np.sum(height_all == height_first))
return n_obs_per_layer
# prepare an obsseq without rejected observations
if exp.use_existing_obsseq: # from another exp
oso_input = assim_time.strftime(exp.use_existing_obsseq)
# only assured to work with single obstype if __name__ == "__main__":
if len(exp.observations) > 1: exp = sys.argv[1]
raise NotImplementedError() assim_time = dt.datetime.strptime(sys.argv[2], "%Y-%m-%d_%H:%M")
n_obs = exp.observations[0]['n_obs']
path_3d_obsseq = cluster.archive_base+exp+'/obs_seq_out/%Y-%m-%d_%H:%M_obs_seq.out'
oso_input = assim_time.strftime(path_3d_obsseq)
# existing obsseq with multi levels # existing obsseq with multi levels
oso = obsseq.ObsSeq(oso_input) oso = obsseq.ObsSeq(oso_input)
nlev = len(oso.df)/n_obs n_obs_3d = len(oso.df)
if nlev - int(nlev) != 0: n_obs_per_layer = _get_n_obs_per_layer(oso)
raise RuntimeError() nlev = int(n_obs_3d/n_obs_per_layer)
nlev = int(nlev) # levels per obs assert np.allclose(nlev, n_obs_3d/n_obs_per_layer), 'n_obs not evenly divisible!'
# copy will be modified print('n_obs_per_layer', n_obs_per_layer)
output = copy(oso) print('n_obs_3d', n_obs_3d)
output.df = output.df.iloc[0::nlev] # every nth level = first level
#print(output.df, oso.df) output = copy(oso) # copy will be modified
# output.df = output.df.copy() # without this, we get a SettingWithCopyWarning
output.df = output.df.iloc[0::nlev] # every nth level = first level
# iterate through, set value to max # iterate through, set value to max
for i_obs in range(0, n_obs): # go through n_obs (all columns) for i_obs in range(0, ): # go through n_obs (all columns)
i_obs_subset = i_obs*nlev # jumps by nlev (from one to next column) i_obs_subset = i_obs*nlev # jumps by nlev (from one to next column)
column = oso.df.loc[0 + i_obs_subset:nlev + i_obs_subset, :] # select column column = oso.df.loc[0 + i_obs_subset:nlev + i_obs_subset, :] # select column
...@@ -50,8 +66,9 @@ if __name__ == "__main__": ...@@ -50,8 +66,9 @@ if __name__ == "__main__":
output.df.loc[i_obs_subset, ('observations')] = float(column['observations'].max()) output.df.loc[i_obs_subset, ('observations')] = float(column['observations'].max())
output.df.loc[i_obs_subset, ('truth')] = float(column['truth'].max()) output.df.loc[i_obs_subset, ('truth')] = float(column['truth'].max())
print(output.df) #, 'observations'], output.df.loc[i_obs, 'observations']) print(output.df)
fout = cluster.archivedir + assim_time.strftime("/obs_seq_out/%Y-%m-%d_%H:%M_obs_seq.out") fout = cluster.archivedir + assim_time.strftime("/obs_seq_out/%Y-%m-%d_%H:%M_obs_seq.out")
os.makedirs(cluster.archivedir+'/obs_seq_out', exist_ok=True) os.makedirs(cluster.archivedir+'/obs_seq_out', exist_ok=True)
output.to_dart(fout) output.to_dart(fout)
utils.write_txt(["created from", oso_input,], fout[:-3]+'.txt')
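The collapse logic above keeps every `nlev`-th row of the DataFrame and overwrites it with the column maximum. A toy numpy illustration of the same indexing, with invented sizes (a sketch, not repository code):

    import numpy as np

    nlev, n_obs_per_layer = 4, 3                          # levels per column, columns
    obs = np.arange(nlev * n_obs_per_layer, dtype=float)  # stacked level-fastest, like oso.df
    columns = obs.reshape(n_obs_per_layer, nlev)          # one row per observation column
    collapsed = columns.max(axis=1)                       # the column maximum survives
    assert collapsed.shape == (n_obs_per_layer,)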
@@ -2,7 +2,7 @@ import os, sys, glob, warnings
 
 from config.cfg import exp
 from config.cluster import cluster
-import run_obs_diag as rod
+import dartwrf.run_obs_diag as rod
 
 def listdir_dirs(path):
     return [a for a in os.listdir(path) if os.path.isdir(os.path.join(path, a))]
...
@@ -18,13 +18,16 @@ from dartwrf.utils import sed_inplace, copy, symlink, mkdir
 
 def run(iens, begin, end, hist_interval=5, radt=5, archive=True,
         restart=False, restart_interval=720):
-    """Create namelist.input files
+    """Create a namelist.input file for each ensemble member
 
     Args:
         archive (bool): if True, write to archivedir of experiment
             if False, write to WRF run directory
         restart (str): fortran bool whether to use wrfinput or wrfrst
         restart_interval (int): output frequency of wrfrst (minutes)
+
+    Returns
+        None
     """
     rundir = cluster.wrf_rundir(iens)
     copy(cluster.namelist, rundir+'/namelist.input')
@@ -73,6 +76,7 @@ def run(iens, begin, end, hist_interval=5, radt=5, archive=True,
 
 if __name__ == '__main__':
     args = docopt(__doc__)
+    begin = dt.datetime.strptime(args['<begin>'], '%Y-%m-%d_%H:%M')
    end = dt.datetime.strptime(args['<end>'], '%Y-%m-%d_%H:%M')
...
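A hedged usage sketch for `run()` as documented above, rendering one namelist per ensemble member (times illustrative; assumes `exp` and `run` are importable as in this module):

    import datetime as dt

    begin = dt.datetime.strptime('2008-07-30_12:00', '%Y-%m-%d_%H:%M')
    end = begin + dt.timedelta(minutes=15)
    for iens in range(1, exp.n_ens + 1):
        # write namelist.input into each member's WRF run directory
        run(iens, begin, end, hist_interval=5, radt=5,
            archive=False, restart=False, restart_interval=720)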
"""Prepare WRF run directories, to use wrf.exe then
Args:
init_time (str): YYYY-MM-DD_HH:MM
Returns:
None
"""
import os, sys, shutil import os, sys, shutil
import datetime as dt import datetime as dt
from config.cfg import exp from config.cfg import exp
from config.cluster import cluster from config.cluster import cluster
from dartwrf.utils import symlink, copy, link_contents from dartwrf.utils import symlink, copy, link_contents
import prepare_namelist from dartwrf import prepare_namelist
if __name__ == '__main__': if __name__ == '__main__':
......
@@ -22,7 +22,7 @@ class ClusterConfig(object):
         Example:
             `/users/abcd/data/sim_archive/experiment1/`
         """
-        return self.archive_base+'/'+self.exp.expname
+        return self.archive_base+'/'+self.exp.expname+'/'
 
     @property
     def scripts_rundir(self):
@@ -44,7 +44,7 @@ class ClusterConfig(object):
         """Path to the directory where an ensemble member will run WRF
         Includes the experiment name and the ensemble member index
         """
-        return self.wrf_rundir_base+'/'+self.exp.expname+'/'+str(iens)
+        return self.wrf_rundir_base+'/'+self.exp.expname+'/'+str(iens)+'/'
 
     def run_job(self, cmd, jobname='', cfg_update=dict(), depends_on=None):
         """Run scripts in a shell
@@ -63,10 +63,9 @@ class ClusterConfig(object):
         """
         if self.use_slurm:
             from slurmpy import Slurm
-            Slurm(jobname, slurm_kwargs=dict(self.slurm_cfg, **cfg_update),
+            return Slurm(jobname, slurm_kwargs=dict(self.slurm_cfg, **cfg_update),
                   log_dir=self.log_dir,
                   scripts_dir=self.slurm_scripts_dir,
-                  **kwargs
                   ).run(cmd, depends_on=depends_on)
         else:
             print(cmd)
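With the fix above, `run_job` returns the result of the slurmpy `.run()` call instead of discarding it, so jobs can be chained through `depends_on`. A hedged sketch (job commands invented for this sketch):

    id = cluster.run_job('echo prepare', jobname='prep')
    # the second job waits for the first; cfg_update overrides slurm_cfg keys
    id = cluster.run_job('echo integrate', jobname='wrf',
                         cfg_update={"time": "30"}, depends_on=[id])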
@@ -109,7 +108,8 @@ def clean_wrfdir(dir):
             os.remove(f)
 
 def symlink(src, dst):
-    # Create a symbolic link pointing to src named dst.
+    """Create a symbolic link from src to dst
+    """
     try:
         os.symlink(src, dst)
     except FileExistsError:
@@ -123,12 +123,18 @@ def symlink(src, dst):
         raise e
 
 def link_contents(src, dst):
+    """Create symbolic links for all files in src to dst
+
+    Args:
+        src (str): Path to source directory
+        dst (str): Path to destination directory
+
+    Returns:
+        None
+    """
     for f in os.listdir(src):
         symlink(src+'/'+f, dst+'/'+f)
 
-def copy_scp_srvx8(src, dst):
-    os.system('scp '+src+' a1254888@srvx8.img.univie.ac.at:'+dst)
-
 def sed_inplace(filename, pattern, repl):
     '''Perform the pure-Python equivalent of in-place `sed` substitution
     Like `sed -i -e 's/'${pattern}'/'${repl}' "${filename}"`.
@@ -162,4 +168,28 @@ def sed_inplace(filename, pattern, repl):
     shutil.move(tmp_file.name, filename)
 
 def append_file(f_main, f_gets_appended):
+    """Append the contents of one file to another
+
+    Args:
+        f_main (str): Path to file that will be appended
+        f_gets_appended (str): Path to file that will be appended to f_main
+
+    Returns:
+        None
+    """
     os.system('cat '+f_gets_appended+' >> '+f_main)
+
+def write_txt(lines, fpath):
+    """Write a list of strings to a text file
+
+    Args:
+        lines (list): List of strings
+        fpath (str): Path to file
+
+    Returns:
+        None
+    """
+    try_remove(fpath)
+    with open(fpath, "w") as file:
+        for line in lines:
+            file.write(line+'\n')
\ No newline at end of file
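The relocated `write_txt` helper is what obsseq_2dim.py calls above to record provenance next to an output file. A minimal usage sketch with an invented path:

    from dartwrf.utils import write_txt

    # overwrites /tmp/provenance.txt, one line per list entry
    write_txt(["created from", "/path/to/source_obs_seq.out"], "/tmp/provenance.txt")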
@@ -25,7 +25,7 @@ def dict_to_py(d, outfile):
 
 class WorkFlows(object):
     def __init__(self, exp_config='cfg.py', server_config='server.py'):
-        """Set up the experiment folder in `archivedir`, copy config files, backup scripts.
+        """Set up the experiment folder in `archivedir`.
 
         Args:
             exp (str): Path to exp config file
@@ -138,7 +138,6 @@ class WorkFlows(object):
             if input_is_restart:  # start WRF in restart mode
         """
-        id = depends_on
         restart_flag = '.false.' if not input_is_restart else '.true.'
 
         # if False:  # doesnt work with restarts at the moment# first_minute:
@@ -180,7 +179,7 @@ class WorkFlows(object):
         if output_restart_interval:
             args.append('--restart_interval='+str(int(float(output_restart_interval))))
 
-        id = self.cluster.run_job(' '.join(args), "preWRF", cfg_update=dict(time="2"), depends_on=[id])
+        id = self.cluster.run_job(' '.join(args), "preWRF", cfg_update=dict(time="2"), depends_on=[depends_on])
 
         cmd = script_to_str(self.cluster.run_WRF).replace('<exp.expname>', exp.expname
                 ).replace('<cluster.wrf_rundir_base>', self.cluster.wrf_rundir_base)
@@ -240,7 +239,7 @@ class WorkFlows(object):
     def create_satimages(self, init_time, depends_on=None):
         cmd = self.cluster.python_verif+' ~/RTTOV-WRF/run_init.py '+self.cluster.archivedir+init_time.strftime('/%Y-%m-%d_%H:%M/')
-        id = self.cluster.run_job(cmd, "RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "180G"}, depends_on=[depends_on])
+        id = self.cluster.run_job(cmd, "RTTOV", cfg_update={"ntasks": "12", "time": "80", "mem": "200G"}, depends_on=[depends_on])
         return id
...