Skip to content
Snippets Groups Projects
Commit 500cc4cf authored by lkugler's avatar lkugler
Browse files

update

parent defcdce4
No related branches found
No related tags found
No related merge requests found
Showing
with 955 additions and 439 deletions
import numpy as np import numpy as np
from . import clusters from config import clusters # from . = problem in archivedir
cluster = clusters.vsc # change cluster configuration here cluster = clusters.vsc # change cluster configuration here
...@@ -9,12 +9,12 @@ class ExperimentConfiguration(object): ...@@ -9,12 +9,12 @@ class ExperimentConfiguration(object):
exp = ExperimentConfiguration() exp = ExperimentConfiguration()
exp.expname = "exp_v1.16_P0-4_Radar5+I-test" exp.expname = "exp_v1.16_Pwbub-1_40mem"
exp.model_dx = 2000 exp.model_dx = 2000
exp.n_ens = 40 exp.n_ens = 40
exp.n_nodes = 10 exp.n_nodes = 10
n_obs = 4 #1600 # 50km res: 64:8x8; 10km res: 1600:40x40 # radar: n_obs for each observation height level n_obs = 121 #961 900: 10km resoltn # radar: n_obs for each observation height level
vis = dict(plotname='VIS 0.6µm', plotunits='[1]', vis = dict(plotname='VIS 0.6µm', plotunits='[1]',
kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs, kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs,
...@@ -48,7 +48,9 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[dBz]', ...@@ -48,7 +48,9 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[dBz]',
cov_loc_radius_km=32, cov_loc_vert_km=5) cov_loc_radius_km=32, cov_loc_vert_km=5)
exp.observations = [vis, wv73, ] # 108, wv73, vis] exp.observations = [] #wv73, vis] # 108, wv73, vis]
exp.update_vars = ['T', 'QVAPOR', 'QCLOUD', 'QICE','CLDFRA']
#exp.update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'TSK', 'CLDFRA']
# directory paths depend on the name of the experiment # directory paths depend on the name of the experiment
cluster.expname = exp.expname cluster.expname = exp.expname
...@@ -6,11 +6,18 @@ class ClusterConfig(object): ...@@ -6,11 +6,18 @@ class ClusterConfig(object):
def __init__(self): def __init__(self):
pass pass
@property
def archivedir(self): def archivedir(self):
return self.archive_base+'/'+self.expname return self.archive_base+'/'+self.expname
def wrf_rundir(self, iens): def wrf_rundir(self, iens):
return self.userdir+'/run_WRF/'+self.expname+'/'+str(iens) return '/gpfs/data/fs71386/lkugler/run_WRF/'+self.expname+'/'+str(iens)
@property
def scripts_rundir(self):
return self.archivedir+'/DART-WRF/'
####################################################################################### #######################################################################################
...@@ -19,21 +26,22 @@ vsc = ClusterConfig() ...@@ -19,21 +26,22 @@ vsc = ClusterConfig()
vsc.name = 'vsc' vsc.name = 'vsc'
vsc.python = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/python' vsc.python = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/python'
vsc.ncks = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/ncks' vsc.ncks = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/ncks'
vsc.tmpfiledir = '/gpfs/data/fs71386/lkugler'
vsc.userdir = '/home/fs71386/lkugler' vsc.userdir = '/home/fs71386/lkugler'
vsc.srcdir = '/home/fs71386/lkugler/compile/WRF/WRF-4.2.2/run' vsc.srcdir = '/gpfs/data/fs71386/lkugler/compile/WRF/WRF-4.2.2/run'
vsc.archive_base = '/gpfs/data/fs71386/lkugler/sim_archive/' vsc.archive_base = '/gpfs/data/fs71386/lkugler/sim_archive/'
vsc.dart_srcdir = '/home/fs71386/lkugler/DART/DART-9.9.0/models/wrf/work' vsc.dart_srcdir = '/home/fs71386/lkugler/DART/DART-9.9.0/models/wrf/work'
vsc.dartrundir = '/home/fs71386/lkugler/run_DART' vsc.dartrundir = '/gpfs/data/fs71386/lkugler/run_DART'
vsc.scriptsdir = '/home/fs71386/lkugler/DART-WRF/scripts' vsc.scriptsdir = '/home/fs71386/lkugler/DART-WRF/scripts/'
vsc.nature_wrfout = '/home/fs71386/lkugler/data/sim_archive/exp_v1.16_P1_nature/2008-07-30_06:00/1/wrfout_d01_%Y-%m-%d_%H:%M:%S' vsc.nature_wrfout = '/home/fs71386/lkugler/data/sim_archive/exp_v1.16_P1_nature_tstep/2008-07-30_06:00/1/wrfout_d01_%Y-%m-%d_%H:%M:%S'
#vsc.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/from_LMU/raso.nat.<iens>.wrfprof' #vsc.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/from_LMU/raso.nat.<iens>.wrfprof'
vsc.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/raso.fc.<iens>.wrfprof' vsc.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/raso.nat.1.wrfprof'
vsc.ideal = vsc.userdir+'/compile/bin/ideal-v4.2.2_v1.16.exe' vsc.ideal = vsc.userdir+'/compile/bin/ideal-v4.2.2_v1.16.exe'
vsc.wrfexe = vsc.userdir+'/compile/bin/wrf-v4.2.2_v1.16.exe' vsc.wrfexe = vsc.userdir+'/compile/bin/wrf-v4.3_v1.16.exe'
vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input' vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input'
vsc.run_WRF = '/gpfs/data/fs71386/lkugler/DART-WRF/scripts/osse/run_ens.vsc.sh' vsc.run_WRF = '/home/fs71386/lkugler/DART-WRF/scripts/run_ens.vsc.sh'
vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384", vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384",
"ntasks": "1", "nodes": "1", "ntasks-per-node": "48", "ntasks-per-core": "1", "ntasks": "1", "nodes": "1", "ntasks-per-node": "48", "ntasks-per-core": "1",
...@@ -46,9 +54,9 @@ jet.python = '/jetfs/home/lkugler/miniconda3/bin/python' ...@@ -46,9 +54,9 @@ jet.python = '/jetfs/home/lkugler/miniconda3/bin/python'
jet.ncks = 'ncks' jet.ncks = 'ncks'
jet.userdir = '/jetfs/home/lkugler' jet.userdir = '/jetfs/home/lkugler'
jet.srcdir = '/jetfs/home/lkugler/compile/WRF/WRF-4.1.5/run' jet.srcdir = '/jetfs/home/lkugler/compile/WRF/WRF-4.1.5/run'
jet.scriptsdir = ''
jet.archive_base = '/jetfs/home/lkugler/data_jetfs/sim_archive/' jet.archive_base = '/jetfs/home/lkugler/data_jetfs/sim_archive/'
jet.dartrundir = '/jetfs/home/lkugler/DART-WRF/rundir' jet.dartrundir = '/jetfs/home/lkugler/DART-WRF/rundir'
jet.scriptsdir = '/jetfs/home/lkugler/DART-WRF/scripts/osse'
jet.nature_wrfout = '/raid61/scratch/lkugler/VSC/sim_archive/OSSE_v1.10_LMU+shear/2/single/wrfout_d01_%Y-%m-%d_%H:%M:%S' jet.nature_wrfout = '/raid61/scratch/lkugler/VSC/sim_archive/OSSE_v1.10_LMU+shear/2/single/wrfout_d01_%Y-%m-%d_%H:%M:%S'
jet.ideal = jet.userdir+'/compile/bin/ideal.exe' jet.ideal = jet.userdir+'/compile/bin/ideal.exe'
......
...@@ -7,195 +7,21 @@ import os, sys, shutil ...@@ -7,195 +7,21 @@ import os, sys, shutil
import datetime as dt import datetime as dt
from slurmpy import Slurm from slurmpy import Slurm
# necessary to find modules in folder, since SLURM runs the script elsewhere
sys.path.append(os.getcwd())
from config.cfg import exp, cluster from config.cfg import exp, cluster
from scripts.utils import script_to_str, symlink from scripts.utils import script_to_str, symlink
# necessary to find modules in folder, since SLURM runs the script elsewhere
sys.path.append(os.getcwd())
# allow scripts to access the configuration # allow scripts to access the configuration
symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config') # symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config')
log_dir = cluster.archivedir()+'/logs/' log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir()+'/slurm-scripts/' slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir) print('logging to', log_dir)
print('scripts, which are submitted to SLURM:', slurm_scripts_dir) print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
def my_Slurm(*args, cfg_update=dict(), **kwargs): from scheduler import *
"""Shortcut to slurmpy's class; keep certain default kwargs
and only update some with kwarg `cfg_update`
see https://github.com/brentp/slurmpy
"""
return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
class Cmdline(object):
def __init__(self, name, cfg_update):
self.name = name
def run(self, cmd, **kwargs):
print('running', self.name, 'without SLURM')
os.system(cmd)
from scheduler import backup_scripts
def prepare_wrfinput():
"""Create WRF/run directories and wrfinput files
"""
s = my_Slurm("prep_wrfinput", cfg_update={"time": "5", "mail-type": "BEGIN"})
id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
cmd = """# run ideal.exe in parallel, then add geodata
export SLURM_STEP_GRES=none
for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do
rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
echo $rundir
cd $rundir
mpirun -np 1 ./ideal.exe &
done
wait
for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do
rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done
"""
s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), "nodes": "1",
"time": "10", "mem-per-cpu": "2G"})
id = s.run(cmd, depends_on=[id])
return id
def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depends_on=None):
"""Given that directories with wrfinput files exist,
update these wrfinput files according to wrfout files
"""
s = my_Slurm("upd_wrfinput", cfg_update={"time": "5"})
# path of initial conditions, <iens> is replaced by member index
IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \
+'*iens*/'+valid_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_wrfout.py '
+IC_path, depends_on=[depends_on])
return id
def run_ENS(begin, end, depends_on=None):
"""Run forecast for 1 minute, save output.
Then run whole timespan with 5 minutes interval.
"""
id = depends_on
# first minute forecast (needed for validating an assimilation)
hist_interval = 1
radt = 1 # calc CFRAC also in first minute
begin_plus1 = begin+dt.timedelta(minutes=1)
s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'),
begin_plus1.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]),
depends_on=[id])
s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"time": "2", "mem-per-cpu": "2G"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
id = s.run(cmd, depends_on=[id])
# apply forward operator (DART filter without assimilation)
s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py '
+ begin.strftime('%Y-%m-%d_%H:%M')+' '
+ begin_plus1.strftime('%Y-%m-%d_%H:%M'),
depends_on=[id])
# whole forecast timespan
hist_interval = 5
radt = 5
s = my_Slurm("preWRF2", cfg_update=dict(time="2"))
id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'),
end.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]),
depends_on=[id])
time_in_simulation_hours = (end-begin).total_seconds()/3600
runtime_wallclock_mins_expected = int(4+time_in_simulation_hours*9) # usually below 8 min/hour
s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
"time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
id = s.run(cmd, depends_on=[id])
# not needed, since wrf.exe writes directly to archive folder
#s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10"))
#id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py '
# + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
return id
def assimilate(assim_time, prior_init_time,
prior_path_exp=False, depends_on=None):
"""Creates observations from a nature run and assimilates them.
Args:
assim_time (dt.datetime): timestamp of prior wrfout files
prior_init_time (dt.datetime):
timestamp to find the directory where the prior wrfout files are
prior_path_exp (bool or str):
put a `str` to take the prior from a different experiment
if False: use `archivedir` (defined in config) to get prior state
if str: use this directory to get prior state
"""
if not prior_path_exp:
prior_path_exp = cluster.archivedir()
elif not isinstance(prior_path_exp, str):
raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp)))
# prepare state of nature run, from which observation is sampled
#s = my_Slurm("prepNature", cfg_update=dict(time="2"))
#id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py '
# +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])
# prepare prior model state
s = my_Slurm("preAssim", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[depends_on])
# prepare nature run, generate observations
s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "30",
"mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"})
id = s.run(cluster.python+' '+cluster.scriptsdir+'/assim_synth_obs.py '
+time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
# # actuall assimilation step
# s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G"))
# cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out'
# id = s.run(cmd, depends_on=[id])
# s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
# id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py '
# + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
s = my_Slurm("updateIC", cfg_update=dict(time="8"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[id])
return id
def create_satimages(depends_on=None):
s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "40"})
s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/loop.py '+exp.expname,
depends_on=[depends_on])
def mailme(depends_on=None):
if depends_on:
s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
s.run('sleep 1', depends_on=[depends_on])
################################ ################################
print('starting osse') print('starting osse')
...@@ -203,10 +29,19 @@ print('starting osse') ...@@ -203,10 +29,19 @@ print('starting osse')
backup_scripts() backup_scripts()
id = None id = None
id = prepare_wrfinput() # create initial conditions init_time = dt.datetime(2008, 7, 30, 9)
id = prepare_wrfinput(init_time) # create initial conditions
# get initial conditions from archive
integration_end_time = dt.datetime(2008, 7, 30, 12)
#exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P1_40mem'
#id = update_wrfinput_from_archive(integration_end_time, init_time, exppath_arch, depends_on=id)
id = wrfinput_insert_wbubble(depends_on=id)
begin = dt.datetime(2008, 7, 30, 6, 0) begin = dt.datetime(2008, 7, 30, 9, 0)
end = dt.datetime(2008, 7, 30, 21, 0) end = dt.datetime(2008, 7, 30, 12, 0)
# whole forecast timespan # whole forecast timespan
hist_interval = 5 hist_interval = 5
......
...@@ -15,10 +15,10 @@ from scripts.utils import script_to_str, symlink ...@@ -15,10 +15,10 @@ from scripts.utils import script_to_str, symlink
sys.path.append(os.getcwd()) sys.path.append(os.getcwd())
# allow scripts to access the configuration # allow scripts to access the configuration
symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config') # symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config')
log_dir = cluster.archivedir()+'/logs/' log_dir = cluster.archivedir+'/logs/'
slurm_scripts_dir = cluster.archivedir()+'/slurm-scripts/' slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
print('logging to', log_dir) print('logging to', log_dir)
print('scripts, which are submitted to SLURM:', slurm_scripts_dir) print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
...@@ -30,52 +30,36 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs): ...@@ -30,52 +30,36 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs):
return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs) log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
class Cmdline(object):
def __init__(self, name, cfg_update):
self.name = name
def run(self, cmd, **kwargs):
print('running', self.name, 'without SLURM')
os.system(cmd)
def backup_scripts(): def backup_scripts():
current = cluster.scriptsdir current = cluster.scriptsdir
main_a = cluster.archivedir()+'/DART-WRF/' main_a = cluster.scripts_rundir
old_a = main_a+'/old/' # old_a = main_a+'/old/'
os.makedirs(cluster.archivedir, exist_ok=True)
os.makedirs(cluster.archivedir(), exist_ok=True)
os.makedirs(main_a, exist_ok=True) # def func(a, b, method): # call method if not link or directory
os.makedirs(old_a, exist_ok=True) # if os.path.islink(a) or os.path.isdir(a):
# pass
def func(a, b, method): # call method if not link or directory # else:
if os.path.islink(a) or os.path.isdir(a): # method(a, b)
try:
shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir)
except FileExistsError:
pass pass
else: except:
method(a, b) raise
# archive existing files
for f in os.listdir(main_a):
func(os.path.join(main_a, f), old_a+'/'+f, shutil.move)
# reproducibility
for f in ['scheduler.py', 'config/clusters.py', 'config/cfg.py']:
fname = os.path.basename(f)
func(current+'/../'+f, main_a+'/'+fname, shutil.copy)
for f in os.listdir(current): def prepare_wrfinput(init_time):
func(os.path.join(current, f), main_a+'/', shutil.copy)
def prepare_wrfinput():
"""Create WRF/run directories and wrfinput files """Create WRF/run directories and wrfinput files
""" """
s = my_Slurm("prep_wrfinput", cfg_update={"time": "10", "mail-type": "BEGIN"}) s = my_Slurm("prep_wrfinput", cfg_update={"time": "10", "mail-type": "BEGIN"})
id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py') id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfinput.py '
+init_time.strftime('%Y-%m-%d_%H:%M'))
cmd = """# run ideal.exe in parallel, then add geodata cmd = """# run ideal.exe in parallel, then add geodata
export SLURM_STEP_GRES=none export SLURM_STEP_GRES=none
for ((n=1; n<="""+str(exp.n_ens)+"""; n++)) for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do do
rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n rundir="""+cluster.tmpfiledir+'/run_WRF/'+exp.expname+"""/$n
echo $rundir echo $rundir
cd $rundir cd $rundir
mpirun -np 1 ./ideal.exe & mpirun -np 1 ./ideal.exe &
...@@ -83,7 +67,7 @@ done ...@@ -83,7 +67,7 @@ done
wait wait
for ((n=1; n<="""+str(exp.n_ens)+"""; n++)) for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
do do
rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n rundir="""+cluster.tmpfiledir+'/run_WRF/'+exp.expname+"""/$n
mv $rundir/rsl.out.0000 $rundir/rsl.out.input mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done done
""" """
...@@ -101,10 +85,19 @@ def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depe ...@@ -101,10 +85,19 @@ def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depe
# path of initial conditions, <iens> is replaced by member index # path of initial conditions, <iens> is replaced by member index
IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \ IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \
+'*iens*/'+valid_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S') +'*iens*/'+valid_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_wrfout.py ' id = s.run(cluster.python+' '+cluster.scripts_rundir+'/update_wrfinput_from_wrfout.py '
+IC_path, depends_on=[depends_on]) +IC_path, depends_on=[depends_on])
return id return id
def wrfinput_insert_wbubble(depends_on=None):
"""Given that directories with wrfinput files exist,
update these wrfinput files with warm bubbles
"""
s = my_Slurm("ins_wbubble", cfg_update={"time": "5"})
id = s.run(cluster.python+' '+cluster.scripts_rundir+'/create_wbubble_wrfinput.py',
depends_on=[depends_on])
return id
def run_ENS(begin, end, depends_on=None, first_minute=True): def run_ENS(begin, end, depends_on=None, first_minute=True):
"""Run forecast for 1 minute, save output. """Run forecast for 1 minute, save output.
Then run whole timespan with 5 minutes interval. Then run whole timespan with 5 minutes interval.
...@@ -118,7 +111,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True): ...@@ -118,7 +111,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
begin_plus1 = begin+dt.timedelta(minutes=1) begin_plus1 = begin+dt.timedelta(minutes=1)
s = my_Slurm("preWRF1", cfg_update=dict(time="2")) s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
id = s.run(' '.join([cluster.python, id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py', cluster.scripts_rundir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'), begin.strftime('%Y-%m-%d_%H:%M'),
begin_plus1.strftime('%Y-%m-%d_%H:%M'), begin_plus1.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]), str(hist_interval), str(radt),]),
...@@ -131,7 +124,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True): ...@@ -131,7 +124,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
# apply forward operator (DART filter without assimilation) # apply forward operator (DART filter without assimilation)
s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48)) s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py ' id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py '
+ begin.strftime('%Y-%m-%d_%H:%M')+' ' + begin.strftime('%Y-%m-%d_%H:%M')+' '
+ begin_plus1.strftime('%Y-%m-%d_%H:%M'), + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
depends_on=[id]) depends_on=[id])
...@@ -141,7 +134,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True): ...@@ -141,7 +134,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
radt = 5 radt = 5
s = my_Slurm("preWRF2", cfg_update=dict(time="2")) s = my_Slurm("preWRF2", cfg_update=dict(time="2"))
id = s.run(' '.join([cluster.python, id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py', cluster.scripts_rundir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'), begin.strftime('%Y-%m-%d_%H:%M'),
end.strftime('%Y-%m-%d_%H:%M'), end.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]), str(hist_interval), str(radt),]),
...@@ -156,7 +149,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True): ...@@ -156,7 +149,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
# not needed, since wrf.exe writes directly to archive folder # not needed, since wrf.exe writes directly to archive folder
#s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10")) #s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10"))
#id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py ' #id3 = s.run(cluster.python+' '+cluster.scripts_rundir+'/archive_wrf.py '
# + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2]) # + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
return id return id
...@@ -174,26 +167,26 @@ def assimilate(assim_time, prior_init_time, ...@@ -174,26 +167,26 @@ def assimilate(assim_time, prior_init_time,
if str: use this directory to get prior state if str: use this directory to get prior state
""" """
if not prior_path_exp: if not prior_path_exp:
prior_path_exp = cluster.archivedir() prior_path_exp = cluster.archivedir
elif not isinstance(prior_path_exp, str): elif not isinstance(prior_path_exp, str):
raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp))) raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp)))
# prepare state of nature run, from which observation is sampled # prepare state of nature run, from which observation is sampled
#s = my_Slurm("prepNature", cfg_update=dict(time="2")) #s = my_Slurm("prepNature", cfg_update=dict(time="2"))
#id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py ' #id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_nature.py '
# +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on]) # +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])
# prepare prior model state # prepare prior model state
s = my_Slurm("preAssim", cfg_update=dict(time="2")) s = my_Slurm("preAssim", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py ' id = s.run(cluster.python+' '+cluster.scripts_rundir+'/pre_assim.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ') +assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ') +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[depends_on]) +prior_path_exp, depends_on=[depends_on])
# prepare nature run, generate observations # prepare nature run, generate observations
s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "30", s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60",
"mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"}) "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"})
id = s.run(cluster.python+' '+cluster.scriptsdir+'/assim_synth_obs.py ' id = s.run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
+assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id]) +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
# # actuall assimilation step # # actuall assimilation step
...@@ -202,11 +195,11 @@ def assimilate(assim_time, prior_init_time, ...@@ -202,11 +195,11 @@ def assimilate(assim_time, prior_init_time,
# id = s.run(cmd, depends_on=[id]) # id = s.run(cmd, depends_on=[id])
# s = my_Slurm("archiveAssim", cfg_update=dict(time="10")) # s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
# id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py ' # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/archive_assim.py '
# + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id]) # + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
s = my_Slurm("updateIC", cfg_update=dict(time="8")) s = my_Slurm("updateIC", cfg_update=dict(time="8"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py ' id = s.run(cluster.python+' '+cluster.scripts_rundir+'/update_wrfinput_from_filteroutput.py '
+assim_time.strftime('%Y-%m-%d_%H:%M ') +assim_time.strftime('%Y-%m-%d_%H:%M ')
+prior_init_time.strftime('%Y-%m-%d_%H:%M ') +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
+prior_path_exp, depends_on=[id]) +prior_path_exp, depends_on=[id])
...@@ -215,7 +208,7 @@ def assimilate(assim_time, prior_init_time, ...@@ -215,7 +208,7 @@ def assimilate(assim_time, prior_init_time,
def create_satimages(init_time, depends_on=None): def create_satimages(init_time, depends_on=None):
s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "30"}) s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "30"})
s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir() s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir
+init_time.strftime('/%Y-%m-%d_%H:%M/'), +init_time.strftime('/%Y-%m-%d_%H:%M/'),
depends_on=[depends_on]) depends_on=[depends_on])
...@@ -226,27 +219,32 @@ def mailme(depends_on=None): ...@@ -226,27 +219,32 @@ def mailme(depends_on=None):
def gen_obsseq(depends_on=None): def gen_obsseq(depends_on=None):
s = my_Slurm("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"}) s = my_Slurm("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"})
s.run(cluster.python+' '+cluster.scriptsdir+'/obsseq_to_netcdf.py', s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py',
depends_on=[depends_on])
def copy_to_jet(depends_on=None):
s = my_Slurm("rsync-jet", cfg_update={"time": "1", "mail-type": "FAIL"})
s.run('bash rsync -avh '+cluster.archivedir+' lkugler@jet01.img.univie.ac.at:/jetfs/home/lkugler/data_jetfs/sim_archive/ &',
depends_on=[depends_on]) depends_on=[depends_on])
################################ ################################
if __name__ == "__main__": if __name__ == "__main__":
print('starting osse') print('starting osse')
timedelta_integrate = dt.timedelta(minutes=75) timedelta_integrate = dt.timedelta(minutes=10)
timedelta_btw_assim = dt.timedelta(minutes=60) timedelta_btw_assim = dt.timedelta(minutes=10)
backup_scripts() backup_scripts()
id = None id = None
start_from_existing_state = False start_from_existing_state = True
is_new_run = not start_from_existing_state is_new_run = not start_from_existing_state
if is_new_run: if is_new_run:
id = prepare_wrfinput() # create initial conditions init_time = dt.datetime(2008, 7, 30, 6)
id = prepare_wrfinput(init_time) # create initial conditions
# spin up the ensemble # spin up the ensemble
init_time = dt.datetime(2008, 7, 30, 6)
integration_end_time = dt.datetime(2008, 7, 30, 9) integration_end_time = dt.datetime(2008, 7, 30, 9)
id = run_ENS(begin=init_time, id = run_ENS(begin=init_time,
end=integration_end_time, end=integration_end_time,
...@@ -255,13 +253,14 @@ if __name__ == "__main__": ...@@ -255,13 +253,14 @@ if __name__ == "__main__":
prior_path_exp = False # for next assimilation prior_path_exp = False # for next assimilation
elif start_from_existing_state: elif start_from_existing_state:
id = prepare_wrfinput() # create initial conditions init_time = dt.datetime(2008, 7, 30, 6)
id = prepare_wrfinput(init_time) # create initial conditions
# get initial conditions from archive # get initial conditions from archive
init_time = dt.datetime(2008, 7, 30, 6) integration_end_time = dt.datetime(2008, 7, 30, 13)
integration_end_time = dt.datetime(2008, 7, 30, 9) exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P1_40mem'
exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P0_40mem' #id = update_wrfinput_from_archive(integration_end_time, init_time, exppath_arch, depends_on=id)
id = update_wrfinput_from_archive(integration_end_time, init_time, exppath_arch, depends_on=id) id = wrfinput_insert_wbubble(depends_on=id)
prior_path_exp = exppath_arch # for next assimilation prior_path_exp = exppath_arch # for next assimilation
# values for assimilation # values for assimilation
...@@ -269,7 +268,7 @@ if __name__ == "__main__": ...@@ -269,7 +268,7 @@ if __name__ == "__main__":
assim_time = integration_end_time assim_time = integration_end_time
prior_init_time = init_time prior_init_time = init_time
while time <= dt.datetime(2008, 7, 30, 20): while time <= dt.datetime(2008, 7, 30, 14):
id = assimilate(assim_time, id = assimilate(assim_time,
prior_init_time, prior_init_time,
...@@ -279,6 +278,11 @@ if __name__ == "__main__": ...@@ -279,6 +278,11 @@ if __name__ == "__main__":
# integration # integration
this_forecast_init = assim_time # start integration from here this_forecast_init = assim_time # start integration from here
timedelta_integrate = timedelta_btw_assim
if this_forecast_init.minute == 0 and this_forecast_init.hour != dt.datetime(2008, 7, 30, 13): # longer forecast every full hour
timedelta_integrate = dt.timedelta(minutes=3*60)
this_forecast_end = assim_time + timedelta_integrate this_forecast_end = assim_time + timedelta_integrate
id = run_ENS(begin=this_forecast_init, id = run_ENS(begin=this_forecast_init,
...@@ -295,3 +299,4 @@ if __name__ == "__main__": ...@@ -295,3 +299,4 @@ if __name__ == "__main__":
prior_init_time = time - timedelta_btw_assim prior_init_time = time - timedelta_btw_assim
gen_obsseq(id) # mailme(id) gen_obsseq(id) # mailme(id)
copy_to_jet(id)
...@@ -20,18 +20,39 @@ usually applied to 1 min forecasts to assess the posterior analysis quality ...@@ -20,18 +20,39 @@ usually applied to 1 min forecasts to assess the posterior analysis quality
""" """
if __name__ == '__main__': if __name__ == '__main__':
prev_forecast_init = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') prev_forecast_init = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M') time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
exppath_firstguess = cluster.archivedir() exppath_firstguess = cluster.archivedir
print(prev_forecast_init, time) print(prev_forecast_init, time)
# link ensemble states to run_DART directory # link ensemble states to run_DART directory
# we want the observation operator applied to these states! # we want the observation operator applied to these states!
pre_assim.run(time, prev_forecast_init, exppath_firstguess) pre_assim.run(time, prev_forecast_init, exppath_firstguess)
savedir = cluster.archivedir()+'/obs_seq_final_1min/' savedir = cluster.archivedir+'/obs_seq_final_1min/'
first_obscfg = exp.observations[0]
aso.set_DART_nml(cov_loc_radius_km=first_obscfg['cov_loc_radius_km'],
cov_loc_vert_km=first_obscfg.get('cov_loc_vert_km', False),
just_prior_values=True)
osq.create_obsseqin_alltypes(time, exp.observations, obs_errors=False)
# prepare dummy nature (this Hx is irrelevant)
os.chdir(cluster.dartrundir)
os.system('cp ./advance_temp1/wrfout_d01 ./wrfout_d01')
wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc',
cluster.dartrundir+'/wrfout_d01')
aso.run_perfect_model_obs()
aso.assimilate(nproc=96)
# only the prior state values are of interest in this file
# observation and truth is wrong in this file (dummy)
istage = 0
archive_stage = savedir+'/assim_stage'+str(istage)
aso.archive_diagnostics(archive_stage, time)
sys.exit() # multi stage below
n_stages = len(exp.observations) n_stages = len(exp.observations)
for istage, obscfg in enumerate(exp.observations): for istage, obscfg in enumerate(exp.observations):
...@@ -40,13 +61,14 @@ if __name__ == '__main__': ...@@ -40,13 +61,14 @@ if __name__ == '__main__':
sat_channel = obscfg.get('sat_channel', False) sat_channel = obscfg.get('sat_channel', False)
obscfg['folder_obs_coords'] = False obscfg['folder_obs_coords'] = False
aso.set_DART_nml(sat_channel=sat_channel, aso.set_DART_nml(cov_loc_radius_km=obscfg['cov_loc_radius_km'],
cov_loc_radius_km=obscfg['cov_loc_radius_km'],
cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False), cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False),
just_prior_values=True) just_prior_values=True)
osq.create_obsseq_in(time, obscfg) osq.create_obsseq_in(time, obscfg)
# prepare dummy nature (this Hx is irrelevant) # prepare dummy nature (this Hx is irrelevant)
os.chdir(cluster.dartrundir) os.chdir(cluster.dartrundir)
os.system('cp ./advance_temp1/wrfout_d01 ./wrfout_d01') os.system('cp ./advance_temp1/wrfout_d01 ./wrfout_d01')
......
import os, sys, warnings, glob import os, sys, warnings, glob
import datetime as dt import datetime as dt
from config.cfg import exp, cluster from cfg import exp, cluster
from utils import symlink, copy_scp_srvx8, copy, mkdir, mkdir_srvx8, clean_wrfdir from utils import symlink, copy_scp_srvx8, copy, mkdir, mkdir_srvx8, clean_wrfdir
# if cluster.name != 'srvx8': # if cluster.name != 'srvx8':
...@@ -11,7 +11,7 @@ time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') ...@@ -11,7 +11,7 @@ time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
try: try:
print('archive obs space diagnostics') print('archive obs space diagnostics')
savedir = cluster.archivedir()+'/obs_seq_final/' savedir = cluster.archivedir+'/obs_seq_final/'
mkdir(savedir) mkdir(savedir)
copy(cluster.dartrundir+'/obs_seq.final', savedir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final')) copy(cluster.dartrundir+'/obs_seq.final', savedir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final'))
except Exception as e: except Exception as e:
...@@ -19,7 +19,7 @@ except Exception as e: ...@@ -19,7 +19,7 @@ except Exception as e:
try: try:
print('archive regression diagnostics') print('archive regression diagnostics')
savedir = cluster.archivedir()+'/reg_factor/' savedir = cluster.archivedir+'/reg_factor/'
mkdir(savedir) mkdir(savedir)
copy(cluster.dartrundir+'/reg_diagnostics', savedir+time.strftime('/%Y-%m-%d_%H:%M_reg_diagnostics')) copy(cluster.dartrundir+'/reg_diagnostics', savedir+time.strftime('/%Y-%m-%d_%H:%M_reg_diagnostics'))
except Exception as e: except Exception as e:
...@@ -28,15 +28,15 @@ except Exception as e: ...@@ -28,15 +28,15 @@ except Exception as e:
print('archive model state') print('archive model state')
try: try:
mkdir(cluster.archivedir()) mkdir(cluster.archivedir)
print('copy prior posterior members to archive') print('copy prior posterior members to archive')
for iens in range(1, exp.n_ens+1): for iens in range(1, exp.n_ens+1):
savedir = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/')+str(iens) savedir = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/')+str(iens)
mkdir(savedir) mkdir(savedir)
copy(cluster.dartrundir+'/input.nml', copy(cluster.dartrundir+'/input.nml',
cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/input.nml')) cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/input.nml'))
filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc' filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc'
filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4) filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
...@@ -51,7 +51,7 @@ try: ...@@ -51,7 +51,7 @@ try:
for f in ['preassim_mean.nc', 'preassim_sd.nc', for f in ['preassim_mean.nc', 'preassim_sd.nc',
'output_mean.nc', 'output_sd.nc']: 'output_mean.nc', 'output_sd.nc']:
copy(cluster.dartrundir+'/'+f, copy(cluster.dartrundir+'/'+f,
cluster.archivedir()+'/'+f[:-3]+time.strftime('_%Y-%m-%d_%H:%M:%S')) cluster.archivedir+'/'+f[:-3]+time.strftime('_%Y-%m-%d_%H:%M:%S'))
except Exception as e: except Exception as e:
warnings.warn(str(e)) warnings.warn(str(e))
import os, sys, shutil, warnings import os, sys, shutil, warnings
import time as time_module
import datetime as dt import datetime as dt
import numpy as np import numpy as np
from scipy.interpolate import interp1d from scipy.interpolate import interp1d
...@@ -106,9 +107,8 @@ def replace_errors_obsseqout(f, new_errors): ...@@ -106,9 +107,8 @@ def replace_errors_obsseqout(f, new_errors):
print('replaced obs errors in', f) print('replaced obs errors in', f)
def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False, def set_DART_nml_singleobstype(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
just_prior_values=False): just_prior_values=False):
"""descr"""
cov_loc_radian = cov_loc_radius_km/earth_radius_km cov_loc_radian = cov_loc_radius_km/earth_radius_km
if just_prior_values: if just_prior_values:
...@@ -144,13 +144,42 @@ def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False, ...@@ -144,13 +144,42 @@ def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml' rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml'
append_file(cluster.dartrundir+'/input.nml', rttov_nml) append_file(cluster.dartrundir+'/input.nml', rttov_nml)
def set_DART_nml(cov_loc_radius_km=32, cov_loc_vert_km=False, just_prior_values=False):
cov_loc_radian = cov_loc_radius_km/earth_radius_km
if just_prior_values:
template = cluster.scriptsdir+'/../templates/input.eval.nml'
else:
template = cluster.scriptsdir+'/../templates/input.nml'
copy(template, cluster.dartrundir+'/input.nml')
# options keys are replaced in input.nml with values
options = {'<n_ens>': str(int(exp.n_ens)),
'<cov_loc_radian>': str(cov_loc_radian)}
if cov_loc_vert_km:
vert_norm_rad = earth_radius_km*cov_loc_vert_km/cov_loc_radius_km*1000
options['<horiz_dist_only>'] = '.false.'
options['<vert_norm_hgt>'] = str(vert_norm_rad)
else:
options['<horiz_dist_only>'] = '.true.'
options['<vert_norm_hgt>'] = '50000.0' # dummy value
for key, value in options.items():
sed_inplace(cluster.dartrundir+'/input.nml', key, value)
# input.nml for RTTOV
rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml'
append_file(cluster.dartrundir+'/input.nml', rttov_nml)
def obs_operator_ensemble(istage): def obs_operator_ensemble(istage):
# assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01 # assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01
print('running obs operator on ensemble forecast') print('running obs operator on ensemble forecast')
os.chdir(cluster.dartrundir) os.chdir(cluster.dartrundir)
if sat_channel:
list_ensemble_truths = [] list_ensemble_truths = []
t = time_module.time()
for iens in range(1, exp.n_ens+1): for iens in range(1, exp.n_ens+1):
print('observation operator for ens #'+str(iens)) print('observation operator for ens #'+str(iens))
...@@ -171,16 +200,15 @@ def obs_operator_ensemble(istage): ...@@ -171,16 +200,15 @@ def obs_operator_ensemble(istage):
true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out') true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
list_ensemble_truths.append(true) list_ensemble_truths.append(true)
print('obs operator ensemble took', int(time_module.time()-t), 'seconds')
n_obs = len(list_ensemble_truths[0]) n_obs = len(list_ensemble_truths[0])
np_array = np.full((exp.n_ens, n_obs), np.nan) np_array = np.full((exp.n_ens, n_obs), np.nan)
for i in range(exp.n_ens): for i in range(exp.n_ens):
np_array[i, :] = list_ensemble_truths[i] np_array[i, :] = list_ensemble_truths[i]
return np_array return np_array
else:
raise NotImplementedError()
def obs_operator_nature(time): def obs_operator_nature(time):
print('running obs operator on nature run') print('getting true values in obs space from nature run')
prepare_nature_dart(time) prepare_nature_dart(time)
run_perfect_model_obs() run_perfect_model_obs()
true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out') true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
...@@ -196,6 +224,7 @@ def link_nature_to_dart_truth(time): ...@@ -196,6 +224,7 @@ def link_nature_to_dart_truth(time):
def prepare_nature_dart(time): def prepare_nature_dart(time):
print('linking nature to DART & georeferencing')
link_nature_to_dart_truth(time) link_nature_to_dart_truth(time)
wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01') wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
...@@ -217,6 +246,7 @@ def calc_obserr_WV73(Hx_nature, Hx_prior): ...@@ -217,6 +246,7 @@ def calc_obserr_WV73(Hx_nature, Hx_prior):
return OEs return OEs
def run_perfect_model_obs(): def run_perfect_model_obs():
print('generating observations - running ./perfect_model_obs')
os.chdir(cluster.dartrundir) os.chdir(cluster.dartrundir)
try_remove(cluster.dartrundir+'/obs_seq.out') try_remove(cluster.dartrundir+'/obs_seq.out')
if not os.path.exists(cluster.dartrundir+'/obs_seq.in'): if not os.path.exists(cluster.dartrundir+'/obs_seq.in'):
...@@ -227,10 +257,13 @@ def run_perfect_model_obs(): ...@@ -227,10 +257,13 @@ def run_perfect_model_obs():
'\n look for '+cluster.dartrundir+'log.perfect_model_obs') '\n look for '+cluster.dartrundir+'log.perfect_model_obs')
def assimilate(nproc=96): def assimilate(nproc=96):
print('time now', dt.datetime.now())
print('running filter') print('running filter')
os.chdir(cluster.dartrundir) os.chdir(cluster.dartrundir)
try_remove(cluster.dartrundir+'/obs_seq.final') try_remove(cluster.dartrundir+'/obs_seq.final')
t = time_module.time()
os.system('mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-'+str(int(nproc)-1)+' -np '+str(int(nproc))+' ./filter > log.filter') os.system('mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-'+str(int(nproc)-1)+' -np '+str(int(nproc))+' ./filter > log.filter')
print('./filter took', int(time_module.time()-t), 'seconds')
def archive_diagnostics(archive_dir, time): def archive_diagnostics(archive_dir, time):
print('archive obs space diagnostics') print('archive obs space diagnostics')
...@@ -239,11 +272,6 @@ def archive_diagnostics(archive_dir, time): ...@@ -239,11 +272,6 @@ def archive_diagnostics(archive_dir, time):
copy(cluster.dartrundir+'/obs_seq.final', fout) copy(cluster.dartrundir+'/obs_seq.final', fout)
print(fout, 'saved.') print(fout, 'saved.')
try:
copy(cluster.dartrundir+'/obs_coords.pkl', archive_stage+'/obs_coords.pkl')
except Exception as e:
warnings.warn(str(e))
# try: # what are regression diagnostics?! # try: # what are regression diagnostics?!
# print('archive regression diagnostics') # print('archive regression diagnostics')
# copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics') # copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics')
...@@ -293,33 +321,93 @@ def archive_output(archive_stage): ...@@ -293,33 +321,93 @@ def archive_output(archive_stage):
if __name__ == "__main__": if __name__ == "__main__":
"""Generate observations (obs_seq.out file) """Assimilate observations (different obs types)
as defined in config/cfg.py as defined in config/cfg.py
for a certain timestamp (argument) of the nature run (defined in config/clusters.py) for a certain timestamp (argument) of the nature run (defined in config/clusters.py)
Workflow: Workflow:
for each assimilation stage (one obs_seq.in and e.g. one observation type): for each assimilation stage (one obs_seq.in and e.g. one observation type):
1) prepare nature run for DART 1) create obs_seq.in with obs-errors
optional: 2) calculate obs-error from parametrization 2) prepare nature run for DART
3) create obs_seq.in with obs-errors from 2) 3) create obs from nature (obs_seq.out)
4) generate actual observations (obs_seq.out) with obs_seq.in from 3) 4) Assimilate
- adapt obs errors for assimilation
- calculate obs-error from parametrization - calculate assim obs error from parametrization
1) create obs_seq.in with obs error=0 1) create obs_seq.in with obs error=0
2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble) 2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
3) calculate obs error as function of y_nat, y_ensmean 3) calculate obs error as function of y_nat, y_ensmean
- or get assim obs error from config
- edit obs error in obs_seq.out
- assimilate
- write state to archive
Assumptions: Assumptions:
- x_ensemble is already linked for DART to advance_temp<iens>/wrfout_d01 - x_ensemble is already linked for DART to advance_temp<iens>/wrfout_d01
""" """
time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
archive_time = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/') archive_time = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/')
os.chdir(cluster.dartrundir) os.chdir(cluster.dartrundir)
os.system('rm -f obs_seq.in obs_seq.out obs_seq.final') # remove any existing observation files os.system('rm -f obs_seq.in obs_seq.out obs_seq.final') # remove any existing observation files
n_stages = len(exp.observations) print('create obs_seq.in from config')
istage = 0
archive_stage = archive_time + '/assim_stage'+str(istage)+'/'
prepare_nature_dart(time) # link WRF files to DART directory
################################################
print(' 1) get the assimilation errors in a single vector ')
error_assimilate = []
for i, obscfg in enumerate(exp.observations):
n_obs = obscfg['n_obs']
n_obs_z = obscfg.get('heights', 1)
n_obs_3d = n_obs * n_obs_z
parametrized = obscfg.get('sat_channel') == 6
if not parametrized:
err_this_type = np.zeros(n_obs_3d) + obscfg['error_generate']
else: # error parametrization for WV73
# get observations for sat 6
osq.create_obsseqin_alltypes(time, [obscfg,], obs_errors='error_generate')
run_perfect_model_obs()
# depends on obs_seq.out produced before by run_perfect_model_obs()
Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
Hx_prior = obs_operator_ensemble(istage) # files are already linked to DART directory
err_this_type = calc_obserr_WV73(Hx_nat, Hx_prior)
error_assimilate.extend(err_this_type)
################################################
print(' 2) generate observations ')
osq.create_obsseqin_alltypes(time, exp.observations, obs_errors='error_generate',
archive_obs_coords=archive_stage+'/obs_coords.pkl')
first_obstype = exp.observations[0]
set_DART_nml(cov_loc_radius_km=first_obstype['cov_loc_radius_km'],
cov_loc_vert_km=first_obstype.get('cov_loc_vert_km', False))
run_perfect_model_obs() # actually create observations that are used to assimilate
################################################
print(' 3) assimilate with adapted observation-errors ')
replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
t = time.time()
assimilate()
dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage)
archive_diagnostics(dir_obsseq, time)
archive_output(archive_stage)
sys.exit() # below is the code for separate assimilation of different obs types
for istage, obscfg in enumerate(exp.observations): for istage, obscfg in enumerate(exp.observations):
print('running observation stage', istage, obscfg) print('running observation stage', istage, obscfg)
...@@ -327,42 +415,30 @@ if __name__ == "__main__": ...@@ -327,42 +415,30 @@ if __name__ == "__main__":
n_obs = obscfg['n_obs'] n_obs = obscfg['n_obs']
n_obs_3d = n_obs * len(obscfg['heights']) n_obs_3d = n_obs * len(obscfg['heights'])
sat_channel = obscfg.get('sat_channel', False) sat_channel = obscfg.get('sat_channel', False)
error_generate = obscfg['error_generate']
error_assimilate = obscfg['error_assimilate']
set_DART_nml(sat_channel=sat_channel, # debug option: overwrite time in prior files
cov_loc_radius_km=obscfg['cov_loc_radius_km'], # for iens in range(1,41):
cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False)) # os.system('ncks -A -v Times '+cluster.dartrundir+'/wrfout_d01 '+cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')
use_error_parametrization = error_generate == False if error_assimilate == False:
if use_error_parametrization: # use error parametrization for assimilation-obs.errors
if sat_channel != 6: if sat_channel != 6:
raise NotImplementedError('sat channel '+str(sat_channel)) raise NotImplementedError('sat channel '+str(sat_channel))
osq.create_obsseq_in(time, obscfg, obs_errors=0) # zero error to get truth vals # depends on obs_seq.out produced before by run_perfect_model_obs()
Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
Hx_nat = obs_operator_nature(time)
Hx_prior = obs_operator_ensemble(istage) # files are already linked to DART directory Hx_prior = obs_operator_ensemble(istage) # files are already linked to DART directory
error_generate = calc_obserr_WV73(Hx_nat, Hx_prior) error_assimilate = calc_obserr_WV73(Hx_nat, Hx_prior)
# create obs template file, now with correct errors
osq.create_obsseq_in(time, obscfg, obs_errors=error_generate,
archive_obs_coords=archive_stage+'/obs_coords.pkl')
prepare_nature_dart(time) # link WRF files to DART directory
run_perfect_model_obs() # actually create observations that are used to assimilate
# debug option: overwrite time in prior files else:
# for iens in range(1,41):
# os.system('ncks -A -v Times '+cluster.dartrundir+'/wrfout_d01 '+cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')
if error_assimilate is not False:
# overwrite error values in obs_seq.out # overwrite error values in obs_seq.out
error_assimilate = np.zeros(n_obs_3d) + error_assimilate # ensure shape error_assimilate = np.zeros(n_obs_3d) + error_assimilate # ensure shape
replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
assimilate() assimilate()
dir_obsseq = cluster.archivedir()+'/obs_seq_final/assim_stage'+str(istage) dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage)
archive_diagnostics(dir_obsseq, time) archive_diagnostics(dir_obsseq, time)
if istage < n_stages-1: if istage < n_stages-1:
......
import os, sys, shutil, warnings
import datetime as dt
import numpy as np
from scipy.interpolate import interp1d
from config.cfg import exp, cluster
from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print
import create_obsseq as osq
import wrfout_add_geo
# Mean Earth radius [km]; converts localization radii from km to radians.
earth_radius_km = 6370

# Piecewise-linear fit of Fig. 7 in Harnisch et al. (2016):
# 7.3 µm observation error [Kelvin] as a function of cloud impact.
x_ci = [0, 5, 10.5, 13, 16]
y_oe = [1, 4.5, 10, 12, 13] # Kelvin
oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True)

def oe_73(ci):
    """Return the WV 7.3 µm observation error [K] for cloud impact `ci`.

    Uses the linear fit below a cloud impact of 13;
    above that the error saturates at 16 K.
    """
    return oe_73_linear(ci) if ci < 13 else 16.
def cloudimpact_73(bt_mod, bt_obs):
    """Cloud impact for the WV 7.3 µm channel, following Harnisch 2016.

    Average of the observed and the modelled brightness-temperature
    depression below the 255 K threshold (negative depressions count as 0).
    """
    bt_lim = 255 # Kelvin for 7.3 micron WV channel
    biascor_obs = 0
    depression_obs = max(0, bt_lim - (bt_obs - biascor_obs))
    depression_mod = max(0, bt_lim - bt_mod)
    return (depression_obs + depression_mod) / 2
def read_prior_obs(f_obs_prior):
    """Parse all observation records from an obs_seq.final file.

    Args:
        f_obs_prior (str): path to the obs_seq.final file

    Returns:
        list of dict: one entry per observation with keys
            observed, truth, prior_ensmean, prior_enssd,
            prior_ens (np.array of the individual member values)
    """
    # bugfix: file handle was previously never closed
    with open(f_obs_prior, 'r') as fobj:
        obsseq = fobj.readlines()

    OBSs = []
    # each record starts with a line containing ' OBS '; the following lines
    # hold observed value, truth, prior ens mean, prior ens sd, then members
    for i, line in enumerate(obsseq):
        if ' OBS ' in line:
            observed = float(obsseq[i+1].strip())
            truth = float(obsseq[i+2].strip())
            prior_ensmean = float(obsseq[i+3].strip())
            prior_enssd = float(obsseq[i+4].strip())
            prior_ens = [float(obsseq[i+j].strip())
                         for j in range(5, 5+exp.n_ens)]
            # prior_ensmean/prior_enssd were previously parsed but discarded;
            # returning them is a backward-compatible addition
            OBSs.append(dict(observed=observed, truth=truth,
                             prior_ensmean=prior_ensmean, prior_enssd=prior_enssd,
                             prior_ens=np.array(prior_ens)))
    return OBSs
def read_truth_obs_obsseq(f):
    """Read observed and true values from an obs_seq.out/final file.

    Args:
        f (str): path to the obs_seq file

    Returns:
        (list, list): truth values, observed values (floats, in file order)
    """
    # bugfix: file handle was previously never closed
    with open(f, 'r') as fobj:
        obsseq = fobj.readlines()
    true = []
    obs = []
    # each record starts with a line containing ' OBS ';
    # the next two lines hold the observed value and the truth
    for i, line in enumerate(obsseq):
        if ' OBS ' in line:
            obs.append(float(obsseq[i+1].strip()))
            true.append(float(obsseq[i+2].strip()))
    return true, obs
def replace_errors_obsseqout(f, new_errors):
    """Replace the observation error variances in an obs_seq.out file in place.

    Args:
        f (str): path to the obs_seq.out file (overwritten)
        new_errors (np.array): observation error standard deviations,
            length must match the number of observations in the file

    Note: the file stores error *variances*; the given standard deviations
    are squared before writing.
    """
    # bugfix: file handle was previously never closed
    with open(f, 'r') as fobj:
        obsseq = fobj.readlines()

    # the error-variance line is the last line of each record, i.e. one line
    # before the next ' OBS ' marker; infer the record length from the
    # distance between the first two markers
    obs_markers = [i for i, line in enumerate(obsseq) if ' OBS ' in line]
    if len(obs_markers) < 2:
        # bugfix: previously raised an obscure NameError (`second_obs` unset)
        raise ValueError('need at least two observations in '+f
                         +' to infer the record length')
    lines_between = obs_markers[1] - obs_markers[0]
    lines_obserr_after_obsnr = lines_between - 1  # obserr line is one before ' OBS ' line

    # overwrite the error variance of every observation
    for i_obs, i_marker in enumerate(obs_markers):
        line_error_obs_i = i_marker + lines_obserr_after_obsnr
        new_err_obs_i = new_errors[i_obs]**2  # variance in obs_seq.out
        obsseq[line_error_obs_i] = ' '+str(new_err_obs_i)+' \n'

    with open(f, 'w') as file:
        file.writelines(obsseq)
    print('replaced obs errors in', f)
def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
                 just_prior_values=False):
    """Write DART's input.nml from a template and fill in the placeholders.

    sat_channel: SEVIRI channel number (False for conventional obs);
        selects which RTTOV namelist segment is appended
    cov_loc_radius_km: horizontal covariance localization radius [km]
    cov_loc_vert_km: vertical localization scale [km] (False = horizontal only)
    just_prior_values: use the evaluate-only template (no state update)
    """
    nml_target = cluster.dartrundir+'/input.nml'
    template = (cluster.scriptsdir+'/../templates/input.eval.nml' if just_prior_values
                else cluster.scriptsdir+'/../templates/input.nml')
    copy(template, nml_target)

    # placeholder -> value; each key is substituted inside input.nml
    options = {'<n_ens>': str(int(exp.n_ens)),
               '<cov_loc_radian>': str(cov_loc_radius_km/earth_radius_km)}
    if cov_loc_vert_km:
        vert_norm_rad = earth_radius_km*cov_loc_vert_km/cov_loc_radius_km*1000
        options['<horiz_dist_only>'] = '.false.'
        options['<vert_norm_hgt>'] = str(vert_norm_rad)
    else:
        options['<horiz_dist_only>'] = '.true.'
        options['<vert_norm_hgt>'] = '50000.0' # dummy value
    for placeholder, value in options.items():
        sed_inplace(nml_target, placeholder, value)

    # append the RTTOV namelist segment (needs to exist even without sat obs)
    if sat_channel > 0 and sat_channel in [1, 2, 3, 12]: # VIS channels
        rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml'
    else: # IR channels, or no satellite channel at all
        rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml'
    append_file(nml_target, rttov_nml)
def obs_operator_ensemble(istage):
    """Run the forward operator H(x) on every ensemble member.

    Assumes the prior ensemble is already linked to advance_temp<i>/wrfout_d01.

    Args:
        istage (int): assimilation stage; for istage > 0 the wrfout files are
            DART output and already carry georeference coordinates

    Returns:
        np.array of shape (n_ens, n_obs): H(x) of each member
    """
    # bugfix: this function previously branched on `sat_channel`, a name not
    # defined in its scope (NameError when imported as a module); the branch
    # was always taken in script use, so it is removed here.
    print('running obs operator on ensemble forecast')
    os.chdir(cluster.dartrundir)

    list_ensemble_truths = []
    for iens in range(1, exp.n_ens+1):
        print('observation operator for ens #'+str(iens))
        # ens members are already linked to advance_temp<i>/wrfout_d01
        copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01',
             cluster.dartrundir+'/wrfout_d01')
        # DART may need a wrfinput file as well, which serves as a template for dimension sizes
        symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
        # add geodata, if istage>0, wrfout is DART output (has coords)
        if istage == 0:
            wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
        # run perfect_model obs (forward operator)
        os.system('mpirun -np 12 ./perfect_model_obs > /dev/null')
        # truth values in obs_seq.out are H(x) values
        true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
        list_ensemble_truths.append(true)

    # stack the per-member H(x) vectors into one (n_ens, n_obs) array
    n_obs = len(list_ensemble_truths[0])
    np_array = np.full((exp.n_ens, n_obs), np.nan)
    for i in range(exp.n_ens):
        np_array[i, :] = list_ensemble_truths[i]
    return np_array
def obs_operator_nature(time):
    """Return H(x_nature): the observation-space values of the nature run."""
    print('what are the true values in obs space of nature run')
    prepare_nature_dart(time)
    run_perfect_model_obs()
    truth_values, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
    return truth_values
def link_nature_to_dart_truth(time):
    """Stage the nature-run wrfout valid at `time` as DART's truth file."""
    truth_file = cluster.dartrundir+'/wrfout_d01'
    # get wrfout_d01 from nature run
    shutil.copy(time.strftime(cluster.nature_wrfout), truth_file)
    # DART may need a wrfinput file as well, which serves as a template for dimension sizes
    symlink(truth_file, cluster.dartrundir+'/wrfinput_d01')
def prepare_nature_dart(time):
    # Stage the nature run for DART and add georeference coordinates to it.
    link_nature_to_dart_truth(time)
    wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
def calc_obserr_WV73(Hx_nature, Hx_prior):
    """Observation error for each WV 7.3 µm obs from the cloud-impact
    parametrization (via cloudimpact_73 and oe_73).

    Hx_nature: H(x) of the nature run, shape (n_obs,)
    Hx_prior: H(x) of the prior ensemble, shape (n_ens, n_obs)
    Returns an np.array of error standard deviations, shape (n_obs,)
    """
    errors = np.ones(len(Hx_nature))
    for iobs in range(len(Hx_nature)):
        bt_y = Hx_nature[iobs]
        member_bts = Hx_prior[:, iobs]
        # mean cloud impact over the ensemble determines the error
        mean_CI = np.mean([cloudimpact_73(bt_x, bt_y) for bt_x in member_bts])
        oe_nature = oe_73(mean_CI)
        print('oe_nature:', oe_nature, ', bt_y:', bt_y, ', mean_CI:', mean_CI)
        errors[iobs] = oe_nature
    return errors
def run_perfect_model_obs():
    """Run DART's perfect_model_obs to generate obs_seq.out from obs_seq.in.

    Raises:
        RuntimeError: if obs_seq.in is missing beforehand, or obs_seq.out
            was not produced afterwards
    """
    os.chdir(cluster.dartrundir)
    try_remove(cluster.dartrundir+'/obs_seq.out')
    if not os.path.exists(cluster.dartrundir+'/obs_seq.in'):
        raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir)
    os.system('mpirun -np 12 ./perfect_model_obs > log.perfect_model_obs')
    if not os.path.exists(cluster.dartrundir+'/obs_seq.out'):
        # bugfix: the log-file hint previously lacked the '/' path separator
        raise RuntimeError('obs_seq.out does not exist in '+cluster.dartrundir,
                           '\n look for '+cluster.dartrundir+'/log.perfect_model_obs')
def assimilate(nproc=96):
    """Run DART's filter with `nproc` MPI tasks pinned to cores 0..nproc-1."""
    print('running filter')
    os.chdir(cluster.dartrundir)
    try_remove(cluster.dartrundir+'/obs_seq.final')
    n = int(nproc)
    cmd = ('mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-'+str(n-1)
           +' -np '+str(n)+' ./filter > log.filter')
    os.system(cmd)
def archive_diagnostics(archive_dir, time):
    """Copy observation-space diagnostics to the archive.

    Args:
        archive_dir (str): target directory (created if missing)
        time (datetime): valid time, used in the destination file name
    """
    print('archive obs space diagnostics')
    mkdir(archive_dir)
    fout = archive_dir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final')
    copy(cluster.dartrundir+'/obs_seq.final', fout)
    print(fout, 'saved.')

    try:
        # bugfix: previously copied into the free variable `archive_stage`
        # instead of the `archive_dir` parameter (NameError when imported)
        copy(cluster.dartrundir+'/obs_coords.pkl', archive_dir+'/obs_coords.pkl')
    except Exception as e:
        warnings.warn(str(e))

    # try: # what are regression diagnostics?!
    #     print('archive regression diagnostics')
    #     copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics')
    # except Exception as e:
    #     warnings.warn(str(e))
def recycle_output():
    """Feed the DART analysis back into the prior files (filter output -> input).

    For every member, the analysed variables are merged via ncks into
    advance_temp<i>/wrfout_d01 so the next stage starts from the analysis.
    """
    update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'QRAIN', 'U10', 'V10', 'T2', 'Q2', 'TSK', 'PSFC', 'CLDFRA']
    updates = ','.join(update_vars)
    print('recycle DART output to be used as input')

    for iens in range(1, exp.n_ens+1):
        analysis_file = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
        prior_file = cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01'

        # convert link to file in order to be able to update the content
        if os.path.islink(prior_file):
            link_target = os.readlink(prior_file)
            os.remove(prior_file)
            copy(link_target, prior_file)

        print('updating', updates, 'in', prior_file, 'from', analysis_file)
        os.system(cluster.ncks+' -A -v '+updates+' '+analysis_file+' '+prior_file)
def archive_output(archive_stage):
    """Save input.nml, the member analyses and the ensemble mean/sd files
    of this assimilation stage to the archive directory."""
    print('archiving output')
    mkdir(archive_stage)
    copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml')

    # individual member analyses
    for iens in range(1, exp.n_ens+1):
        member_file = 'filter_restart_d01.'+str(iens).zfill(4)
        copy(cluster.dartrundir+'/'+member_file, archive_stage+'/'+member_file)

    # copy mean and sd to archive
    for f in ['output_mean.nc', 'output_sd.nc']:
        copy(cluster.dartrundir+'/'+f, archive_stage+'/'+f)
if __name__ == "__main__":
    """Generate observations (obs_seq.out file)
    as defined in config/cfg.py
    for a certain timestamp (argument) of the nature run (defined in config/clusters.py)

    Workflow:
    for each assimilation stage (one obs_seq.in and e.g. one observation type):
    1) prepare nature run for DART
    optional: 2) calculate obs-error from parametrization
    3) create obs_seq.in with obs-errors from 2)
    4) generate actual observations (obs_seq.out) with obs_seq.in from 3)

    - calculate obs-error from parametrization
        1) create obs_seq.in with obs error=0
        2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
        3) calculate obs error as function of y_nat, y_ensmean

    Assumptions:
    - x_ensemble is already linked for DART to advance_temp<iens>/wrfout_d01
    """
    time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
    archive_time = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/')

    os.chdir(cluster.dartrundir)
    os.system('rm -f obs_seq.in obs_seq.out obs_seq.final')  # remove any existing observation files

    n_stages = len(exp.observations)
    for istage, obscfg in enumerate(exp.observations):
        print('running observation stage', istage, obscfg)

        archive_stage = archive_time + '/assim_stage'+str(istage)+'/'
        n_obs = obscfg['n_obs']
        n_obs_3d = n_obs * len(obscfg['heights'])
        sat_channel = obscfg.get('sat_channel', False)
        error_generate = obscfg['error_generate']
        error_assimilate = obscfg['error_assimilate']

        set_DART_nml(sat_channel=sat_channel,
                     cov_loc_radius_km=obscfg['cov_loc_radius_km'],
                     cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False))

        # create obs template file, now with correct errors
        osq.create_obsseq_in(time, obscfg, obs_errors=error_generate,
                             archive_obs_coords=archive_stage+'/obs_coords.pkl')
        prepare_nature_dart(time)  # link WRF files to DART directory
        run_perfect_model_obs()  # actually create observations that are used to assimilate

        # debug option: overwrite time in prior files
        # for iens in range(1,41):
        #    os.system('ncks -A -v Times '+cluster.dartrundir+'/wrfout_d01 '+cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')

        if error_assimilate == False:
            # use error parametrization for assimilation-obs.errors
            if sat_channel != 6:
                raise NotImplementedError('sat channel '+str(sat_channel))

            # depends on obs_seq.out produced before by run_perfect_model_obs()
            Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
            Hx_prior = obs_operator_ensemble(istage)  # files are already linked to DART directory
            error_assimilate = calc_obserr_WV73(Hx_nat, Hx_prior)
        else:
            error_assimilate = np.zeros(n_obs_3d) + error_assimilate  # ensure shape

        # bugfix: the assimilation errors are written in both branches now,
        # so parametrized errors also end up in obs_seq.out
        replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)

        assimilate()
        dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage)
        archive_diagnostics(dir_obsseq, time)

        if istage < n_stages-1:
            # recirculation: filter output -> input
            archive_output(archive_stage)
            recycle_output()

        elif istage == n_stages-1:
            # last assimilation, continue integration now
            archive_output(archive_stage)

        else:
            # bugfix: the exception was previously created but never raised
            raise RuntimeError('this should never occur?!')
/home/fs71386/lkugler/DART-WRF/scripts//../config
\ No newline at end of file
"""Create obs_seq.in """The functions in here create obs_seq.in files.
These are the template files defining obs locations and metadata
according to which observations are generated and subsequently assimilated.
""" """
import os, sys, warnings import os, sys, warnings
import numpy as np import numpy as np
...@@ -43,10 +44,14 @@ def obskind_read(): ...@@ -43,10 +44,14 @@ def obskind_read():
##################### #####################
# Global variables # Global variables
# position on earth to calculate solar angles # position on earth for RTTOV ray geometry
lat0 = 45. lat0 = 45.
lon0 = 0. lon0 = 0.
# position on Earth for DART, domain center when coords_from_domaincenter=True
lat0_center = 45.
lon0_center = 0.
sat_az = "180.0" sat_az = "180.0"
sat_zen = "45.0" sat_zen = "45.0"
...@@ -78,7 +83,7 @@ def get_dart_date(time_dt): ...@@ -78,7 +83,7 @@ def get_dart_date(time_dt):
def calc_obs_locations(n_obs, coords_from_domaincenter=True, def calc_obs_locations(n_obs, coords_from_domaincenter=True,
distance_between_obs_km=9999, fpath_coords=False): distance_between_obs_km=9999, cov_loc_radius_km=32, fpath_coords=False):
"""Calculate coordinate pairs for locations """Calculate coordinate pairs for locations
Args: Args:
...@@ -115,28 +120,50 @@ def calc_obs_locations(n_obs, coords_from_domaincenter=True, ...@@ -115,28 +120,50 @@ def calc_obs_locations(n_obs, coords_from_domaincenter=True,
for iy in range(int(-ny/2), int(ny/2+1)): for iy in range(int(-ny/2), int(ny/2+1)):
for ix in range(int(-nx/2), int(nx/2+1)): for ix in range(int(-nx/2), int(nx/2+1)):
lat = lat0 + iy*dy_4km_in_degree lat = lat0_center + iy*dy_4km_in_degree
m_per_degree_x = 2*np.pi*radius_earth_meters*np.sin(lat/180*np.pi)/360 m_per_degree_x = 2*np.pi*radius_earth_meters*np.sin(lat/180*np.pi)/360
dx_4km_in_degree = distance_between_obs_meters/m_per_degree_x dx_4km_in_degree = distance_between_obs_meters/m_per_degree_x
lon = lon0 + ix*dx_4km_in_degree lon = lon0_center + ix*dx_4km_in_degree
coords.append((lat, lon)) coords.append((lat, lon))
else: else:
"""Observations spread evenly over domain""" """Observations spread evenly over domain
fcoords = '/home/fs71386/lkugler/run_DART/geo_em.d01.nc' but: leave out a distance to the border of the domain
so that the assimilation increments are zero on the boundary
distance to boundary = 1.5x localization-radius
"""
fcoords = cluster.dartrundir+'/geo_em.d01.nc'
import xarray as xr import xarray as xr
ds = xr.open_dataset(fcoords) ds = xr.open_dataset(fcoords)
lons = ds.XLONG_M.isel(Time=0).values lons = ds.XLONG_M.isel(Time=0).values
lats = ds.XLAT_M.isel(Time=0).values lats = ds.XLAT_M.isel(Time=0).values
n_obs_x = int(np.sqrt(n_obs)) n_obs_x = int(np.sqrt(n_obs))
dx = int(len(ds.south_north)/n_obs_x) nx = len(ds.south_north) # number of gridpoints in one direction
skip = int(dx/2) model_dx_km = exp.model_dx/1000
omit_covloc_radius_on_boundary = True
if omit_covloc_radius_on_boundary: # in order to avoid an increment step on the boundary
skip_km = cov_loc_radius_km*1.5
skip_gridpoints = int(skip_km/model_dx_km) # skip this many gridpoints on each side
gridpoints_left = nx - 2*skip_gridpoints
# now spread observations evenly across the space left
gridpoints_between_obs = int(gridpoints_left/(n_obs_x-1))
else:
gridpoints_between_obs = int(nx/n_obs_x) # number of gridpoints / number of observations in one direction
skip_gridpoints = int(gridpoints_between_obs/2) # to center the observations in the domain
km_between_obs = gridpoints_between_obs*model_dx_km
print('observation density: one observation every', km_between_obs, 'km /',
gridpoints_between_obs,'gridpoints \n', 'leaving a domain boundary on each side of',
skip_gridpoints, 'gridpoints or', skip_gridpoints*model_dx_km, 'km')
# skip_gridpoints = space we have to leave out on the boundary of the domain
# in order to have the observations centered in the domain
for i in range(n_obs_x): for i in range(n_obs_x):
for j in range(n_obs_x): for j in range(n_obs_x):
coords.append((lats[skip+i*dx, skip+j*dx], coords.append((lats[skip_gridpoints+i*gridpoints_between_obs, skip_gridpoints+j*gridpoints_between_obs],
lons[skip+i*dx, skip+j*dx])) lons[skip_gridpoints+i*gridpoints_between_obs, skip_gridpoints+j*gridpoints_between_obs]))
try: try:
if fpath_coords: if fpath_coords:
import pickle import pickle
...@@ -184,6 +211,51 @@ obs_kind_definitions ...@@ -184,6 +211,51 @@ obs_kind_definitions
first: 1 last: """+n_obs_str first: 1 last: """+n_obs_str
def preamble_multi(n_obs_3d_total, list_kinds):
    """Build the obs_seq.in header for a file holding multiple obs types.

    Args:
        n_obs_3d_total (int): total number of (3D) observations in the file.
        list_kinds (list of str): DART observation kind names; each is looked
            up in the module-level ``obs_kind_nrs`` table for its numeric id.

    Returns:
        str: header text (obs_kind_definitions table plus the num_copies /
        num_obs / first / last counters) to prepend to the obs sections.
    """
    # one "<kind-number> <kind-name>" definition line per obs type
    lines_obstypedef = ''
    for kind in list_kinds:
        lines_obstypedef += '\n '+obs_kind_nrs[kind]+' '+kind

    n_obs_str = str(n_obs_3d_total)
    num_obstypes = str(len(list_kinds))
    # NOTE(review): the template whitespace below is significant to DART's
    # obs_seq reader — do not reflow these literals.
    return """ obs_sequence
obs_kind_definitions
"""+num_obstypes+' '+lines_obstypedef+"""
num_copies: 0 num_qc: 0
num_obs: """+n_obs_str+" max_num_obs: "+n_obs_str+"""
first: 1 last: """+n_obs_str
def determine_vert_coords(sat_channel, kind, obscfg):
    """Select the DART vertical coordinate system and levels for one obs type.

    Args:
        sat_channel (int or False): satellite channel number; falsy for
            non-satellite (in-situ / radar) observations.
        kind (str): DART observation kind name, e.g. 'SYNOP_TEMPERATURE'.
        obscfg (dict): observation configuration; must provide 'heights'
            for non-satellite, non-SYNOP kinds.

    Returns:
        tuple: (vert_coord_sys (str), vert_coords (list))
    """
    if sat_channel:
        # satellite radiances carry no defined height in DART
        return "-2", ["-888888.0000000000", ]
    if 'SYNOP' in kind:
        # surface stations: 2 m above ground level
        return "-1", [2, ]
    # profile obs (e.g. radar): explicit height levels, meters AMSL
    return "3", obscfg['heights']
def write_sat_angle_appendix(sat_channel, lat0, lon0, time_dt):
    """Return the 'visir' geometry lines appended to each satellite obs.

    For satellite channels DART-RTTOV needs the viewing geometry: the
    satellite azimuth/zenith (module-level constants ``sat_az``/``sat_zen``)
    and the sun azimuth/zenith computed by pysolar for (lat0, lon0) at
    time_dt.  Non-satellite observations get no appendix.

    Args:
        sat_channel (int or False): SEVIRI channel number; falsy for
            non-satellite observations.
        lat0 (float): latitude used for the solar angles (shadows the
            module-level ``lat0``).
        lon0 (float): longitude used for the solar angles (shadows the
            module-level ``lon0``).
        time_dt (dt.datetime): observation time; presumably timezone-aware
            UTC as produced by add_timezone_UTC — verify against callers.

    Returns:
        str: appendix block for obs_seq.in, or '' for non-satellite obs.
    """
    if sat_channel:
        sun_az = str(get_azimuth(lat0, lon0, time_dt))
        # zenith angle = 90 degrees minus solar altitude
        sun_zen = str(90. - get_altitude(lat0, lon0, time_dt))
        print('sunzen', sun_zen, 'sunazi', sun_az)

        # NOTE(review): layout of this template is read positionally by
        # DART; the trailing numbers (12 4 21, channel, -888888, 1) look
        # like the RTTOV platform/sat/sensor triple plus channel id —
        # confirm against the DART obs_seq_in format before changing.
        return """visir
"""+sat_az+""" """+sat_zen+""" """+sun_az+"""
"""+sun_zen+"""
12 4 21 """+str(sat_channel)+"""
-888888.000000000
1"""
    else:
        return ''
def write_section(obs, last=False): def write_section(obs, last=False):
""" """
Args: Args:
...@@ -211,9 +283,9 @@ kind ...@@ -211,9 +283,9 @@ kind
"""+str(obs['obserr_var']) """+str(obs['obserr_var'])
def create_obsseq_in(time_dt, obscfg, obs_errors=False, def create_obsseq_in_separate_obs(time_dt, obscfg, obs_errors=False,
archive_obs_coords=False): archive_obs_coords=False):
"""Create obs_seq.in """Create obs_seq.in of one obstype
Args: Args:
time_dt (dt.datetime): time of observation time_dt (dt.datetime): time of observation
...@@ -304,26 +376,95 @@ def create_obsseq_in(time_dt, obscfg, obs_errors=False, ...@@ -304,26 +376,95 @@ def create_obsseq_in(time_dt, obscfg, obs_errors=False,
write_file(txt, output_path=cluster.dartrundir+'/obs_seq.in') write_file(txt, output_path=cluster.dartrundir+'/obs_seq.in')
def create_obsseqin_alltypes(time_dt, list_obscfg, obs_errors='error_assimilate', archive_obs_coords=False):
    """Write a DART obs_seq.in that contains every observation type at once.

    Args:
        time_dt (dt.datetime): time of observation (made UTC-aware here)
        list_obscfg (list of dict): one configuration dict per obs type
        obs_errors (False or str): key into each config dict holding the
            observation error values; False -> zero error
        archive_obs_coords (bool, str): False, or filepath where the
            observation coordinates are additionally saved
    """
    print('creating obs_seq.in:')
    time_dt = add_timezone_UTC(time_dt)
    dart_date_day, secs_thatday = get_dart_date(time_dt)

    sections = []
    obs_counter = 0
    n_types = len(list_obscfg)

    for istage, obscfg in enumerate(list_obscfg):
        obstype = obscfg['kind']
        print('obstype', obstype)
        channel = obscfg.get('sat_channel', False)

        # horizontal observation locations for this type
        locations = calc_obs_locations(obscfg['n_obs'],
                        coords_from_domaincenter=False,
                        distance_between_obs_km=obscfg.get('distance_between_obs_km', False),
                        cov_loc_radius_km=obscfg['cov_loc_radius_km'],
                        fpath_coords=archive_obs_coords)

        # expand to 3D by attaching the vertical levels of this obs type
        vert_coord_sys, vert_coords = determine_vert_coords(channel, obstype, obscfg)
        locations = append_hgt_to_coords(locations, vert_coords)
        n_obs_type = len(locations)

        # per-observation error std-dev (zero unless configured)
        err_std = np.zeros(n_obs_type)
        if obs_errors:
            err_std += obscfg[obs_errors]

        # satellite geometry appendix ('' for non-satellite obs)
        sat_info = write_sat_angle_appendix(channel, lat0, lon0, time_dt)

        for j, loc in enumerate(locations):
            obs_counter += 1
            # only the very last obs of the last type closes the sequence
            is_final = (istage == n_types - 1) and (j == int(n_obs_type) - 1)

            sections.append(write_section(dict(i=obs_counter,
                                               kind_nr=obs_kind_nrs[obstype],
                                               dart_date_day=dart_date_day,
                                               secs_thatday=secs_thatday,
                                               lon=loc[1],
                                               lat=loc[0],
                                               vert_coord=loc[2],
                                               vert_coord_sys=vert_coord_sys,
                                               obserr_var=err_std[j]**2,
                                               appendix=sat_info),
                                          last=is_final))

    header = preamble_multi(obs_counter, [cfg['kind'] for cfg in list_obscfg])
    write_file(header + ''.join(sections), output_path=cluster.dartrundir+'/obs_seq.in')
if __name__ == '__main__': if __name__ == '__main__':
# for testing # for testing
time_dt = dt.datetime(2008, 7, 30, 9, 0) time_dt = dt.datetime(2008, 7, 30, 9, 0)
n_obs = 1600 # radar: n_obs for each observation height level n_obs = 900 # radar: n_obs for each observation height level
vis = dict(plotname='VIS 0.6µm', plotunits='[1]', vis = dict(plotname='VIS 0.6µm', plotunits='[1]',
kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs, kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs,
err_std=0.03,
error_generate=0.03, error_assimilate=0.06, error_generate=0.03, error_assimilate=0.06,
cov_loc_radius_km=32) cov_loc_radius_km=32)
wv73 = dict(plotname='Brightness temperature WV 7.3µm', plotunits='[K]', wv73 = dict(plotname='Brightness temperature WV 7.3µm', plotunits='[K]',
kind='MSG_4_SEVIRI_TB', sat_channel=6, n_obs=n_obs, kind='MSG_4_SEVIRI_TB', sat_channel=6, n_obs=n_obs,
err_std=False, error_generate=1., error_assimilate=False,
error_generate=False, error_assimilate=False,
cov_loc_radius_km=32) cov_loc_radius_km=32)
ir108 = dict(plotname='Brightness temperature IR 10.8µm', plotunits='[K]', ir108 = dict(plotname='Brightness temperature IR 10.8µm', plotunits='[K]',
kind='MSG_4_SEVIRI_TB', sat_channel=9, n_obs=n_obs, kind='MSG_4_SEVIRI_TB', sat_channel=9, n_obs=n_obs,
err_std=5.,
error_generate=5., error_assimilate=10., error_generate=5., error_assimilate=10.,
cov_loc_radius_km=32) cov_loc_radius_km=32)
...@@ -331,7 +472,7 @@ if __name__ == '__main__': ...@@ -331,7 +472,7 @@ if __name__ == '__main__':
kind='RADAR_REFLECTIVITY', n_obs=n_obs, kind='RADAR_REFLECTIVITY', n_obs=n_obs,
error_generate=2.5, error_assimilate=5., error_generate=2.5, error_assimilate=5.,
heights=np.arange(1000, 15001, 1000), heights=np.arange(1000, 15001, 1000),
cov_loc_radius_km=30, cov_loc_vert_km=4) cov_loc_radius_km=32, cov_loc_vert_km=4)
t2m = dict(plotname='SYNOP Temperature', plotunits='[K]', t2m = dict(plotname='SYNOP Temperature', plotunits='[K]',
kind='SYNOP_TEMPERATURE', n_obs=n_obs, kind='SYNOP_TEMPERATURE', n_obs=n_obs,
...@@ -345,6 +486,10 @@ if __name__ == '__main__': ...@@ -345,6 +486,10 @@ if __name__ == '__main__':
#create_obsseq_in(time_dt, radar, archive_obs_coords=False) #'./coords_stage1.pkl') #create_obsseq_in(time_dt, radar, archive_obs_coords=False) #'./coords_stage1.pkl')
create_obsseqin_alltypes(time_dt, [vis, wv73], obs_errors='error_generate', archive_obs_coords='./obs_coords.pkl')
error_assimilate = 5.*np.ones(n_obs*len(radar['heights'])) error_assimilate = 5.*np.ones(n_obs*len(radar['heights']))
import assim_synth_obs as aso import assim_synth_obs as aso
aso.replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate) #aso.replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
import os, sys, shutil
import datetime as dt
import numpy as np
from config.cfg import exp, cluster
import netCDF4 as nc
# Warm-bubble insertion: add a Gaussian temperature perturbation to the
# wrfinput_d01 of every ensemble member to trigger convection.
dx_km = 2      # horizontal grid spacing in km (must match exp.model_dx)
cr = 50        # horizontal e-folding radius of the bubble, km
cz = 3000      # vertical e-folding depth of the bubble, meters

for iens in range(1, exp.n_ens+1):
    # NOTE(review): this message looks copied from another script — the loop
    # below inserts a warm bubble, it does not move DART background files.
    print('update state in wrfinput wrfout file to DART background file')
    wrfin = cluster.wrf_rundir(iens)+'/wrfinput_d01'

    # insert wbubble
    with nc.Dataset(wrfin, 'r+') as ds:
        t = ds.variables['T'][:]
        nt, nz, ny, nx = t.shape
        # geopotential height profile at gridpoint (100,100) — assumes the
        # domain is at least 101x101 and roughly horizontally homogeneous;
        # TODO confirm for other domain sizes.
        z = ds.variables['PHB'][0,:,100,100]/9.81
        z = np.array(z)
        z = z - z[0]                     # height above surface
        z = (z[1:]+z[:-1])/2             # staggered -> mass levels
        z = z[:, np.newaxis, np.newaxis]

        # fixed bubble center at gridpoint (100,100); randomized placement
        # is kept commented out for reproducibility
        cx = 100*dx_km #(80 + 40*np.random.rand())*dx_km
        cy = 100*dx_km #(80 + 40*np.random.rand())*dx_km
        print(cx, cy)
        x = np.arange(0,nx)*dx_km
        y = np.arange(0,ny)*dx_km
        dx = x-cx
        dy = y-cy
        xx, yy = np.meshgrid(dx, dy)
        # horizontal distance of every column from the bubble center
        dr = np.sqrt(xx**2 + yy**2)[np.newaxis, :, :]

        # 3 K amplitude Gaussian in both the horizontal and the vertical,
        # added to perturbation potential temperature T (= THM)
        pert = 3*np.exp(-(dr/cr)**2)*np.exp(-(z/cz)**2)
        ds.variables['T'][0,...] = ds.variables['T'][0,...] + pert
    print(wrfin, 'wbubble inserted.')
import os
from config.cfg import exp, cluster
from utils import symlink, copy_scp_srvx8, copy, sed_inplace
# Link DART executables and RTTOV13 coefficient files into the DART run
# directory so filter/perfect_model_obs can find them at runtime.
joinp = os.path.join

# DART executables
bins = ['perfect_model_obs', 'filter', 'obs_diag', 'obs_seq_to_netcdf']
for b in bins:
    symlink(joinp(cluster.dart_srcdir, b),
            joinp(cluster.dartrundir, b))

# RTTOV13 coefficient files for MSG-4 SEVIRI (gas coefficients, MFASIS
# cloud LUT, and visible/IR cloud scattering coefficients)
rttov_files = ['rttov13pred54L/rtcoef_msg_4_seviri_o3.dat',
               #'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.dat',
               'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.H5',
               'cldaer_visir/sccldcoef_msg_4_seviri.dat']

rttov_src_dir = '/home/fs71386/lkugler/RTTOV13/rtcoef_rttov13/'
for f_src in rttov_files:
    destname = os.path.basename(f_src)
    # RTTOV expects the gas coefficient file under its generic name
    # (without the '_o3' variant suffix)
    if 'rtcoef' in f_src:
        destname = 'rtcoef_msg_4_seviri.dat'

    symlink(rttov_src_dir+f_src, cluster.dartrundir+'/'+destname)

# second link: MFASIS LUT under the name without the '_deff' suffix,
# presumably the name RTTOV searches for — TODO confirm
symlink(cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri_deff.H5',
        cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri.H5')

# sensor database needed by the DART RTTOV forward operator
symlink(cluster.dart_srcdir+'/../../../observations/forward_operators/rttov_sensor_db.csv',
        cluster.dartrundir+'/rttov_sensor_db.csv')
...@@ -57,6 +57,8 @@ def run(assim_time, background_init_time, exppath_firstguess): ...@@ -57,6 +57,8 @@ def run(assim_time, background_init_time, exppath_firstguess):
os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*') os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*')
os.system('rm -rf '+cluster.dartrundir+'/obs_seq.fina*') os.system('rm -rf '+cluster.dartrundir+'/obs_seq.fina*')
os.system(cluster.python+' '+cluster.scriptsdir+'/link_rttov.py')
if __name__ == '__main__': if __name__ == '__main__':
assim_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') assim_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
......
import os, sys
from config.cfg import exp, cluster
from utils import symlink
# Link DART executables and RTTOV12 coefficient files into the DART run
# directory (older RTTOV12 variant of the link script).
joinp = os.path.join

dart_bin_dir = '/home/fs71386/lkugler/DART/DART_WRF_RTTOV_early_access/models/wrf/work/'
rttov_dir = '/home/fs71386/lkugler/RTTOV/'

# DART executables
bins = ['perfect_model_obs', 'filter', 'obs_diag']
for b in bins:
    symlink(joinp(dart_bin_dir, b),
            joinp(cluster.dartrundir, b))

# DART RTTOV capability
symlink('/home/fs71386/lkugler/DART/DART_WRF_RTTOV_early_access/'
        +'observations/forward_operators/rttov_sensor_db.csv',
        joinp(cluster.dartrundir, 'rttov_sensor_db.csv'))

# Basic MSG4 Seviri
files = ['rtcoef_rttov12/rttov9pred54L/rtcoef_msg_4_seviri.dat',
         'rtcoef_rttov12/cldaer_visir/sccldcoef_msg_4_seviri.dat',]
for f in files:
    symlink(joinp(rttov_dir, f),
            joinp(cluster.dartrundir, os.path.basename(f)))

# MFASIS
# deff selects the effective-diameter cloud LUT over the OPAC variant
deff = True
if deff:
    mfasis_tbl = 'rtcoef_rttov12/mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.H5'
else:
    mfasis_tbl = 'rtcoef_rttov12/mfasis_lut/rttov_mfasis_cld_msg_4_seviri_opac.H5'

# NOTE(review): an .H5 LUT is linked to a '.dat' destination name here; the
# newer RTTOV13 script links it as '...seviri.H5' instead — confirm which
# filename RTTOV actually searches for.
symlink(joinp(rttov_dir, mfasis_tbl),
        joinp(cluster.dartrundir, 'rttov_mfasis_cld_msg_4_seviri.dat'))
import os, sys, shutil, warnings import os, sys, shutil, warnings
import datetime as dt import datetime as dt
from config.cfg import exp, cluster from config.cfg import exp, cluster
sys.path.append(cluster.scriptsdir)
from config.cfg import exp, cluster
from utils import sed_inplace, copy, symlink, mkdir from utils import sed_inplace, copy, symlink, mkdir
def run(iens, begin, end, hist_interval=5, radt=5, archive=True): def run(iens, begin, end, hist_interval=5, radt=5, archive=True):
...@@ -22,7 +19,7 @@ def run(iens, begin, end, hist_interval=5, radt=5, archive=True): ...@@ -22,7 +19,7 @@ def run(iens, begin, end, hist_interval=5, radt=5, archive=True):
sed_inplace(rundir+'/namelist.input', '<radt>', str(int(radt))) sed_inplace(rundir+'/namelist.input', '<radt>', str(int(radt)))
if archive: if archive:
archdir = cluster.archivedir()+begin.strftime('/%Y-%m-%d_%H:%M/'+str(iens)+'/') archdir = cluster.archivedir+begin.strftime('/%Y-%m-%d_%H:%M/'+str(iens)+'/')
os.makedirs(archdir, exist_ok=True) os.makedirs(archdir, exist_ok=True)
else: else:
archdir = './' archdir = './'
...@@ -41,7 +38,7 @@ def run(iens, begin, end, hist_interval=5, radt=5, archive=True): ...@@ -41,7 +38,7 @@ def run(iens, begin, end, hist_interval=5, radt=5, archive=True):
######################### #########################
if archive: if archive:
init_dir = cluster.archivedir()+begin.strftime('/%Y-%m-%d_%H:%M/')+str(iens) init_dir = cluster.archivedir+begin.strftime('/%Y-%m-%d_%H:%M/')+str(iens)
os.makedirs(init_dir, exist_ok=True) os.makedirs(init_dir, exist_ok=True)
try: try:
print('copy wrfinput of this run to archive') print('copy wrfinput of this run to archive')
......
...@@ -5,6 +5,8 @@ from utils import symlink, copy, link_contents ...@@ -5,6 +5,8 @@ from utils import symlink, copy, link_contents
import prepare_namelist import prepare_namelist
init_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
for iens in range(1, exp.n_ens+1): for iens in range(1, exp.n_ens+1):
print('preparing ens', iens) print('preparing ens', iens)
input_prof = (cluster.input_profile).replace('<iens>', str(iens).zfill(3)) input_prof = (cluster.input_profile).replace('<iens>', str(iens).zfill(3))
...@@ -17,8 +19,7 @@ for iens in range(1, exp.n_ens+1): ...@@ -17,8 +19,7 @@ for iens in range(1, exp.n_ens+1):
symlink(cluster.wrfexe, rundir+'/wrf.exe') symlink(cluster.wrfexe, rundir+'/wrf.exe')
# time not important, but general settings # time not important, but general settings
prepare_namelist.run(iens, begin=dt.datetime(2008, 7, 30, 6, 0), prepare_namelist.run(iens, begin=init_time, end=dt.datetime(2008, 7, 30, 12),
end=dt.datetime(2008, 7, 30, 6, 30),
archive=False) archive=False)
symlink(input_prof, rundir+'/input_sounding') symlink(input_prof, rundir+'/input_sounding')
......
...@@ -7,13 +7,13 @@ export SLURM_STEP_GRES=none ...@@ -7,13 +7,13 @@ export SLURM_STEP_GRES=none
echo "SLURM_ARRAY_TASK_ID:"$SLURM_ARRAY_TASK_ID echo "SLURM_ARRAY_TASK_ID:"$SLURM_ARRAY_TASK_ID
EXPNAME=<expname> EXPNAME=<expname>
USERDIR=/home/fs71386/lkugler MAINDIR=/gpfs/data/fs71386/lkugler/run_WRF
pinning=(0-11 12-23 24-35 36-47) pinning=(0-11 12-23 24-35 36-47)
for ((n=1; n<=4; n++)) for ((n=1; n<=4; n++))
do do
IENS="$(((($SLURM_ARRAY_TASK_ID - 1)* 4) + $n))" IENS="$(((($SLURM_ARRAY_TASK_ID - 1)* 4) + $n))"
RUNDIR=$USERDIR/run_WRF/$EXPNAME/$IENS RUNDIR=$MAINDIR/$EXPNAME/$IENS
echo "ENSEMBLE NR: "$IENS" in "$RUNDIR echo "ENSEMBLE NR: "$IENS" in "$RUNDIR
cd $RUNDIR cd $RUNDIR
rm -rf wrfrst_d01_* wrfout_d01_* rsl.out.0* rm -rf wrfrst_d01_* wrfout_d01_* rsl.out.0*
...@@ -27,7 +27,7 @@ wait ...@@ -27,7 +27,7 @@ wait
for ((n=1; n<=4; n++)) for ((n=1; n<=4; n++))
do do
IENS="$(((($SLURM_ARRAY_TASK_ID - 1)* 4) + $n))" IENS="$(((($SLURM_ARRAY_TASK_ID - 1)* 4) + $n))"
RUNDIR=$USERDIR/run_WRF/$EXPNAME/$IENS RUNDIR=$MAINDIR/$EXPNAME/$IENS
cd $RUNDIR cd $RUNDIR
line=`tail -n 1 rsl.out.0000` line=`tail -n 1 rsl.out.0000`
if [[ $line == *"SUCCESS COMPLETE WRF"* ]]; if [[ $line == *"SUCCESS COMPLETE WRF"* ]];
......
print('loading modules')
import os, sys, warnings import os, sys, warnings
import datetime as dt import datetime as dt
import netCDF4 as nc import netCDF4 as nc
...@@ -10,12 +11,14 @@ background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M') ...@@ -10,12 +11,14 @@ background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
exppath_firstguess = str(sys.argv[3]) exppath_firstguess = str(sys.argv[3])
""" """
sets initial condition data (wrfinput file) in run_WRF directories -) sets initial condition data (wrfinput file) in the run_WRF directory for each ensemble member
from a DART output state (set of filter_restart files) from a DART output state (set of filter_restart files)
-) cycles (copies) some state variables from the prior ensemble to the wrfinput of the next run
# assumes T = THM (dry potential temperature as prognostic variable) # assumes T = THM (dry potential temperature as prognostic variable)
""" """
update_vars = ['Times', 'U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'TSK', 'CLDFRA'] update_vars = ['Times',]
update_vars.extend(exp.update_vars) # 'U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'TSK', 'CLDFRA']
updates = ','.join(update_vars) updates = ','.join(update_vars)
print('move output to WRF dir as new initial conditions') print('move output to WRF dir as new initial conditions')
......
"""Add geogrid data to wrfinput """Add geogrid data to wrfinput
this is needed for DART, but not provided by ideal.exe DART needs a georeference, but ideal.exe does not provide it
take LAT,LON, mapfac from geogrid, so that they are consistent. Takes LAT,LON, mapfac from geogrid, so that they are consistent.
do not change E, F, HGT_M as they would alter the dynamics and have no impact on assimilation Does not change E, F, HGT_M as they would alter the dynamics and have no impact on assimilation
example call: Example call:
./wrfout_add_geo.py geo_em.d01.nc wrfinput_d01 ./wrfout_add_geo.py geo_em.d01.nc wrfinput_d01
""" """
import os, sys import os, sys
import netCDF4 as nc import netCDF4 as nc
from config.cfg import exp, cluster from config.cfg import exp, cluster
def run(geo_data_file, wrfout_file):
geo_ds = nc.Dataset(geo_data_file, 'r')
wrfinp_ds = nc.Dataset(wrfout_file, 'r+')
fields_old = ["XLAT_M", "XLONG_M", "CLAT", fields_old = ["XLAT_M", "XLONG_M", "CLAT",
"XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"] "XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"]
fields_new = ["XLAT", "XLONG", "CLAT", fields_new = ["XLAT", "XLONG", "CLAT",
"XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"] "XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"]
def run(geo_data_file, wrfout_file):
debug = False
print('updating geodata in', wrfout_file, 'from', geo_data_file)
geo_ds = nc.Dataset(geo_data_file, 'r')
wrfinp_ds = nc.Dataset(wrfout_file, 'r+')
for old, new in zip(fields_old, fields_new): for old, new in zip(fields_old, fields_new):
if debug:
print('moving old field', old, 'into new field', new) print('moving old field', old, 'into new field', new)
#print(geo_ds.variables[old][:].shape, wrfinp_ds.variables[new][:].shape) print(geo_ds.variables[old][:].shape, wrfinp_ds.variables[new][:].shape)
wrfinp_ds.variables[new][:] = geo_ds.variables[old][:] wrfinp_ds.variables[new][:] = geo_ds.variables[old][:]
#print(wrfinp_ds.variables[new][:])
wrfinp_ds.close() wrfinp_ds.close()
geo_ds.close() geo_ds.close()
......
...@@ -51,16 +51,16 @@ ...@@ -51,16 +51,16 @@
output_timestamps = .false., output_timestamps = .false.,
trace_execution = .false., trace_execution = .false.,
stages_to_write = 'preassim', 'output' stages_to_write = 'output'
output_members = .true. output_members = .true.
output_mean = .true. output_mean = .true.
output_sd = .true. output_sd = .true.
write_all_stages_at_end = .false. write_all_stages_at_end = .false.
inf_flavor = 0, 0, inf_flavor = 0, 4,
inf_initial_from_restart = .true., .false., inf_initial_from_restart = .true., .false.,
inf_sd_initial_from_restart = .true., .false., inf_sd_initial_from_restart = .true., .false.,
inf_initial = 1.0, 1.00, inf_initial = 1.00, 0.90,
inf_lower_bound = 1.0, 1.0, inf_lower_bound = 1.0, 1.0,
inf_upper_bound = 1000000.0, 1000000.0, inf_upper_bound = 1000000.0, 1000000.0,
inf_damping = 0.9, 1.0, inf_damping = 0.9, 1.0,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment