Commit 93867d32 authored by lkugler

restart capability etc

parent c3184b59
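In short, the commit threads a restart capability through the workflow script: prepare_wrfinput is split into prepare_WRFrundir and run_ideal, run_ENS gains input_is_restart, restart_path and output_restart_interval arguments (the first-minute forecast branch is disabled for now because it does not yet work with restarts), assimilate gains input_is_restart and writes WRF initial conditions via the renamed WRF_IC job, and create_satimages now returns its job id. A minimal sketch of the resulting call chain, using only the function signatures introduced in the diff below; the times, interval and prior path are illustrative placeholders, not values from the repository:

    import datetime as dt

    init_time  = dt.datetime(2008, 7, 30, 10)       # illustrative times
    assim_time = dt.datetime(2008, 7, 30, 11)

    id = prepare_WRFrundir(init_time)                # create WRF/run directories
    id = assimilate(assim_time, init_time, prior_valid_time=assim_time,
                    prior_path_exp='/path/to/prior_experiment',  # hypothetical prior archive
                    input_is_restart=True, depends_on=id)
    id = run_ENS(begin=assim_time, end=assim_time + dt.timedelta(minutes=30),
                 input_is_restart=True,              # continue from wrfrst files in run_WRF dirs
                 restart_path=False,
                 output_restart_interval=30,         # write wrfrst every 30 minutes
                 depends_on=id)
    create_satimages(assim_time, depends_on=id)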
@@ -8,7 +8,7 @@ import datetime as dt
 from slurmpy import Slurm
 from config.cfg import exp, cluster
-from scripts.utils import script_to_str, symlink
+from scripts.utils import script_to_str, symlink, copy
 # allow scripts to access the configuration
@@ -28,30 +28,29 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs):
                     log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)

 def backup_scripts():
-    current = cluster.scriptsdir
-    main_a = cluster.scripts_rundir
-    # old_a = main_a+'/old/'
     os.makedirs(cluster.archivedir, exist_ok=True)

-    # def func(a, b, method):  # call method if not link or directory
-    #     if os.path.islink(a) or os.path.isdir(a):
-    #         pass
-    #     else:
-    #         method(a, b)
     try:
         shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir)
     except FileExistsError:
         pass
     except:
         raise
+    try:
+        copy(os.path.basename(__file__), cluster.scripts_rundir+'/')
+    except Exception as e:
+        warnings.warn(str(e))

-def prepare_wrfinput(init_time):
+def prepare_WRFrundir(init_time):
     """Create WRF/run directories and wrfinput files
     """
-    s = my_Slurm("prep_wrfinput", cfg_update={"time": "10", "mail-type": "BEGIN"})
-    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfinput.py '
+    s = my_Slurm("prep_wrfrundir", cfg_update={"time": "5", "mail-type": "BEGIN"})
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfrundir.py '
                +init_time.strftime('%Y-%m-%d_%H:%M'))
+    return id

+def run_ideal(depends_on=None):
+    """Run ideal for every ensemble member"""
     cmd = """# run ideal.exe in parallel, then add geodata
 export SLURM_STEP_GRES=none
 for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
@@ -70,7 +69,7 @@ done
 """
     s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), "nodes": "1",
                  "time": "10", "mem-per-cpu": "2G"})
-    id = s.run(cmd, depends_on=[id])
+    id = s.run(cmd, depends_on=[depends_on])
     return id

 def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depends_on=None):
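Throughout the file, Slurm jobs are chained by passing the job id returned by one my_Slurm(...).run(...) call as depends_on of the next; this is how run_ideal is wired to wait for prepare_WRFrundir in the __main__ section further down. A minimal sketch of the pattern, assuming the slurmpy interface used elsewhere in this script; the job names and commands are illustrative:

    # Illustrative dependency chain; each run() submits a batch job and returns its id.
    s = my_Slurm("stepA", cfg_update={"time": "5"})
    id = s.run("echo prepare")                       # first job, no dependency
    s = my_Slurm("stepB", cfg_update={"time": "5"})
    id = s.run("echo forecast", depends_on=[id])     # starts only after stepA finished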
@@ -86,33 +85,49 @@ def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depends_on=None):
                +IC_path, depends_on=[depends_on])
     return id

-def wrfinput_insert_wbubble(depends_on=None):
+def wrfinput_insert_wbubble(perturb=True, depends_on=None):
     """Given that directories with wrfinput files exist,
     update these wrfinput files with warm bubbles
     """
     s = my_Slurm("ins_wbubble", cfg_update={"time": "5"})
-    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/create_wbubble_wrfinput.py',
+    pstr = ' '
+    if perturb:
+        pstr = ' perturb'
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/create_wbubble_wrfinput.py'+pstr,
                depends_on=[depends_on])
     return id

-def run_ENS(begin, end, depends_on=None, first_minute=True):
+def run_ENS(begin, end, depends_on=None, first_minute=True,
+            input_is_restart=False, restart_path=False, output_restart_interval=720):
     """Run forecast for 1 minute, save output.
     Then run whole timespan with 5 minutes interval.
+
+    if input_is_restart:  # start WRF in restart mode
+        if restart_path:
+            # restart from a wrfrst file in the restart_path directory,
+            # e.g. when restarting from a state in an archivedir (from any existing experiment)
+        else:
+            # restart from wrfrst files that are already in run_WRF directories,
+            # e.g. after updateIC, it puts wrfrst in run_WRF directories
     """
     id = depends_on
+    restart_flag = '.false.' if not input_is_restart else '.true.'

-    if first_minute:
+    if False:  # doesnt work with restarts at the moment  # first_minute:
         # first minute forecast (needed for validating an assimilation)
         hist_interval = 1
         radt = 1  # calc CFRAC also in first minute
         begin_plus1 = begin+dt.timedelta(minutes=1)
         s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
-        id = s.run(' '.join([cluster.python,
-                   cluster.scripts_rundir+'/prepare_namelist.py',
-                   begin.strftime('%Y-%m-%d_%H:%M'),
-                   begin_plus1.strftime('%Y-%m-%d_%H:%M'),
-                   str(hist_interval), str(radt),]),
-                   depends_on=[id])
+        args = [cluster.python, cluster.scripts_rundir+'/prepare_namelist.py',
+                begin.strftime('%Y-%m-%d_%H:%M'),
+                begin_plus1.strftime('%Y-%m-%d_%H:%M'),
+                str(hist_interval),
+                '--radt='+str(radt),
+                '--restart='+restart_flag,]
+        if restart_path:  # restart from a wrfrst file in restart_path directory
+            args.append('--rst_inname='+restart_path)
+        id = s.run(' '.join(args), depends_on=[id])

         s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
                      "time": "2", "mem-per-cpu": "2G"})
@@ -129,16 +144,23 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):

     # whole forecast timespan
     hist_interval = 5
     radt = 5
+    args = [cluster.python,
+            cluster.scripts_rundir+'/prepare_namelist.py',
+            begin.strftime('%Y-%m-%d_%H:%M'),
+            end.strftime('%Y-%m-%d_%H:%M'),
+            str(hist_interval),
+            '--radt='+str(radt),
+            '--restart='+restart_flag,]
+    if restart_path:
+        args.append('--rst_inname='+restart_path)
+    if output_restart_interval:
+        args.append('--restart_interval='+str(int(float(output_restart_interval))))
+
     s = my_Slurm("preWRF2", cfg_update=dict(time="2"))
-    id = s.run(' '.join([cluster.python,
-               cluster.scripts_rundir+'/prepare_namelist.py',
-               begin.strftime('%Y-%m-%d_%H:%M'),
-               end.strftime('%Y-%m-%d_%H:%M'),
-               str(hist_interval), str(radt),]),
-               depends_on=[id])
+    id = s.run(' '.join(args), depends_on=[id])

     time_in_simulation_hours = (end-begin).total_seconds()/3600
-    runtime_wallclock_mins_expected = int(6+time_in_simulation_hours*10)  # usually below 9 min/hour
+    runtime_wallclock_mins_expected = int(8+time_in_simulation_hours*9.5)  # usually below 9 min/hour
     s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
                  "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
     cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
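For orientation: the two restart modes described in the new run_ENS docstring differ only in where the wrfrst files come from, and both end up as extra flags on the prepare_namelist.py command assembled in args above. A hedged sketch of the two invocations and of the command that ' '.join(args) produces, assuming prepare_namelist.py accepts the flags exactly as they are appended here; paths, times and the job id are placeholders:

    # Mode 1: restart from wrfrst files of an archived experiment (restart_path given).
    id = run_ENS(begin=begin, end=end, input_is_restart=True,
                 restart_path='/path/to/sim_archive/some_experiment/2008-07-30_11:00/',  # hypothetical
                 output_restart_interval=30, depends_on=id)

    # Mode 2: restart from wrfrst files already sitting in the run_WRF directories,
    # e.g. written there by the preceding update of initial conditions (restart_path=False).
    id = run_ENS(begin=begin, end=end, input_is_restart=True,
                 restart_path=False, output_restart_interval=30, depends_on=id)

    # In mode 2 the preWRF2 job then submits roughly this command (placeholder paths):
    args = ['python', '/path/to/scripts/prepare_namelist.py',
            '2008-07-30_11:00', '2008-07-30_11:30', '5',
            '--radt=5', '--restart=.true.', '--restart_interval=30']
    print(' '.join(args))
    # python /path/to/scripts/prepare_namelist.py 2008-07-30_11:00 2008-07-30_11:30 5 --radt=5 --restart=.true. --restart_interval=30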
@@ -151,7 +173,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
     return id

 def assimilate(assim_time, prior_init_time, prior_valid_time,
-               prior_path_exp=False, depends_on=None):
+               prior_path_exp=False, input_is_restart=False, depends_on=None):
     """Creates observations from a nature run and assimilates them.

     Args:
@@ -168,26 +190,16 @@ def assimilate(assim_time, prior_init_time, prior_valid_time,
     elif not isinstance(prior_path_exp, str):
         raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp)))

-    # # prepare prior model state
-    # s = my_Slurm("preAssim", cfg_update=dict(time="2"))
-    # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/pre_assim.py '
-    #            +assim_time.strftime('%Y-%m-%d_%H:%M ')
-    #            +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
-    #            +prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
-    #            +prior_path_exp, depends_on=[depends_on])
-
-    s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60",
-                 "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"})
-    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
+    id = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60",
+                  "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"}
+         ).run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
               +assim_time.strftime('%Y-%m-%d_%H:%M ')
               +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
               +prior_valid_time.strftime('%Y-%m-%d_%H:%M ')
               +prior_path_exp, depends_on=[depends_on])

-    s = my_Slurm("updateIC", cfg_update=dict(time="8"))
-    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/update_wrfinput_from_filteroutput.py '
+    id = my_Slurm("WRF_IC", cfg_update=dict(time="8")
+         ).run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrf_initials.py '
               +assim_time.strftime('%Y-%m-%d_%H:%M ')
               +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
               +prior_path_exp, depends_on=[id])
@@ -195,10 +207,11 @@ def assimilate(assim_time, prior_init_time, prior_valid_time,

 def create_satimages(init_time, depends_on=None):
-    s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "30", "nodes": "1"})
-    s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir
+    s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "60", "nodes": "1"})
+    id = s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir
           +init_time.strftime('/%Y-%m-%d_%H:%M/'),
           depends_on=[depends_on])
+    return id

 def mailme(depends_on=None):
     if depends_on:
@@ -230,8 +243,8 @@ def copy_to_jet(depends_on=None):

 if __name__ == "__main__":
     print('starting osse')

-    timedelta_integrate = dt.timedelta(minutes=15)
-    timedelta_btw_assim = dt.timedelta(minutes=15)
+    timedelta_integrate = dt.timedelta(minutes=30)
+    timedelta_btw_assim = dt.timedelta(minutes=30)

     backup_scripts()
     id = None
@@ -239,9 +252,11 @@ if __name__ == "__main__":
     start_from_existing_state = True
     is_new_run = not start_from_existing_state

+    init_time = dt.datetime(2008, 7, 30, 10)
+    id = prepare_WRFrundir(init_time)
+
     if is_new_run:
-        init_time = dt.datetime(2008, 7, 30, 9)
-        id = prepare_wrfinput(init_time)  # create initial conditions
+        id = run_ideal(depends_on=id)
         id = wrfinput_insert_wbubble(depends_on=id)

         # spin up the ensemble
@@ -253,17 +268,15 @@ if __name__ == "__main__":
         prior_path_exp = False  # for next assimilation

     elif start_from_existing_state:
-        time = dt.datetime(2008, 7, 30, 11)
-        # prior init time
-        init_time = dt.datetime(2008, 7, 30, 6)
         #prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_Pwbub_40mem'
-        #prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem'
-        prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P1_40mem'
-        #prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.18_Pwbub_PriorPert10_40mem'
+        #prior_path_exp = cluster.archivedir #
+        prior_path_exp = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem_rst'

         #id = update_wrfinput_from_archive(integration_end_time, init_time, prior_path_exp, depends_on=id)
         #id = wrfinput_insert_wbubble(depends_on=id)

     # values for assimilation
+    time = dt.datetime(2008, 7, 30, 11)
     assim_time = time
     prior_init_time = init_time
@@ -271,22 +284,26 @@ if __name__ == "__main__":
         id = assimilate(assim_time,
                         prior_init_time,
-                        prior_valid_time=time+dt.timedelta(hours=2),
+                        prior_valid_time=time,  #+dt.timedelta(hours=2),
                         prior_path_exp=prior_path_exp,
+                        input_is_restart=True,
                         depends_on=id)
-        prior_path_exp = False  # use own exp path as prior
+        prior_path_exp = cluster.archivedir  # use own exp path as prior

         # integration
         this_forecast_init = assim_time  # start integration from here

         timedelta_integrate = timedelta_btw_assim
-        if this_forecast_init.minute in [0,30]:  # longer forecast every full hour
-            timedelta_integrate = dt.timedelta(hours=2)
+        if this_forecast_init.minute in [0,]:  # longer forecast every full hour
+            timedelta_integrate = dt.timedelta(hours=3)

         this_forecast_end = assim_time + timedelta_integrate
         id = run_ENS(begin=this_forecast_init,
                      end=this_forecast_end,
+                     input_is_restart=True,
+                     restart_path=False,  #cluster.archivedir+prior_init_time.strftime('/%Y-%m-%d_%H:%M/'),
+                     output_restart_interval=30,  #timedelta_btw_assim.total_seconds()/60,
                      depends_on=id)

         create_satimages(this_forecast_init, depends_on=id)
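The hunk above configures a single assimilation-plus-forecast step; the cycling loop around it is not shown in this diff. A hedged sketch of how such a loop could chain the calls via depends_on, built only from functions and variables that appear in this file; the loop bounds, the prior path and the update of prior_init_time are illustrative assumptions:

    # Hypothetical cycling loop; the real loop in __main__ is outside this diff.
    time = dt.datetime(2008, 7, 30, 11)
    end_of_cycling = dt.datetime(2008, 7, 30, 14)    # illustrative end of the experiment
    prior_init_time = init_time
    prior_path_exp = '/path/to/prior_experiment'     # hypothetical external prior

    while time <= end_of_cycling:
        id = assimilate(time, prior_init_time, prior_valid_time=time,
                        prior_path_exp=prior_path_exp, input_is_restart=True,
                        depends_on=id)
        prior_path_exp = cluster.archivedir          # from now on use own experiment as prior
        id = run_ENS(begin=time, end=time + timedelta_integrate,
                     input_is_restart=True, restart_path=False,
                     output_restart_interval=30, depends_on=id)
        create_satimages(time, depends_on=id)
        prior_init_time = time                       # this cycle becomes the next prior
        time += timedelta_btw_assim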