Skip to content
Snippets Groups Projects
Commit 64235dd4 authored by Lukas Kugler's avatar Lukas Kugler
Browse files

fwd_op_1min capability

parent 81ef508e
No related branches found
No related tags found
No related merge requests found
...@@ -66,7 +66,7 @@ do ...@@ -66,7 +66,7 @@ do
mv $rundir/rsl.out.0000 $rundir/rsl.out.input mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done done
""" """
s = my_Slurm("prep_wrfinput", cfg_update={"ntasks": str(exp.n_ens), s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens),
"time": "10", "mem-per-cpu": "2G"}) "time": "10", "mem-per-cpu": "2G"})
id = s.run(cmd, depends_on=[id]) id = s.run(cmd, depends_on=[id])
return id return id
...@@ -88,21 +88,29 @@ def run_ENS(begin, end, depends_on=None): ...@@ -88,21 +88,29 @@ def run_ENS(begin, end, depends_on=None):
"""Run forecast for 1 minute, save output. """Run forecast for 1 minute, save output.
Then run whole timespan with 5 minutes interval. Then run whole timespan with 5 minutes interval.
""" """
prev_id = depends_on id = depends_on
# first minute forecast (needed for validating an assimilation) # first minute forecast (needed for validating an assimilation)
hist_interval = 1 hist_interval = 1
begin_plus1 = begin+dt.timedelta(minutes=1)
s = my_Slurm("preWRF1", cfg_update=dict(time="2")) s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_namelist.py ' id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_namelist.py '
+ begin.strftime('%Y-%m-%d_%H:%M')+' ' + begin.strftime('%Y-%m-%d_%H:%M')+' '
+ (begin+dt.timedelta(minutes=1)).strftime('%Y-%m-%d_%H:%M')+' ' + begin_plus1.strftime('%Y-%m-%d_%H:%M')+' '
+ str(hist_interval), + str(hist_interval),
depends_on=[prev_id]) depends_on=[id])
s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"time": "5", "mem-per-cpu": "2G"}) "time": "2", "mem-per-cpu": "2G"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname) cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
id2 = s.run(cmd, depends_on=[id]) id = s.run(cmd, depends_on=[id])
# apply forward operator (DART filter without assimilation)
s = my_Slurm("fwOP-1m", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py '
+ begin.strftime('%Y-%m-%d_%H:%M')+' '
+ begin_plus1.strftime('%Y-%m-%d_%H:%M'),
depends_on=[id])
# whole forecast timespan # whole forecast timespan
hist_interval = 5 hist_interval = 5
...@@ -111,20 +119,20 @@ def run_ENS(begin, end, depends_on=None): ...@@ -111,20 +119,20 @@ def run_ENS(begin, end, depends_on=None):
+ begin.strftime('%Y-%m-%d_%H:%M')+' ' + begin.strftime('%Y-%m-%d_%H:%M')+' '
+ end.strftime('%Y-%m-%d_%H:%M')+' ' + end.strftime('%Y-%m-%d_%H:%M')+' '
+ str(hist_interval), + str(hist_interval),
depends_on=[prev_id]) depends_on=[id])
time_in_simulation_hours = (end-begin).total_seconds()/3600 time_in_simulation_hours = (end-begin).total_seconds()/3600
runtime_wallclock_mins_expected = int(5+time_in_simulation_hours*9) # usually below 8 min/hour runtime_wallclock_mins_expected = int(4+time_in_simulation_hours*9) # usually below 8 min/hour
s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes), s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"}) "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname) cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
id2 = s.run(cmd, depends_on=[id]) id = s.run(cmd, depends_on=[id])
# not needed, since wrf.exe writes directly to archive folder # not needed, since wrf.exe writes directly to archive folder
#s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10")) #s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10"))
#id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py ' #id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py '
# + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2]) # + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
return id2 return id
def assimilate(assim_time, background_init_time, def assimilate(assim_time, background_init_time,
prior_from_different_exp=False, depends_on=None): prior_from_different_exp=False, depends_on=None):
...@@ -199,7 +207,6 @@ timedelta_integrate = dt.timedelta(minutes=45) ...@@ -199,7 +207,6 @@ timedelta_integrate = dt.timedelta(minutes=45)
timedelta_btw_assim = dt.timedelta(minutes=30) timedelta_btw_assim = dt.timedelta(minutes=30)
clear_logs(backup_existing_to_archive=False) clear_logs(backup_existing_to_archive=False)
id = None id = None
start_from_existing_state = True start_from_existing_state = True
...@@ -221,14 +228,13 @@ elif start_from_existing_state: ...@@ -221,14 +228,13 @@ elif start_from_existing_state:
id = prepare_wrfinput() # create initial conditions id = prepare_wrfinput() # create initial conditions
# get initial conditions from archive # get initial conditions from archive
background_init_time = dt.datetime(2008, 7, 30, 6) background_init_time = dt.datetime(2008, 7, 30, 11, 30)
time = dt.datetime(2008, 7, 30, 10) time = dt.datetime(2008, 7, 30, 12)
exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.11_LMU_filter' exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.12_LMU_so_VIS2'
first_guess = exppath_arch first_guess = exppath_arch
id = update_wrfinput_from_archive(time, background_init_time, exppath_arch, id = update_wrfinput_from_archive(time, background_init_time, exppath_arch, depends_on=id)
depends_on=id)
while time <= dt.datetime(2008, 7, 30, 16): while time <= dt.datetime(2008, 7, 30, 15):
assim_time = time assim_time = time
id = assimilate(assim_time, id = assimilate(assim_time,
background_init_time, background_init_time,
...@@ -236,10 +242,12 @@ while time <= dt.datetime(2008, 7, 30, 16): ...@@ -236,10 +242,12 @@ while time <= dt.datetime(2008, 7, 30, 16):
depends_on=id) depends_on=id)
first_guess = False first_guess = False
background_init_time = assim_time # start integration from here prev_forecast_init = assim_time - timedelta_btw_assim
integration_end_time = assim_time + timedelta_integrate this_forecast_init = assim_time # start integration from here
id = run_ENS(begin=background_init_time, this_forecast_end = assim_time + timedelta_btw_assim
end=integration_end_time,
id = run_ENS(begin=this_forecast_init,
end=this_forecast_end,
depends_on=id) depends_on=id)
time += timedelta_btw_assim time += timedelta_btw_assim
......
import os, sys, shutil
import datetime as dt
import numpy as np
from scipy.interpolate import interp1d
from config.cfg import exp, cluster
from utils import symlink, copy, mkdir, sed_inplace, append_file
import create_obsseq as osq
from gen_synth_obs import read_prior_obs, set_input_nml
import pre_assim
def run_operator(time):
"""
time_for_dart (dt.datetime) : needs to be consistent with wrfout files!
"""
# assume only 1 obstype for now
obscfg = exp.observations[0]
# get observation file (obs not important, but their locations)
# this should correspond to configuration to have same locations as in real assim
obs_seq_all_out = cluster.dartrundir + '/obs_seq_all.out'
os.chdir(cluster.dartrundir)
n_obs = obscfg['n_obs']
error_var = (obscfg['err_std'])**2
sat_channel = obscfg.get('channel', False)
cov_loc = obscfg['cov_loc_radius_km']
dist_obs = obscfg.get('distance_between_obs_km', False)
obs_coords = osq.calc_obs_locations(n_obs, coords_from_domaincenter=False,
distance_between_obs_km=dist_obs,
fpath_obs_locations=None)
osq.sat(time, sat_channel, obs_coords, error_var,
output_path=cluster.dartrundir)
assert os.path.exists(cluster.dartrundir + '/obs_seq.in')
# prepare dummy nature
os.system('cp ./advance_temp1/wrfout_d01 ./wrfout_d01')
import wrfout_add_geo
wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc',
cluster.dartrundir+'/wrfout_d01')
print('running perfect model obs', flush=True)
os.system('mpirun -np 12 ./perfect_model_obs')
# set namelist for filter (calc only forward op)
set_input_nml(sat_channel=obscfg.get('channel', False),
just_prior_values=True)
# run filter
os.system('mv obs_seq.out obs_seq_all.out')
assert os.path.exists(obs_seq_all_out)
print('running filter', flush=True)
os.system('mpirun -np 40 ./filter')
# copy output to archive
savedir = cluster.archivedir()+'/obs_seq_final_1min/'
mkdir(savedir)
copy(cluster.dartrundir+'/obs_seq.final',
savedir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final'))
if __name__ == '__main__':
prev_forecast_init = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
time_to_run_fw_op = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
exppath_firstguess = cluster.archivedir()
print(prev_forecast_init, time_to_run_fw_op)
# link ensemble states to run_DART directory
pre_assim.run(time_to_run_fw_op, prev_forecast_init, exppath_firstguess)
# run fwd operator, save to archive
run_operator(time_to_run_fw_op)
...@@ -317,20 +317,21 @@ def generic_obs(obs_type, time_dt, coords, error_var, output_path='./'): ...@@ -317,20 +317,21 @@ def generic_obs(obs_type, time_dt, coords, error_var, output_path='./'):
if __name__ == '__main__': if __name__ == '__main__':
time_dt = dt.datetime(2008, 7, 30, 15, 30) time_dt = dt.datetime(2008, 7, 30, 16, 31)
n_obs = 100 n_obs = 100
channel_id = 1 sat_channel = 1
distance_between_obs_meters = 10000 distance_between_obs_meters = 10000
error_var = 0.001
# error_var = 0.001 obs_coords = calc_obs_locations(n_obs, coords_from_domaincenter=False,
# sat(time_dt, channel_id, n_obs, error_var, distance_between_obs_meters, distance_between_obs_km=distance_between_obs_meters,
fpath_obs_locations=None)
sat(time_dt, sat_channel, obs_coords, error_var, output_path='./')
# error_var = (5.)**2
# generic_obs('RADAR', time_dt, n_obs, error_var, distance_between_obs_meters,
# output_path='./', fpath_obs_locations='./domain.pkl') # output_path='./', fpath_obs_locations='./domain.pkl')
error_var = (5.)**2
generic_obs('RADAR', time_dt, n_obs, error_var, distance_between_obs_meters,
output_path='./', fpath_obs_locations='./domain.pkl')
# error_var = (0.5)**2 # error_var = (0.5)**2
# generic_obs('RASO_T', time_dt, n_obs, error_var, distance_between_obs_meters, # generic_obs('RASO_T', time_dt, n_obs, error_var, distance_between_obs_meters,
# output # output
...@@ -3,10 +3,7 @@ import datetime as dt ...@@ -3,10 +3,7 @@ import datetime as dt
from config.cfg import exp, cluster from config.cfg import exp, cluster
from utils import symlink, copy_scp_srvx8, copy, sed_inplace from utils import symlink, copy_scp_srvx8, copy, sed_inplace
assim_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') def run(assim_time, background_init_time, exppath_firstguess):
background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
exppath_firstguess = str(sys.argv[3])
#if cluster.name != 'srvx8': #if cluster.name != 'srvx8':
# copy = copy_scp_srvx8 # use scp # copy = copy_scp_srvx8 # use scp
...@@ -48,4 +45,12 @@ os.system('rm -rf '+cluster.dartrundir+'/filter_restart*') ...@@ -48,4 +45,12 @@ os.system('rm -rf '+cluster.dartrundir+'/filter_restart*')
os.system('rm -rf '+cluster.dartrundir+'/output_mean*') os.system('rm -rf '+cluster.dartrundir+'/output_mean*')
os.system('rm -rf '+cluster.dartrundir+'/output_sd*') os.system('rm -rf '+cluster.dartrundir+'/output_sd*')
os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*') os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*')
os.system('rm -rf '+cluster.dartrundir+'/obs_seq.fina*')
if __name__ == '__main__':
assim_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
exppath_firstguess = str(sys.argv[3])
run(assim_time, background_init_time, exppath_firstguess)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment