Commit 085c0d0c authored by lkugler

new prepare initials

parent e5850dfb
import os, sys
import datetime as dt
"""Configuration name docs
When coding, use attributes of a dictionary like this:
$ from cfg import exp, cluster
$ path = cluster.archivedir
attribute name | description
------------------------------------------------------
name any string (currently unused)
python path of python version to use
python_enstools path of python version to use for verification script (not provided)
ncks path to 'ncks' program; type 'which ncks' to find the path,
if it doesn't exist, try to load the module first ('module load nco')
ideal path to WRF's ideal.exe
wrfexe path to WRF's wrf.exe
wrf_rundir_base path for temporary files for WRF
dart_rundir_base path for temporary files for DART
archive_base path for long-time output storage
srcdir path to where WRF has been compiled, including the 'run' folder of WRF, e.g. /home/WRF-4.3/run
dart_srcdir path to DART compile directory, e.g. /home/DART-9.11.9/models/wrf/work
rttov_srcdir path to RTTOV compile directory, e.g. /home/RTTOV13/rtcoef_rttov13/
scriptsdir path where DART-WRF scripts reside, e.g. /home/DART-WRF/scripts
namelist path to a namelist template; strings like <hist_interval>, will be overwritten in scripts/prepare_namelist.py
run_WRF path to script which runs WRF on a node of the cluster
slurm_cfg python dictionary, containing options of SLURM
defined in SLURM docs (https://slurm.schedmd.com/sbatch.html)
this configuration can be overwritten later on, for example:
'dict(cluster.slurm_cfg, **cfg_update)' where
'cfg_update = {"nodes": "2"}'
"""
class ClusterConfig(object):
    """Helper class, contains useful abbreviations to use in code later on"""
    def __init__(self):
@@ -11,7 +49,7 @@ class ClusterConfig(object):
        return self.archive_base+'/'+self.expname

    def wrf_rundir(self, iens):
        return self.wrf_rundir_base+'/'+self.expname+'/'+str(iens)

    @property
    def scripts_rundir(self):
@@ -19,58 +57,31 @@ class ClusterConfig(object):
    @property
    def dartrundir(self):
        return self.dart_rundir_base+'/'+self.expname+'/'
vsc = ClusterConfig()
vsc.name = 'vsc'
# binaries
vsc.python = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/python'
vsc.python_enstools = '/home/fs71386/lkugler/miniconda3/envs/enstools/bin/python'
vsc.ncks = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/ncks'
vsc.ideal = '/home/fs71386/lkugler/compile/bin/ideal-v4.2.2_v1.16.exe'
vsc.wrfexe = '/home/fs71386/lkugler/compile/bin/wrf-v4.3_v1.19.exe'
# paths for data output
vsc.wrf_rundir_base = '/gpfs/data/fs71386/lkugler/run_WRF/'  # path for temporary files
vsc.dart_rundir_base = '/gpfs/data/fs71386/lkugler/run_DART/'  # path for temporary files
vsc.archive_base = '/gpfs/data/fs71386/lkugler/sim_archive/'
# paths used as input
vsc.srcdir = '/gpfs/data/fs71386/lkugler/compile/WRF/WRF-4.3/run'
vsc.dart_srcdir = '/gpfs/data/fs71386/lkugler/compile/DART/DART-9.11.9/models/wrf/work'
vsc.rttov_srcdir = '/gpfs/data/fs71386/lkugler/compile/RTTOV13/rtcoef_rttov13/'
vsc.scriptsdir = '/home/fs71386/lkugler/DART-WRF/scripts/'
# templates/run scripts
vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input'
vsc.run_WRF = '/home/fs71386/lkugler/DART-WRF/scripts/run_ens.vsc.sh'
@@ -78,21 +89,3 @@
vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384",
                 "nodes": "1", "ntasks": "1", "ntasks-per-node": "48", "ntasks-per-core": "1",
                 "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
@@ -10,10 +10,6 @@ from slurmpy import Slurm
from config.cfg import exp, cluster
from scripts.utils import script_to_str, symlink, copy

log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir)
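
# my_Slurm is used throughout this script but defined in a hunk elided on this
# page. A plausible minimal sketch (an assumption, not the committed code): it
# merges per-job options into the cluster's SLURM defaults, following the
# 'dict(cluster.slurm_cfg, **cfg_update)' pattern from the config docstring.
# def my_Slurm(*args, cfg_update=dict(), **kwargs):
#     """Shortcut to slurmpy's Slurm() with the experiment's default SLURM options."""
#     return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
#                  log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)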
@@ -55,7 +51,7 @@ def run_ideal(depends_on=None):
export SLURM_STEP_GRES=none
for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do
    rundir="""+cluster.wrf_rundir_base+'/'+exp.expname+"""/$n
    echo $rundir
    cd $rundir
    mpirun -np 1 ./ideal.exe &
@@ -63,7 +59,7 @@ done
wait
for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do
    rundir="""+cluster.wrf_rundir_base+'/'+exp.expname+"""/$n
    mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done
"""
@@ -113,33 +109,33 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
    id = depends_on
    restart_flag = '.false.' if not input_is_restart else '.true.'
    # if False:  # doesn't work with restarts at the moment  # first_minute:
    #     # first minute forecast (needed for validating an assimilation)
    #     hist_interval = 1
    #     radt = 1  # calc CFRAC also in first minute
    #     begin_plus1 = begin+dt.timedelta(minutes=1)
    #     s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
    #     args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py',
    #             begin.strftime('%Y-%m-%d_%H:%M'),
    #             begin_plus1.strftime('%Y-%m-%d_%H:%M'),
    #             str(hist_interval),
    #             '--radt='+str(radt),
    #             '--restart='+restart_flag,]
    #     if restart_path:  # restart from a wrfrst file in restart_path directory
    #         args.append('--rst_inname='+restart_path)
    #     id = s.run(' '.join(args), depends_on=[id])
    #
    #     s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
    #                  "time": "2", "mem-per-cpu": "2G"})
    #     cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
    #     id = s.run(cmd, depends_on=[id])
    #
    #     # apply forward operator (DART filter without assimilation)
    #     s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
    #     id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py '
    #                + begin.strftime('%Y-%m-%d_%H:%M')+' '
    #                + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
    #                depends_on=[id])
    # whole forecast timespan
    hist_interval = 5
@@ -165,30 +161,20 @@ def run_ENS(begin, end, depends_on=None, first_minute=True,
                 "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
    cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
    id = s.run(cmd, depends_on=[id])
    return id

def assimilate(assim_time, prior_init_time, prior_valid_time, prior_path_exp,
               input_is_restart=True, depends_on=None):
"""Creates observations from a nature run and assimilates them. """Creates observations from a nature run and assimilates them.
Args: Args:
assim_time (dt.datetime): timestamp of prior wrfout files assim_time (dt.datetime): timestamp of prior wrfout files
prior_init_time (dt.datetime): prior_init_time (dt.datetime): timestamp to find the directory where the prior wrfout files are
timestamp to find the directory where the prior wrfout files are prior_path_exp (str): use this directory to get prior state (i.e. cluster.archivedir)
prior_path_exp (bool or str):
put a `str` to take the prior from a different experiment
if False: use `archivedir` (defined in config) to get prior state
if str: use this directory to get prior state
""" """
    if not os.path.exists(prior_path_exp):
        raise IOError('prior_path_exp does not exist: '+prior_path_exp)
id = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60", id = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60",
"mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"} "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"}
...@@ -197,15 +183,25 @@ def assimilate(assim_time, prior_init_time, prior_valid_time, ...@@ -197,15 +183,25 @@ def assimilate(assim_time, prior_init_time, prior_valid_time,
+prior_init_time.strftime('%Y-%m-%d_%H:%M ') +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_valid_time.strftime('%Y-%m-%d_%H:%M ') +prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[depends_on]) +prior_path_exp, depends_on=[depends_on])
return id
id = my_Slurm("WRF_IC", cfg_update=dict(time="8")
).run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrf_initials.py ' def prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, depends_on=None):
id = my_Slurm("IC-prior", cfg_update=dict(time="8")
).run(cluster.python+' '+cluster.scripts_rundir+'/prep_IC_prior.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ') +assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ') +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[id]) +prior_path_exp, depends_on=[id])
return id return id

def update_IC_from_DA(assim_time, depends_on=None):  # 'depends_on' added; it is used below and in the call site
    id = my_Slurm("IC-update", cfg_update=dict(time="8")
                  ).run(cluster.python+' '+cluster.scripts_rundir+'/update_IC.py '
                        +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])  # was '[id]', undefined here
    return id
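
# One assimilation cycle in the main section below chains these helpers:
#   assimilate() -> prepare_IC_from_prior() -> update_IC_from_DA() -> run_ENS()
# i.e. observations are assimilated, the posterior is initialized as a copy of
# the prior initial condition, the analysis increments are added to it, and the
# ensemble is advanced to the next assimilation time.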

def create_satimages(init_time, depends_on=None):
    s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "60", "nodes": "1"})
    id = s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir
@@ -213,38 +209,39 @@ def create_satimages(init_time, depends_on=None):
               depends_on=[depends_on])
    return id

def mailme(depends_on=None):
    if depends_on:
        s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
        s.run('sleep 1', depends_on=[depends_on])


def gen_obsseq(depends_on=None):
    s = my_Slurm("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"})
    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py',
               depends_on=[depends_on])
    return id


def verify(depends_on=None):
    s = my_Slurm("verify", cfg_update={"time": "240", "mail-type": "FAIL,END",
                 "ntasks": "96", "ntasks-per-node": "96", "ntasks-per-core": "2"})
    s.run(cluster.python_enstools+' /home/fs71386/lkugler/osse_analysis/analyze_fc.py '+exp.expname+' has_node',
          depends_on=[depends_on])

def cleanup_storage(depends_on=None):  # parameter was named 'depends', but the body uses 'depends_on'
    Slurm('cleanup').run(
        cluster.python+' '+cluster.scripts_rundir+'/cleanup_exp.py '+exp.expname,
        depends_on=[depends_on])

################################
if __name__ == "__main__":
    print('starting osse')

    timedelta_integrate = dt.timedelta(minutes=15)
    timedelta_btw_assim = dt.timedelta(minutes=15)

    backup_scripts()
    id = None
@@ -252,7 +249,7 @@ if __name__ == "__main__":
    start_from_existing_state = True
    is_new_run = not start_from_existing_state

    init_time = dt.datetime(2008, 7, 30, 12)
    id = prepare_WRFrundir(init_time)

    if is_new_run:
@@ -268,42 +265,45 @@ if __name__ == "__main__":
        prior_path_exp = False  # for next assimilation

    elif start_from_existing_state:
        #prior_path_exp = cluster.archivedir
        #prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem_rst'
        prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.19_P1_noDA'

        #id = update_wrfinput_from_archive(integration_end_time, init_time, prior_path_exp, depends_on=id)
        #id = wrfinput_insert_wbubble(depends_on=id)
        # note: relies on prior_init_time/prior_valid_time, which are set in the section below
        id = prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, depends_on=id)

    # values for assimilation
    time = dt.datetime(2008, 7, 30, 13)
    prior_init_time = init_time

    while time <= dt.datetime(2008, 7, 30, 14):
        # usually we take the prior from the current time
        # but one could use a prior from a different time from another run
        # i.e. 13z as a prior to assimilate 12z observations
        prior_valid_time = time

        id = assimilate(time, prior_init_time, prior_valid_time, prior_path_exp, depends_on=id)

        # after the first assimilation, we can use our own exp path as prior
        prior_path_exp = cluster.archivedir

        # 1) Set posterior = prior
        id = prepare_IC_from_prior(prior_path_exp, prior_init_time, prior_valid_time, depends_on=id)

        # 2) Update posterior += updates from assimilation
        id = update_IC_from_DA(time, depends_on=id)

        # How long shall we integrate?
        timedelta_integrate = timedelta_btw_assim
        if time.minute in [0,]:  # longer forecast every full hour
            timedelta_integrate = dt.timedelta(hours=3)

        # 3) Run WRF ensemble
        id = run_ENS(begin=time,  # start integration from here
                     end=time + timedelta_integrate,  # integrate until here
                     restart_path=cluster.archivedir+prior_init_time.strftime('/%Y-%m-%d_%H:%M/'),
                     output_restart_interval=timedelta_btw_assim.total_seconds()/60,
                     depends_on=id)

        create_satimages(time, depends_on=id)
@@ -311,9 +311,10 @@ if __name__ == "__main__":
        # increment time
        time += timedelta_btw_assim

        # update time variables
        prior_init_time = time - timedelta_btw_assim

    #cleanup_storage(id)
    id = gen_obsseq(id)
    verify(id)
@@ -266,8 +266,6 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
    os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*')
    os.system('rm -rf '+cluster.dartrundir+'/obs_seq.fina*')

def calc_obserr_WV73(Hx_nature, Hx_prior):
@@ -406,8 +404,9 @@ if __name__ == "__main__":
    - write state to archive

    Note:
        assim_time (dt.datetime): time of output
        prior_valid_time (dt.datetime): valid time of prior (may differ from assim_time)

    Example call:
        python assim.py 2008-08-07_12:00 2008-08-06_00:00 2008-08-07_13:00 /home/fs71386/lkugler/data/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem
@@ -419,8 +418,9 @@ if __name__ == "__main__":
    prior_path_exp = str(sys.argv[4])

    archive_time = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/')
    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
    os.chdir(cluster.dartrundir)
    os.system(cluster.python+' '+cluster.scripts_rundir+'/link_dart_rttov.py')  # link DART binaries to run_DART
    os.system('rm -f input.nml obs_seq.in obs_seq.out obs_seq.final')  # remove any existing observation files
    set_DART_nml()
@@ -8,13 +8,12 @@ from utils import try_remove
2) remove run_DART folders from exp
3) remove run_WRF files from exp
"""

keep_last_init_wrfrst = True
print('removing files for exp', exp)

# 1) wrfrst
inits = reversed(sorted(glob.glob(cluster.archivedir+'/20??-??-??_??:??')))
for k, init in enumerate(inits):
    rst_files = glob.glob(init+'/*/wrfrst_*')
@@ -28,7 +27,7 @@ for k, init in enumerate(inits):
        try_remove(f)

# 2) run_DART/exp
os.removedirs(cluster.dartrundir)

# 3) run_WRF/exp
for iens in range(1, exp.n_ens+1):
@@ -9,6 +9,7 @@ bins = ['perfect_model_obs', 'filter', 'obs_diag', 'obs_seq_to_netcdf']
for b in bins:
    symlink(joinp(cluster.dart_srcdir, b),
            joinp(cluster.dartrundir, b))
    print(joinp(cluster.dartrundir, b), 'created')

rttov_files = ['rttov13pred54L/rtcoef_msg_4_seviri_o3.dat',
               #'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.dat',
@@ -32,3 +33,5 @@ symlink(cluster.dart_srcdir+'/../../../observations/forward_operators/rttov_sens
symlink(cluster.dart_srcdir+'/../../../assimilation_code/programs/gen_sampling_err_table/work/sampling_error_correction_table.nc',
        cluster.dartrundir+'/sampling_error_correction_table.nc')

print('prepared DART & RTTOV links in', cluster.dartrundir)