From 500cc4cf3393c07784616b4a14dbda56eb4001c5 Mon Sep 17 00:00:00 2001
From: lkugler <lukas.kugler@gmail.com>
Date: Tue, 17 Aug 2021 19:07:34 +0200
Subject: [PATCH] update

---
 config/cfg.py                                |  10 +-
 config/clusters.py                           |  26 +-
 generate_nature.py                           | 203 +---------
 scheduler.py                                 | 135 +++----
 scripts/apply_obs_op_dart.py                 |  36 +-
 scripts/archive_assim.py                     |  14 +-
 scripts/assim_synth_obs.py                   | 214 +++++++----
 scripts/assim_synth_obs_separate_obstypes.py | 378 +++++++++++++++++++
 scripts/config                               |   1 +
 scripts/create_obsseq.py                     | 203 ++++++++--
 scripts/create_wbubble_wrfinput.py           |  41 ++
 scripts/link_rttov.py                        |  33 ++
 scripts/pre_assim.py                         |   2 +
 scripts/prepare_dart.py                      |  35 --
 scripts/prepare_namelist.py                  |   7 +-
 scripts/prepare_wrfinput.py                  |   7 +-
 scripts/run_ens.vsc.sh                       |   6 +-
 scripts/update_wrfinput_from_filteroutput.py |   9 +-
 scripts/wrfout_add_geo.py                    |  28 +-
 templates/input.nml                          |   6 +-
 templates/namelist.input                     |  14 +-
 templates/obs_def_rttov.IR.nml               |   1 -
 templates/obs_def_rttov.VIS.nml              |   8 +-
 23 files changed, 969 insertions(+), 448 deletions(-)
 create mode 100755 scripts/assim_synth_obs_separate_obstypes.py
 create mode 120000 scripts/config
 create mode 100644 scripts/create_wbubble_wrfinput.py
 create mode 100644 scripts/link_rttov.py
 delete mode 100755 scripts/prepare_dart.py

diff --git a/config/cfg.py b/config/cfg.py
index f246dce..005b1e1 100755
--- a/config/cfg.py
+++ b/config/cfg.py
@@ -1,5 +1,5 @@
 import numpy as np
-from . import clusters
+from config import clusters  # "from ." causes problems when run from archivedir
 
 cluster = clusters.vsc  # change cluster configuration here
 
@@ -9,12 +9,12 @@ class ExperimentConfiguration(object):
 
 exp = ExperimentConfiguration()
-exp.expname = "exp_v1.16_P0-4_Radar5+I-test"
+exp.expname = "exp_v1.16_Pwbub-1_40mem"
 exp.model_dx = 2000
 exp.n_ens = 40
 exp.n_nodes = 10
 
-n_obs = 4  #1600 # 50km res: 64:8x8; 10km res: 1600:40x40 # radar: n_obs for each observation height level
+n_obs = 121  #961 900: 10km resolution # radar: n_obs for each observation height level
 
 vis = dict(plotname='VIS 0.6µm',  plotunits='[1]',
            kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs,
@@ -48,7 +48,9 @@ psfc = dict(plotname='SYNOP Pressure', plotunits='[dBz]',
             cov_loc_radius_km=32, cov_loc_vert_km=5)
 
-exp.observations = [vis, wv73, ]  # 108, wv73, vis]
+exp.observations = []  #wv73, vis]  # 108, wv73, vis]
+exp.update_vars = ['T', 'QVAPOR', 'QCLOUD', 'QICE', 'CLDFRA']
+#exp.update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'TSK', 'CLDFRA']
 
 # directory paths depend on the name of the experiment
 cluster.expname = exp.expname
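The observation dictionaries above (`vis`, `wv73`, `psfc`) are plain dicts consumed by the assimilation scripts later in this patch. A minimal sketch of the access pattern, assuming the error fields truncated out of the hunk above (`error_generate`, `error_assimilate`) exist as they are read in `scripts/assim_synth_obs.py`:

```python
# Sketch: how downstream scripts read one entry of exp.observations.
obscfg = dict(kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=121,
              cov_loc_radius_km=32)                   # values from cfg.py above
sat_channel = obscfg.get('sat_channel', False)        # False => conventional obs
cov_loc_vert = obscfg.get('cov_loc_vert_km', False)   # False => horizontal-only localization
n_levels = len(obscfg.get('heights', [1]))            # 'heights' only for profile obs (radar)
```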
diff --git a/config/clusters.py b/config/clusters.py
index 792d80e..0a688b8 100755
--- a/config/clusters.py
+++ b/config/clusters.py
@@ -6,11 +6,18 @@ class ClusterConfig(object):
     def __init__(self):
         pass
 
+    @property
     def archivedir(self):
         return self.archive_base+'/'+self.expname
 
     def wrf_rundir(self, iens):
-        return self.userdir+'/run_WRF/'+self.expname+'/'+str(iens)
+        return '/gpfs/data/fs71386/lkugler/run_WRF/'+self.expname+'/'+str(iens)
+
+    @property
+    def scripts_rundir(self):
+        return self.archivedir+'/DART-WRF/'
+
 
 #######################################################################################
 
@@ -19,21 +26,22 @@ vsc = ClusterConfig()
 vsc.name = 'vsc'
 vsc.python = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/python'
 vsc.ncks = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/ncks'
+vsc.tmpfiledir = '/gpfs/data/fs71386/lkugler'
 vsc.userdir = '/home/fs71386/lkugler'
-vsc.srcdir = '/home/fs71386/lkugler/compile/WRF/WRF-4.2.2/run'
+vsc.srcdir = '/gpfs/data/fs71386/lkugler/compile/WRF/WRF-4.2.2/run'
 vsc.archive_base = '/gpfs/data/fs71386/lkugler/sim_archive/'
 vsc.dart_srcdir = '/home/fs71386/lkugler/DART/DART-9.9.0/models/wrf/work'
-vsc.dartrundir = '/home/fs71386/lkugler/run_DART'
-vsc.scriptsdir = '/home/fs71386/lkugler/DART-WRF/scripts'
+vsc.dartrundir = '/gpfs/data/fs71386/lkugler/run_DART'
+vsc.scriptsdir = '/home/fs71386/lkugler/DART-WRF/scripts/'
 
-vsc.nature_wrfout = '/home/fs71386/lkugler/data/sim_archive/exp_v1.16_P1_nature/2008-07-30_06:00/1/wrfout_d01_%Y-%m-%d_%H:%M:%S'
+vsc.nature_wrfout = '/home/fs71386/lkugler/data/sim_archive/exp_v1.16_P1_nature_tstep/2008-07-30_06:00/1/wrfout_d01_%Y-%m-%d_%H:%M:%S'
 #vsc.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/from_LMU/raso.nat.<iens>.wrfprof'
-vsc.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/raso.fc.<iens>.wrfprof'
+vsc.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/raso.nat.1.wrfprof'
 
 vsc.ideal = vsc.userdir+'/compile/bin/ideal-v4.2.2_v1.16.exe'
-vsc.wrfexe = vsc.userdir+'/compile/bin/wrf-v4.2.2_v1.16.exe'
+vsc.wrfexe = vsc.userdir+'/compile/bin/wrf-v4.3_v1.16.exe'
 vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input'
-vsc.run_WRF = '/gpfs/data/fs71386/lkugler/DART-WRF/scripts/osse/run_ens.vsc.sh'
+vsc.run_WRF = '/home/fs71386/lkugler/DART-WRF/scripts/run_ens.vsc.sh'
 
 vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384",
                  "ntasks": "1", "nodes": "1", "ntasks-per-node": "48", "ntasks-per-core": "1",
@@ -46,9 +54,9 @@ jet.python = '/jetfs/home/lkugler/miniconda3/bin/python'
 jet.ncks = 'ncks'
 jet.userdir = '/jetfs/home/lkugler'
 jet.srcdir = '/jetfs/home/lkugler/compile/WRF/WRF-4.1.5/run'
+jet.scriptsdir = ''
 jet.archive_base = '/jetfs/home/lkugler/data_jetfs/sim_archive/'
 jet.dartrundir = '/jetfs/home/lkugler/DART-WRF/rundir'
-jet.scriptsdir = '/jetfs/home/lkugler/DART-WRF/scripts/osse'
 
 jet.nature_wrfout = '/raid61/scratch/lkugler/VSC/sim_archive/OSSE_v1.10_LMU+shear/2/single/wrfout_d01_%Y-%m-%d_%H:%M:%S'
 
 jet.ideal = jet.userdir+'/compile/bin/ideal.exe'
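`archivedir` becomes a `@property` here, which is why almost every call site in the rest of this patch drops the parentheses. A two-line sketch of the change in usage:

```python
cluster = clusters.vsc
# before this patch (method call):   log_dir = cluster.archivedir() + '/logs/'
# after (attribute via @property):   log_dir = cluster.archivedir + '/logs/'
# the new scripts_rundir builds on it: <archive_base>/<expname>/DART-WRF/
log_dir = cluster.archivedir + '/logs/'
```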
diff --git a/generate_nature.py b/generate_nature.py
index e646b58..ca9db9a 100755
--- a/generate_nature.py
+++ b/generate_nature.py
@@ -7,195 +7,21 @@ import os, sys, shutil
 import datetime as dt
 from slurmpy import Slurm
 
-# necessary to find modules in folder, since SLURM runs the script elsewhere
-sys.path.append(os.getcwd())
-
 from config.cfg import exp, cluster
 from scripts.utils import script_to_str, symlink
 
+# necessary to find modules in folder, since SLURM runs the script elsewhere
+sys.path.append(os.getcwd())
+
 # allow scripts to access the configuration
-symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config')
+# symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config')
 
-log_dir = cluster.archivedir()+'/logs/'
-slurm_scripts_dir = cluster.archivedir()+'/slurm-scripts/'
+log_dir = cluster.archivedir+'/logs/'
+slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
 print('logging to', log_dir)
 print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
 
-def my_Slurm(*args, cfg_update=dict(), **kwargs):
-    """Shortcut to slurmpy's class; keep certain default kwargs
-    and only update some with kwarg `cfg_update`
-    see https://github.com/brentp/slurmpy
-    """
-    return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
-                 log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
-
-class Cmdline(object):
-    def __init__(self, name, cfg_update):
-        self.name = name
-
-    def run(self, cmd, **kwargs):
-        print('running', self.name, 'without SLURM')
-        os.system(cmd)
-
-from scheduler import backup_scripts
-
-def prepare_wrfinput():
-    """Create WRF/run directories and wrfinput files
-    """
-    s = my_Slurm("prep_wrfinput", cfg_update={"time": "5", "mail-type": "BEGIN"})
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
-
-    cmd = """# run ideal.exe in parallel, then add geodata
-export SLURM_STEP_GRES=none
-for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
-do
-    rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
-    echo $rundir
-    cd $rundir
-    mpirun -np 1 ./ideal.exe &
-done
-wait
-for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
-do
-    rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
-    mv $rundir/rsl.out.0000 $rundir/rsl.out.input
-done
-"""
-    s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), "nodes": "1",
-                 "time": "10", "mem-per-cpu": "2G"})
-    id = s.run(cmd, depends_on=[id])
-    return id
-
-def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depends_on=None):
-    """Given that directories with wrfinput files exist,
-    update these wrfinput files according to wrfout files
-    """
-    s = my_Slurm("upd_wrfinput", cfg_update={"time": "5"})
-
-    # path of initial conditions, <iens> is replaced by member index
-    IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \
-              +'*iens*/'+valid_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_wrfout.py '
-               +IC_path, depends_on=[depends_on])
-    return id
-
-def run_ENS(begin, end, depends_on=None):
-    """Run forecast for 1 minute, save output.
-    Then run whole timespan with 5 minutes interval.
-    """
-    id = depends_on
-
-    # first minute forecast (needed for validating an assimilation)
-    hist_interval = 1
-    radt = 1  # calc CFRAC also in first minute
-    begin_plus1 = begin+dt.timedelta(minutes=1)
-    s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
-    id = s.run(' '.join([cluster.python,
-               cluster.scriptsdir+'/prepare_namelist.py',
-               begin.strftime('%Y-%m-%d_%H:%M'),
-               begin_plus1.strftime('%Y-%m-%d_%H:%M'),
-               str(hist_interval), str(radt),]),
-             depends_on=[id])
-
-    s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
-                 "time": "2", "mem-per-cpu": "2G"})
-    cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
-    id = s.run(cmd, depends_on=[id])
-
-    # apply forward operator (DART filter without assimilation)
-    s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py '
-               + begin.strftime('%Y-%m-%d_%H:%M')+' '
-               + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
-               depends_on=[id])
-
-    # whole forecast timespan
-    hist_interval = 5
-    radt = 5
-    s = my_Slurm("preWRF2", cfg_update=dict(time="2"))
-    id = s.run(' '.join([cluster.python,
-               cluster.scriptsdir+'/prepare_namelist.py',
-               begin.strftime('%Y-%m-%d_%H:%M'),
-               end.strftime('%Y-%m-%d_%H:%M'),
-               str(hist_interval), str(radt),]),
-             depends_on=[id])
-
-    time_in_simulation_hours = (end-begin).total_seconds()/3600
-    runtime_wallclock_mins_expected = int(4+time_in_simulation_hours*9)  # usually below 8 min/hour
-    s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
-                 "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
-    cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
-    id = s.run(cmd, depends_on=[id])
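The deleted `run_ENS` (kept in scheduler.py) sizes the SLURM wallclock limit from the simulated timespan; worked numbers for the formula above:

```python
import datetime as dt

# int(4 + simulated_hours * 9): ~9 wallclock min per simulated hour,
# plus 4 min overhead ("usually below 8 min/hour" leaves headroom).
begin, end = dt.datetime(2008, 7, 30, 9), dt.datetime(2008, 7, 30, 12)
hours = (end - begin).total_seconds() / 3600   # 3.0
print(int(4 + hours * 9))                      # -> 31 (minutes requested)
```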
-    # not needed, since wrf.exe writes directly to archive folder
-    #s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10"))
-    #id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py '
-    #            + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
-    return id
-
-def assimilate(assim_time, prior_init_time,
-               prior_path_exp=False, depends_on=None):
-    """Creates observations from a nature run and assimilates them.
-
-    Args:
-        assim_time (dt.datetime): timestamp of prior wrfout files
-        prior_init_time (dt.datetime):
-            timestamp to find the directory where the prior wrfout files are
-        prior_path_exp (bool or str):
-            put a `str` to take the prior from a different experiment
-            if False: use `archivedir` (defined in config) to get prior state
-            if str: use this directory to get prior state
-    """
-    if not prior_path_exp:
-        prior_path_exp = cluster.archivedir()
-    elif not isinstance(prior_path_exp, str):
-        raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp)))
-
-    # prepare state of nature run, from which observation is sampled
-    #s = my_Slurm("prepNature", cfg_update=dict(time="2"))
-    #id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py '
-    #           +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])
-
-    # prepare prior model state
-    s = my_Slurm("preAssim", cfg_update=dict(time="2"))
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py '
-               +assim_time.strftime('%Y-%m-%d_%H:%M ')
-               +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
-               +prior_path_exp, depends_on=[depends_on])
-
-    # prepare nature run, generate observations
-    s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "30",
-                 "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"})
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/assim_synth_obs.py '
-               +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
-
-    # # actual assimilation step
-    # s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G"))
-    # cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out'
-    # id = s.run(cmd, depends_on=[id])
-
-    # s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
-    # id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py '
-    #            + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
-
-    s = my_Slurm("updateIC", cfg_update=dict(time="8"))
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py '
-               +assim_time.strftime('%Y-%m-%d_%H:%M ')
-               +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
-               +prior_path_exp, depends_on=[id])
-    return id
-
-
-def create_satimages(depends_on=None):
-    s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "40"})
-    s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/loop.py '+exp.expname,
-          depends_on=[depends_on])
-
-def mailme(depends_on=None):
-    if depends_on:
-        s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
-        s.run('sleep 1', depends_on=[depends_on])
-
+from scheduler import *
 
 ################################
 print('starting osse')
@@ -203,10 +29,19 @@ print('starting osse')
 backup_scripts()
 
 id = None
-id = prepare_wrfinput()  # create initial conditions
+init_time = dt.datetime(2008, 7, 30, 9)
+id = prepare_wrfinput(init_time)  # create initial conditions
+
+# get initial conditions from archive
+integration_end_time = dt.datetime(2008, 7, 30, 12)
+#exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P1_40mem'
+#id = update_wrfinput_from_archive(integration_end_time, init_time, exppath_arch, depends_on=id)
+
+id = wrfinput_insert_wbubble(depends_on=id)
 
-begin = dt.datetime(2008, 7, 30, 6, 0)
-end = dt.datetime(2008, 7, 30, 21, 0)
+begin = dt.datetime(2008, 7, 30, 9, 0)
+end = dt.datetime(2008, 7, 30, 12, 0)
 
 # whole forecast timespan
 hist_interval = 5
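The whole workflow chains SLURM jobs by passing the id returned from one `s.run(...)` into the next job's `depends_on`. A minimal sketch of that pattern with slurmpy (job names and commands here are stand-ins, not from the repository):

```python
from slurmpy import Slurm

slurm_cfg = {"account": "p71386", "partition": "mem_0384"}  # cluster defaults
s1 = Slurm("stepA", slurm_kwargs=dict(slurm_cfg, **{"time": "5"}))
id1 = s1.run("echo prepare")                  # returns the SLURM job id
s2 = Slurm("stepB", slurm_kwargs=dict(slurm_cfg, **{"time": "10"}))
id2 = s2.run("echo run", depends_on=[id1])    # submitted with a job dependency on id1
```

`dict(slurm_cfg, **cfg_update)` is the same merge idiom `my_Slurm` uses: per-job overrides win over the cluster-wide defaults.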
diff --git a/scheduler.py b/scheduler.py
index 7828ed1..7c70e6e 100755
--- a/scheduler.py
+++ b/scheduler.py
@@ -15,10 +15,10 @@ from scripts.utils import script_to_str, symlink
 sys.path.append(os.getcwd())
 
 # allow scripts to access the configuration
-symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config')
+# symlink(cluster.scriptsdir+'/../config', cluster.scriptsdir+'/config')
 
-log_dir = cluster.archivedir()+'/logs/'
-slurm_scripts_dir = cluster.archivedir()+'/slurm-scripts/'
+log_dir = cluster.archivedir+'/logs/'
+slurm_scripts_dir = cluster.archivedir+'/slurm-scripts/'
 print('logging to', log_dir)
 print('scripts, which are submitted to SLURM:', slurm_scripts_dir)
 
@@ -30,52 +30,36 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs):
     return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update),
                  log_dir=log_dir, scripts_dir=slurm_scripts_dir, **kwargs)
 
-class Cmdline(object):
-    def __init__(self, name, cfg_update):
-        self.name = name
-
-    def run(self, cmd, **kwargs):
-        print('running', self.name, 'without SLURM')
-        os.system(cmd)
-
 def backup_scripts():
     current = cluster.scriptsdir
-    main_a = cluster.archivedir()+'/DART-WRF/'
-    old_a = main_a+'/old/'
-
-    os.makedirs(cluster.archivedir(), exist_ok=True)
-    os.makedirs(main_a, exist_ok=True)
-    os.makedirs(old_a, exist_ok=True)
-
-    def func(a, b, method):  # call method if not link or directory
-        if os.path.islink(a) or os.path.isdir(a):
-            pass
-        else:
-            method(a, b)
-
-    # archive existing files
-    for f in os.listdir(main_a):
-        func(os.path.join(main_a, f), old_a+'/'+f, shutil.move)
-
-    # reproducibility
-    for f in ['scheduler.py', 'config/clusters.py', 'config/cfg.py']:
-        fname = os.path.basename(f)
-        func(current+'/../'+f, main_a+'/'+fname, shutil.copy)
-
-    for f in os.listdir(current):
-        func(os.path.join(current, f), main_a+'/', shutil.copy)
-
-def prepare_wrfinput():
+    main_a = cluster.scripts_rundir
+    # old_a = main_a+'/old/'
+    os.makedirs(cluster.archivedir, exist_ok=True)
+
+    # def func(a, b, method):  # call method if not link or directory
+    #     if os.path.islink(a) or os.path.isdir(a):
+    #         pass
+    #     else:
+    #         method(a, b)
+    try:
+        shutil.copytree(cluster.scriptsdir, cluster.scripts_rundir)
+    except FileExistsError:
+        pass
+    except:
+        raise
+
+def prepare_wrfinput(init_time):
     """Create WRF/run directories and wrfinput files
     """
     s = my_Slurm("prep_wrfinput", cfg_update={"time": "10", "mail-type": "BEGIN"})
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_wrfinput.py '
+               +init_time.strftime('%Y-%m-%d_%H:%M'))
 
     cmd = """# run ideal.exe in parallel, then add geodata
 export SLURM_STEP_GRES=none
 for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
 do
-    rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
+    rundir="""+cluster.tmpfiledir+'/run_WRF/'+exp.expname+"""/$n
     echo $rundir
     cd $rundir
     mpirun -np 1 ./ideal.exe &
 done
@@ -83,7 +67,7 @@ done
 wait
 for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
 do
-    rundir="""+cluster.userdir+'/run_WRF/'+exp.expname+"""/$n
+    rundir="""+cluster.tmpfiledir+'/run_WRF/'+exp.expname+"""/$n
     mv $rundir/rsl.out.0000 $rundir/rsl.out.input
 done
 """
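The heredoc above interpolates Python values into a bash script before submission. For illustration, what it renders to with the values from this patch (`exp.n_ens = 40`, the new `tmpfiledir`):

```python
# Rendered form of `cmd` for exp.n_ens=40, expname='exp_v1.16_Pwbub-1_40mem':
rendered = """# run ideal.exe in parallel, then add geodata
export SLURM_STEP_GRES=none
for ((n=1; n<=40; n++))
do
    rundir=/gpfs/data/fs71386/lkugler/run_WRF/exp_v1.16_Pwbub-1_40mem/$n
    echo $rundir
    cd $rundir
    mpirun -np 1 ./ideal.exe &
done
wait
"""
```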
@@ -101,10 +85,19 @@ def update_wrfinput_from_archive(valid_time, background_init_time, exppath, depe
 
     # path of initial conditions, <iens> is replaced by member index
     IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \
               +'*iens*/'+valid_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_wrfout.py '
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/update_wrfinput_from_wrfout.py '
               +IC_path, depends_on=[depends_on])
     return id
 
+def wrfinput_insert_wbubble(depends_on=None):
+    """Given that directories with wrfinput files exist,
+    update these wrfinput files with warm bubbles
+    """
+    s = my_Slurm("ins_wbubble", cfg_update={"time": "5"})
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/create_wbubble_wrfinput.py',
+               depends_on=[depends_on])
+    return id
+
 def run_ENS(begin, end, depends_on=None, first_minute=True):
     """Run forecast for 1 minute, save output.
     Then run whole timespan with 5 minutes interval.
@@ -118,7 +111,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
     begin_plus1 = begin+dt.timedelta(minutes=1)
     s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
     id = s.run(' '.join([cluster.python,
-               cluster.scriptsdir+'/prepare_namelist.py',
+               cluster.scripts_rundir+'/prepare_namelist.py',
                begin.strftime('%Y-%m-%d_%H:%M'),
                begin_plus1.strftime('%Y-%m-%d_%H:%M'),
                str(hist_interval), str(radt),]),
             depends_on=[id])
@@ -131,7 +124,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
 
     # apply forward operator (DART filter without assimilation)
     s = my_Slurm("fwOP-1m", cfg_update=dict(time="10", ntasks=48))
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py '
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/apply_obs_op_dart.py '
                + begin.strftime('%Y-%m-%d_%H:%M')+' '
                + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
                depends_on=[id])
@@ -141,7 +134,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
     radt = 5
     s = my_Slurm("preWRF2", cfg_update=dict(time="2"))
     id = s.run(' '.join([cluster.python,
-               cluster.scriptsdir+'/prepare_namelist.py',
+               cluster.scripts_rundir+'/prepare_namelist.py',
                begin.strftime('%Y-%m-%d_%H:%M'),
                end.strftime('%Y-%m-%d_%H:%M'),
                str(hist_interval), str(radt),]),
             depends_on=[id])
@@ -156,7 +149,7 @@ def run_ENS(begin, end, depends_on=None, first_minute=True):
 
     # not needed, since wrf.exe writes directly to archive folder
     #s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10"))
-    #id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py '
+    #id3 = s.run(cluster.python+' '+cluster.scripts_rundir+'/archive_wrf.py '
    #            + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
     return id
 
@@ -174,26 +167,26 @@ def assimilate(assim_time, prior_init_time,
         if str: use this directory to get prior state
     """
     if not prior_path_exp:
-        prior_path_exp = cluster.archivedir()
+        prior_path_exp = cluster.archivedir
     elif not isinstance(prior_path_exp, str):
         raise TypeError('prior_path_exp either str or False, is '+str(type(prior_path_exp)))
 
     # prepare state of nature run, from which observation is sampled
     #s = my_Slurm("prepNature", cfg_update=dict(time="2"))
-    #id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py '
+    #id = s.run(cluster.python+' '+cluster.scripts_rundir+'/prepare_nature.py '
     #           +time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on])
 
     # prepare prior model state
     s = my_Slurm("preAssim", cfg_update=dict(time="2"))
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py '
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/pre_assim.py '
                +assim_time.strftime('%Y-%m-%d_%H:%M ')
                +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
                +prior_path_exp, depends_on=[depends_on])
 
     # prepare nature run, generate observations
-    s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "30",
+    s = my_Slurm("Assim", cfg_update={"nodes": "1", "ntasks": "96", "time": "60",
                  "mem": "300G", "ntasks-per-node": "96", "ntasks-per-core": "2"})
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/assim_synth_obs.py '
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/assim_synth_obs.py '
                +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
 
     # # actual assimilation step
     # s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G"))
     # cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out'
     # id = s.run(cmd, depends_on=[id])
 
     # s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
-    # id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py '
+    # id = s.run(cluster.python+' '+cluster.scripts_rundir+'/archive_assim.py '
     #            + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
 
     s = my_Slurm("updateIC", cfg_update=dict(time="8"))
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py '
+    id = s.run(cluster.python+' '+cluster.scripts_rundir+'/update_wrfinput_from_filteroutput.py '
                +assim_time.strftime('%Y-%m-%d_%H:%M ')
                +prior_init_time.strftime('%Y-%m-%d_%H:%M ')
                +prior_path_exp, depends_on=[id])
     return id
 
@@ -215,7 +208,7 @@ def assimilate(assim_time, prior_init_time,
 
 def create_satimages(init_time, depends_on=None):
     s = my_Slurm("pRTTOV", cfg_update={"ntasks": "48", "time": "30"})
-    s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir()
+    s.run(cluster.python+' /home/fs71386/lkugler/RTTOV-WRF/run_init.py '+cluster.archivedir
           +init_time.strftime('/%Y-%m-%d_%H:%M/'),
           depends_on=[depends_on])
 
@@ -226,27 +219,32 @@ def mailme(depends_on=None):
 
 def gen_obsseq(depends_on=None):
     s = my_Slurm("obsseq_netcdf", cfg_update={"time": "10", "mail-type": "FAIL,END"})
-    s.run(cluster.python+' '+cluster.scriptsdir+'/obsseq_to_netcdf.py',
+    s.run(cluster.python+' '+cluster.scripts_rundir+'/obsseq_to_netcdf.py',
+          depends_on=[depends_on])
+
+def copy_to_jet(depends_on=None):
+    s = my_Slurm("rsync-jet", cfg_update={"time": "1", "mail-type": "FAIL"})
+    s.run('rsync -avh '+cluster.archivedir+' lkugler@jet01.img.univie.ac.at:/jetfs/home/lkugler/data_jetfs/sim_archive/ &',
           depends_on=[depends_on])
 
 ################################
 if __name__ == "__main__":
     print('starting osse')
 
-    timedelta_integrate = dt.timedelta(minutes=75)
-    timedelta_btw_assim = dt.timedelta(minutes=60)
+    timedelta_integrate = dt.timedelta(minutes=10)
+    timedelta_btw_assim = dt.timedelta(minutes=10)
 
     backup_scripts()
     id = None
 
-    start_from_existing_state = False
+    start_from_existing_state = True
     is_new_run = not start_from_existing_state
 
     if is_new_run:
-        id = prepare_wrfinput()  # create initial conditions
+        init_time = dt.datetime(2008, 7, 30, 6)
+        id = prepare_wrfinput(init_time)  # create initial conditions
 
         # spin up the ensemble
-        init_time = dt.datetime(2008, 7, 30, 6)
         integration_end_time = dt.datetime(2008, 7, 30, 9)
         id = run_ENS(begin=init_time,
                      end=integration_end_time,
@@ -255,13 +253,14 @@ if __name__ == "__main__":
         prior_path_exp = False  # for next assimilation
 
     elif start_from_existing_state:
-        id = prepare_wrfinput()  # create initial conditions
+        init_time = dt.datetime(2008, 7, 30, 6)
+        id = prepare_wrfinput(init_time)  # create initial conditions
 
         # get initial conditions from archive
-        init_time = dt.datetime(2008, 7, 30, 6)
-        integration_end_time = dt.datetime(2008, 7, 30, 9)
-        exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P0_40mem'
-        id = update_wrfinput_from_archive(integration_end_time, init_time, exppath_arch, depends_on=id)
+        integration_end_time = dt.datetime(2008, 7, 30, 13)
+        exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.16_P1_40mem'
+        #id = update_wrfinput_from_archive(integration_end_time, init_time, exppath_arch, depends_on=id)
+        id = wrfinput_insert_wbubble(depends_on=id)
         prior_path_exp = exppath_arch  # for next assimilation
 
     # values for assimilation
@@ -269,7 +268,7 @@ if __name__ == "__main__":
     assim_time = integration_end_time
     prior_init_time = init_time
 
-    while time <= dt.datetime(2008, 7, 30, 20):
+    while time <= dt.datetime(2008, 7, 30, 14):
 
         id = assimilate(assim_time,
                         prior_init_time,
@@ -279,6 +278,11 @@ if __name__ == "__main__":
 
         # integration
         this_forecast_init = assim_time  # start integration from here
+
+        timedelta_integrate = timedelta_btw_assim
+        if this_forecast_init.minute == 0 and this_forecast_init != dt.datetime(2008, 7, 30, 13):  # longer forecast every full hour
+            timedelta_integrate = dt.timedelta(minutes=3*60)
+
         this_forecast_end = assim_time + timedelta_integrate
 
         id = run_ENS(begin=this_forecast_init,
@@ -295,3 +299,4 @@ if __name__ == "__main__":
         prior_init_time = time - timedelta_btw_assim
 
     gen_obsseq(id)
     # mailme(id)
+    copy_to_jet(id)
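With `timedelta_btw_assim = 10 min` and the full-hour branch added above, the loop produces 10-minute cycles with a 3-hour forecast launched from every full-hour analysis (except the first one at 13:00). A sketch reproducing the cadence:

```python
import datetime as dt

t = dt.datetime(2008, 7, 30, 13)
while t <= dt.datetime(2008, 7, 30, 14):
    horizon = dt.timedelta(minutes=10)
    if t.minute == 0 and t != dt.datetime(2008, 7, 30, 13):
        horizon = dt.timedelta(hours=3)             # long forecast on full hours
    print(t.strftime('%H:%M'), '->', (t + horizon).strftime('%H:%M'))
    t += dt.timedelta(minutes=10)
# 13:00->13:10, 13:10->13:20, ..., 13:50->14:00, 14:00->17:00
```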
diff --git a/scripts/apply_obs_op_dart.py b/scripts/apply_obs_op_dart.py
index 3ce490d..8db2c2e 100755
--- a/scripts/apply_obs_op_dart.py
+++ b/scripts/apply_obs_op_dart.py
@@ -20,18 +20,39 @@ usually applied to 1 min forecasts
 to assess the posterior analysis quality
 """
 
-
 if __name__ == '__main__':
     prev_forecast_init = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
     time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
-    exppath_firstguess = cluster.archivedir()
+    exppath_firstguess = cluster.archivedir
     print(prev_forecast_init, time)
 
     # link ensemble states to run_DART directory
     # we want the observation operator applied to these states!
     pre_assim.run(time, prev_forecast_init, exppath_firstguess)
 
-    savedir = cluster.archivedir()+'/obs_seq_final_1min/'
+    savedir = cluster.archivedir+'/obs_seq_final_1min/'
+
+    first_obscfg = exp.observations[0]
+    aso.set_DART_nml(cov_loc_radius_km=first_obscfg['cov_loc_radius_km'],
+                     cov_loc_vert_km=first_obscfg.get('cov_loc_vert_km', False),
+                     just_prior_values=True)
+    osq.create_obsseqin_alltypes(time, exp.observations, obs_errors=False)
+
+    # prepare dummy nature (this Hx is irrelevant)
+    os.chdir(cluster.dartrundir)
+    os.system('cp ./advance_temp1/wrfout_d01 ./wrfout_d01')
+    wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc',
+                       cluster.dartrundir+'/wrfout_d01')
+    aso.run_perfect_model_obs()
+    aso.assimilate(nproc=96)
+
+    # only the prior state values are of interest in this file
+    # observation and truth is wrong in this file (dummy)
+    istage = 0
+    archive_stage = savedir+'/assim_stage'+str(istage)
+    aso.archive_diagnostics(archive_stage, time)
+
+    sys.exit()  # multi stage below
 
     n_stages = len(exp.observations)
     for istage, obscfg in enumerate(exp.observations):
@@ -40,13 +61,14 @@ if __name__ == '__main__':
         sat_channel = obscfg.get('sat_channel', False)
         obscfg['folder_obs_coords'] = False
 
-        aso.set_DART_nml(sat_channel=sat_channel,
-                         cov_loc_radius_km=obscfg['cov_loc_radius_km'],
-                         cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False),
-                         just_prior_values=True)
+        aso.set_DART_nml(cov_loc_radius_km=obscfg['cov_loc_radius_km'],
+                         cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False),
+                         just_prior_values=True)
 
         osq.create_obsseq_in(time, obscfg)
+
+        # prepare dummy nature (this Hx is irrelevant)
         os.chdir(cluster.dartrundir)
         os.system('cp ./advance_temp1/wrfout_d01 ./wrfout_d01')
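The script takes two positional arguments in the `'%Y-%m-%d_%H:%M'` format parsed above: forecast init time, then valid time (init plus one minute). A hypothetical invocation:

```python
import subprocess

# H(x) of the 1-minute forecast started from the 12:00 analysis;
# obs_seq.final lands in <archivedir>/obs_seq_final_1min/assim_stage0.
subprocess.run(['python', 'apply_obs_op_dart.py',
                '2008-07-30_12:00', '2008-07-30_12:01'], check=True)
```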
diff --git a/scripts/archive_assim.py b/scripts/archive_assim.py
index ad2c81b..66553e2 100755
--- a/scripts/archive_assim.py
+++ b/scripts/archive_assim.py
@@ -1,6 +1,6 @@
 import os, sys, warnings, glob
 import datetime as dt
-from config.cfg import exp, cluster
+from cfg import exp, cluster
 from utils import symlink, copy_scp_srvx8, copy, mkdir, mkdir_srvx8, clean_wrfdir
 
 # if cluster.name != 'srvx8':
@@ -11,7 +11,7 @@ time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
 
 try:
     print('archive obs space diagnostics')
-    savedir = cluster.archivedir()+'/obs_seq_final/'
+    savedir = cluster.archivedir+'/obs_seq_final/'
     mkdir(savedir)
     copy(cluster.dartrundir+'/obs_seq.final', savedir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final'))
 except Exception as e:
@@ -19,7 +19,7 @@ except Exception as e:
 
 try:
     print('archive regression diagnostics')
-    savedir = cluster.archivedir()+'/reg_factor/'
+    savedir = cluster.archivedir+'/reg_factor/'
     mkdir(savedir)
     copy(cluster.dartrundir+'/reg_diagnostics', savedir+time.strftime('/%Y-%m-%d_%H:%M_reg_diagnostics'))
 except Exception as e:
@@ -28,15 +28,15 @@ except Exception as e:
 
 print('archive model state')
 try:
-    mkdir(cluster.archivedir())
+    mkdir(cluster.archivedir)
 
     print('copy prior posterior members to archive')
     for iens in range(1, exp.n_ens+1):
-        savedir = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/')+str(iens)
+        savedir = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/')+str(iens)
         mkdir(savedir)
 
         copy(cluster.dartrundir+'/input.nml',
-             cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/input.nml'))
+             cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/input.nml'))
 
         filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc'
         filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
@@ -51,7 +51,7 @@ try:
     for f in ['preassim_mean.nc', 'preassim_sd.nc',
               'output_mean.nc', 'output_sd.nc']:
         copy(cluster.dartrundir+'/'+f,
-             cluster.archivedir()+'/'+f[:-3]+time.strftime('_%Y-%m-%d_%H:%M:%S'))
+             cluster.archivedir+'/'+f[:-3]+time.strftime('_%Y-%m-%d_%H:%M:%S'))
 
 except Exception as e:
     warnings.warn(str(e))
diff --git a/scripts/assim_synth_obs.py b/scripts/assim_synth_obs.py
index e0648fc..b15eec7 100755
--- a/scripts/assim_synth_obs.py
+++ b/scripts/assim_synth_obs.py
@@ -1,4 +1,5 @@
 import os, sys, shutil, warnings
+import time as time_module
 import datetime as dt
 import numpy as np
 from scipy.interpolate import interp1d
@@ -106,9 +107,8 @@ def replace_errors_obsseqout(f, new_errors):
     print('replaced obs errors in', f)
 
 
-def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
+def set_DART_nml_singleobstype(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
                  just_prior_values=False):
-    """descr"""
     cov_loc_radian = cov_loc_radius_km/earth_radius_km
 
     if just_prior_values:
@@ -144,43 +144,71 @@ def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
         rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml'
         append_file(cluster.dartrundir+'/input.nml', rttov_nml)
 
+def set_DART_nml(cov_loc_radius_km=32, cov_loc_vert_km=False, just_prior_values=False):
+    cov_loc_radian = cov_loc_radius_km/earth_radius_km
+
+    if just_prior_values:
+        template = cluster.scriptsdir+'/../templates/input.eval.nml'
+    else:
+        template = cluster.scriptsdir+'/../templates/input.nml'
+    copy(template, cluster.dartrundir+'/input.nml')
+
+    # options keys are replaced in input.nml with values
+    options = {'<n_ens>': str(int(exp.n_ens)),
+               '<cov_loc_radian>': str(cov_loc_radian)}
+
+    if cov_loc_vert_km:
+        vert_norm_rad = earth_radius_km*cov_loc_vert_km/cov_loc_radius_km*1000
+        options['<horiz_dist_only>'] = '.false.'
+        options['<vert_norm_hgt>'] = str(vert_norm_rad)
+    else:
+        options['<horiz_dist_only>'] = '.true.'
+        options['<vert_norm_hgt>'] = '50000.0'  # dummy value
+
+    for key, value in options.items():
+        sed_inplace(cluster.dartrundir+'/input.nml', key, value)
+
+    # input.nml for RTTOV
+    rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml'
+    append_file(cluster.dartrundir+'/input.nml', rttov_nml)
+
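Worked numbers for the `<vert_norm_hgt>` placeholder computed above. DART measures horizontal localization in radians, so the vertical scale is expressed as the height (in meters) that corresponds to one radian of horizontal distance:

```python
earth_radius_km = 6370
cov_loc_radius_km, cov_loc_vert_km = 32, 5   # e.g. the psfc settings in cfg.py
vert_norm_hgt = earth_radius_km*cov_loc_vert_km/cov_loc_radius_km*1000
print(vert_norm_hgt)  # 995312.5 m per radian: 5 km vertical ~ 32 km horizontal cutoff
```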
 def obs_operator_ensemble(istage):
     # assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01
     print('running obs operator on ensemble forecast')
     os.chdir(cluster.dartrundir)
 
-    if sat_channel:
-        list_ensemble_truths = []
+    list_ensemble_truths = []
+    t = time_module.time()
 
-        for iens in range(1, exp.n_ens+1):
-            print('observation operator for ens #'+str(iens))
-            # ens members are already linked to advance_temp<i>/wrfout_d01
-            copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01',
-                 cluster.dartrundir+'/wrfout_d01')
-            # DART may need a wrfinput file as well, which serves as a template for dimension sizes
-            symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
-
-            # add geodata, if istage>0, wrfout is DART output (has coords)
-            if istage == 0:
-                wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
+    for iens in range(1, exp.n_ens+1):
+        print('observation operator for ens #'+str(iens))
+        # ens members are already linked to advance_temp<i>/wrfout_d01
+        copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01',
+             cluster.dartrundir+'/wrfout_d01')
+        # DART may need a wrfinput file as well, which serves as a template for dimension sizes
+        symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
+
+        # add geodata, if istage>0, wrfout is DART output (has coords)
+        if istage == 0:
+            wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
 
-            # run perfect_model obs (forward operator)
-            os.system('mpirun -np 12 ./perfect_model_obs > /dev/null')
+        # run perfect_model obs (forward operator)
+        os.system('mpirun -np 12 ./perfect_model_obs > /dev/null')
 
-            # truth values in obs_seq.out are H(x) values
-            true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
-            list_ensemble_truths.append(true)
-
-        n_obs = len(list_ensemble_truths[0])
-        np_array = np.full((exp.n_ens, n_obs), np.nan)
-        for i in range(exp.n_ens):
-            np_array[i, :] = list_ensemble_truths[i]
-        return np_array
-    else:
-        raise NotImplementedError()
+        # truth values in obs_seq.out are H(x) values
+        true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
+        list_ensemble_truths.append(true)
+
+    print('obs operator ensemble took', int(time_module.time()-t), 'seconds')
+    n_obs = len(list_ensemble_truths[0])
+    np_array = np.full((exp.n_ens, n_obs), np.nan)
+    for i in range(exp.n_ens):
+        np_array[i, :] = list_ensemble_truths[i]
+    return np_array
 
 def obs_operator_nature(time):
-    print('running obs operator on nature run')
+    print('getting true values in obs space from nature run')
     prepare_nature_dart(time)
     run_perfect_model_obs()
     true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
@@ -196,6 +224,7 @@ def link_nature_to_dart_truth(time):
 
 
 def prepare_nature_dart(time):
+    print('linking nature to DART & georeferencing')
     link_nature_to_dart_truth(time)
     wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
 
@@ -217,6 +246,7 @@ def calc_obserr_WV73(Hx_nature, Hx_prior):
     return OEs
 
 def run_perfect_model_obs():
+    print('generating observations - running ./perfect_model_obs')
     os.chdir(cluster.dartrundir)
     try_remove(cluster.dartrundir+'/obs_seq.out')
     if not os.path.exists(cluster.dartrundir+'/obs_seq.in'):
         raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir)
@@ -227,10 +257,13 @@ def run_perfect_model_obs():
                            '\n look for '+cluster.dartrundir+'log.perfect_model_obs')
 
 def assimilate(nproc=96):
+    print('time now', dt.datetime.now())
     print('running filter')
     os.chdir(cluster.dartrundir)
     try_remove(cluster.dartrundir+'/obs_seq.final')
+    t = time_module.time()
     os.system('mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-'+str(int(nproc)-1)+' -np '+str(int(nproc))+' ./filter > log.filter')
+    print('./filter took', int(time_module.time()-t), 'seconds')
 
 def archive_diagnostics(archive_dir, time):
     print('archive obs space diagnostics')
@@ -239,11 +272,6 @@ def archive_diagnostics(archive_dir, time):
     copy(cluster.dartrundir+'/obs_seq.final', fout)
     print(fout, 'saved.')
 
-    try:
-        copy(cluster.dartrundir+'/obs_coords.pkl', archive_stage+'/obs_coords.pkl')
-    except Exception as e:
-        warnings.warn(str(e))
-
     # try:  # what are regression diagnostics?!
     #     print('archive regression diagnostics')
     #     copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics')
@@ -293,33 +321,93 @@ if __name__ == "__main__":
 
-    """Generate observations (obs_seq.out file)
+    """Assimilate observations (different obs types)
     as defined in config/cfg.py
     for a certain timestamp (argument) of the nature run (defined in config/clusters.py)
 
     Workflow:
     for each assimilation stage (one obs_seq.in and e.g. one observation type):
-    1) prepare nature run for DART
-    optional: 2) calculate obs-error from parametrization
-    3) create obs_seq.in with obs-errors from 2)
-    4) generate actual observations (obs_seq.out) with obs_seq.in from 3)
-
-    - calculate obs-error from parametrization
-        1) create obs_seq.in with obs error=0
-        2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
-        3) calculate obs error as function of y_nat, y_ensmean
+    1) create obs_seq.in with obs-errors
+    2) prepare nature run for DART
+    3) create obs from nature (obs_seq.out)
+    4) Assimilate
+      - adapt obs errors for assimilation
+        - calculate assim obs error from parametrization
+            1) create obs_seq.in with obs error=0
+            2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
+            3) calculate obs error as function of y_nat, y_ensmean
+        - or get assim obs error from config
+        - edit obs error in obs_seq.out
+      - assimilate
+      - write state to archive
 
     Assumptions:
     - x_ensemble is already linked for DART to advance_temp<iens>/wrfout_d01
     """
 
     time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
-    archive_time = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/')
+    archive_time = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/')
 
     os.chdir(cluster.dartrundir)
     os.system('rm -f obs_seq.in obs_seq.out obs_seq.final')  # remove any existing observation files
 
-    n_stages = len(exp.observations)
+    print('create obs_seq.in from config')
+    istage = 0
+    archive_stage = archive_time + '/assim_stage'+str(istage)+'/'
+
+    prepare_nature_dart(time)  # link WRF files to DART directory
+
+    ################################################
+    print(' 1) get the assimilation errors in a single vector ')
+    error_assimilate = []
+
+    for i, obscfg in enumerate(exp.observations):
+        n_obs = obscfg['n_obs']
+        n_obs_z = len(obscfg.get('heights', [1]))
+        n_obs_3d = n_obs * n_obs_z
+
+        parametrized = obscfg.get('sat_channel') == 6
+        if not parametrized:
+            err_this_type = np.zeros(n_obs_3d) + obscfg['error_generate']
+
+        else:  # error parametrization for WV73
+            # get observations for sat 6
+            osq.create_obsseqin_alltypes(time, [obscfg,], obs_errors='error_generate')
+            run_perfect_model_obs()
+
+            # depends on obs_seq.out produced before by run_perfect_model_obs()
+            Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
+
+            Hx_prior = obs_operator_ensemble(istage)  # files are already linked to DART directory
+            err_this_type = calc_obserr_WV73(Hx_nat, Hx_prior)
+
+        error_assimilate.extend(err_this_type)
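Worked example of step 1 with two obs types; the numbers are illustrative, not from the config:

```python
import numpy as np

error_assimilate = []
error_assimilate.extend(np.zeros(121) + 0.03)  # type 1 (vis): fixed generation error
oe_wv73 = np.full(121, 11.6)                   # type 2: stand-in for calc_obserr_WV73 output
error_assimilate.extend(oe_wv73)
assert len(error_assimilate) == 242            # must match the obs order in obs_seq.out
```

The ordering matters: `replace_errors_obsseqout` below walks `obs_seq.out` linearly, so this vector must be concatenated in the same obs-type order that `create_obsseqin_alltypes` writes.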
+
+    ################################################
+    print(' 2) generate observations ')
+    osq.create_obsseqin_alltypes(time, exp.observations, obs_errors='error_generate',
+                                 archive_obs_coords=archive_stage+'/obs_coords.pkl')
+
+    first_obstype = exp.observations[0]
+    set_DART_nml(cov_loc_radius_km=first_obstype['cov_loc_radius_km'],
+                 cov_loc_vert_km=first_obstype.get('cov_loc_vert_km', False))
+
+    run_perfect_model_obs()  # actually create observations that are used to assimilate
+
+    ################################################
+    print(' 3) assimilate with adapted observation-errors ')
+    replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
+    t = time_module.time()
+    assimilate()
+
+    dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage)
+    archive_diagnostics(dir_obsseq, time)
+    archive_output(archive_stage)
+
+
+    sys.exit()  # below is the code for separate assimilation of different obs types
+
     for istage, obscfg in enumerate(exp.observations):
         print('running observation stage', istage, obscfg)
 
         archive_stage = archive_time + '/assim_stage'+str(istage)+'/'
         n_obs = obscfg['n_obs']
         n_obs_3d = n_obs * len(obscfg['heights'])
         sat_channel = obscfg.get('sat_channel', False)
-        error_generate = obscfg['error_generate']
-        error_assimilate = obscfg['error_assimilate']
 
-        set_DART_nml(sat_channel=sat_channel,
-                     cov_loc_radius_km=obscfg['cov_loc_radius_km'],
-                     cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False))
+        # debug option: overwrite time in prior files
+        # for iens in range(1,41):
+        #     os.system('ncks -A -v Times '+cluster.dartrundir+'/wrfout_d01 '+cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')
 
-        use_error_parametrization = error_generate == False
-        if use_error_parametrization:
+        if error_assimilate == False:
+            # use error parametrization for assimilation-obs.errors
             if sat_channel != 6:
                 raise NotImplementedError('sat channel '+str(sat_channel))
 
-            osq.create_obsseq_in(time, obscfg, obs_errors=0)  # zero error to get truth vals
+            # depends on obs_seq.out produced before by run_perfect_model_obs()
+            Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
 
-            Hx_nat = obs_operator_nature(time)
             Hx_prior = obs_operator_ensemble(istage)  # files are already linked to DART directory
-            error_generate = calc_obserr_WV73(Hx_nat, Hx_prior)
-
-        # create obs template file, now with correct errors
-        osq.create_obsseq_in(time, obscfg, obs_errors=error_generate,
-                             archive_obs_coords=archive_stage+'/obs_coords.pkl')
-        prepare_nature_dart(time)  # link WRF files to DART directory
-        run_perfect_model_obs()  # actually create observations that are used to assimilate
+            error_assimilate = calc_obserr_WV73(Hx_nat, Hx_prior)
 
-        # debug option: overwrite time in prior files
-        # for iens in range(1,41):
-        #     os.system('ncks -A -v Times '+cluster.dartrundir+'/wrfout_d01 '+cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')
-
-        if error_assimilate is not False:
+        else:
             # overwrite error values in obs_seq.out
             error_assimilate = np.zeros(n_obs_3d) + error_assimilate  # ensure shape
-            replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
+        replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
 
         assimilate()
-        dir_obsseq = cluster.archivedir()+'/obs_seq_final/assim_stage'+str(istage)
+        dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage)
         archive_diagnostics(dir_obsseq, time)
 
         if istage < n_stages-1:
diff --git a/scripts/assim_synth_obs_separate_obstypes.py b/scripts/assim_synth_obs_separate_obstypes.py
new file mode 100755
index 0000000..6689af0
--- /dev/null
+++ b/scripts/assim_synth_obs_separate_obstypes.py
@@ -0,0 +1,378 @@
+import os, sys, shutil, warnings
+import datetime as dt
+import numpy as np
+from scipy.interpolate import interp1d
+
+from config.cfg import exp, cluster
+from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print
+import create_obsseq as osq
+import wrfout_add_geo
+
+earth_radius_km = 6370
+
+# fit of Fig 7, Harnisch 2016
+x_ci = [0, 5, 10.5, 13, 16]
+y_oe = [1, 4.5, 10, 12, 13]  # Kelvin
+oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True)
+
+def oe_73(ci):
+    if ci < 13:
+        return oe_73_linear(ci)
+    else:
+        return 16.
+
+def cloudimpact_73(bt_mod, bt_obs):
+    """
+    follows Harnisch 2016
+    """
+    biascor_obs = 0
+    bt_lim = 255  # Kelvin for 7.3 micron WV channel
+
+    ci_obs = max(0, bt_lim-(bt_obs - biascor_obs))
+    ci_mod = max(0, bt_lim-bt_mod)
+    ci = (ci_obs+ci_mod)/2
+    return ci
+
+def read_prior_obs(f_obs_prior):
+    """
+    docstring
+    """
+    obsseq = open(f_obs_prior, 'r').readlines()
+    OBSs = []
+    # read observations from obs_seq.final
+    for i, line in enumerate(obsseq):
+        if ' OBS ' in line:
+            observed = float(obsseq[i+1].strip())
+            truth = float(obsseq[i+2].strip())
+            prior_ensmean = float(obsseq[i+3].strip())
+            prior_enssd = float(obsseq[i+4].strip())
+            prior_ens = []
+            for j in range(5, 5+exp.n_ens):
+                prior_ens.append(float(obsseq[i+j].strip()))
+
+            OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens)))
+    return OBSs
+
+def read_truth_obs_obsseq(f):
+    """Reads observed and true values from obs_seq.out/final files."""
+    obsseq = open(f, 'r').readlines()
+    true = []
+    obs = []
+    # read observations from obs_seq.out
+    for i, line in enumerate(obsseq):
+        if ' OBS ' in line:
+            observed = float(obsseq[i+1].strip())
+            truth = float(obsseq[i+2].strip())
+            true.append(truth)
+            obs.append(observed)
+    return true, obs
+
+def replace_errors_obsseqout(f, new_errors):
+    """Replaces existing observation errors in obs_seq.final files
+
+    new_errors (np.array) : standard deviation,
+    shape must match the number of observations
+    """
+    obsseq = open(f, 'r').readlines()
+
+    # find number of lines between two ' OBS ' lines:
+    first_obs = None
+    for i, line in enumerate(obsseq):
+        if ' OBS ' in line:
+            if first_obs is None:
+                first_obs = i
+            else:
+                second_obs = i
+                break
+    lines_between = second_obs - first_obs
+    lines_obserr_after_obsnr = lines_between - 1  # obserr line is one before ' OBS ' line
+
+    # replace values in list obsseq
+    i_obs = 0
+    for i, line in enumerate(obsseq):
+        if ' OBS ' in line:
+            line_error_obs_i = i+lines_obserr_after_obsnr
+
+            previous_err_var = obsseq[line_error_obs_i]
+            new_err_obs_i = new_errors[i_obs]**2  # variance in obs_seq.out
+            #print('previous err var ', previous_err_var, 'new error', new_err_obs_i)
+            obsseq[line_error_obs_i] = ' '+str(new_err_obs_i)+' \n'
+
+            i_obs += 1  # next iteration
+
+    with open(f, 'w') as file:
+        for line in obsseq:
+            file.write(line)
+    print('replaced obs errors in', f)
+
+
+def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
+                 just_prior_values=False):
+    """descr"""
+    cov_loc_radian = cov_loc_radius_km/earth_radius_km
+
+    if just_prior_values:
+        template = cluster.scriptsdir+'/../templates/input.eval.nml'
+    else:
+        template = cluster.scriptsdir+'/../templates/input.nml'
+    copy(template, cluster.dartrundir+'/input.nml')
+
+    # options are overwritten with settings
+    options = {'<n_ens>': str(int(exp.n_ens)),
+               '<cov_loc_radian>': str(cov_loc_radian)}
+
+    if cov_loc_vert_km:
+        vert_norm_rad = earth_radius_km*cov_loc_vert_km/cov_loc_radius_km*1000
+        options['<horiz_dist_only>'] = '.false.'
+        options['<vert_norm_hgt>'] = str(vert_norm_rad)
+    else:
+        options['<horiz_dist_only>'] = '.true.'
+        options['<vert_norm_hgt>'] = '50000.0'  # dummy value
+
+    for key, value in options.items():
+        sed_inplace(cluster.dartrundir+'/input.nml', key, value)
+
+    # input.nml for RTTOV
+    if sat_channel > 0:
+        if sat_channel in [1, 2, 3, 12]:  # VIS channels
+            rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml'
+        else:  # IR channels
+            rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml'
+        append_file(cluster.dartrundir+'/input.nml', rttov_nml)
+    else:
+        # append any rttov segment, needs to exist anyway
+        rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml'
+        append_file(cluster.dartrundir+'/input.nml', rttov_nml)
+
+def obs_operator_ensemble(istage):
+    # assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01
+    print('running obs operator on ensemble forecast')
+    os.chdir(cluster.dartrundir)
+
+    if sat_channel:
+        list_ensemble_truths = []
+
+        for iens in range(1, exp.n_ens+1):
+            print('observation operator for ens #'+str(iens))
+            # ens members are already linked to advance_temp<i>/wrfout_d01
+            copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01',
+                 cluster.dartrundir+'/wrfout_d01')
+            # DART may need a wrfinput file as well, which serves as a template for dimension sizes
+            symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
+
+            # add geodata, if istage>0, wrfout is DART output (has coords)
+            if istage == 0:
+                wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
+
+            # run perfect_model obs (forward operator)
+            os.system('mpirun -np 12 ./perfect_model_obs > /dev/null')
+
+            # truth values in obs_seq.out are H(x) values
+            true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
+            list_ensemble_truths.append(true)
+
+        n_obs = len(list_ensemble_truths[0])
+        np_array = np.full((exp.n_ens, n_obs), np.nan)
+        for i in range(exp.n_ens):
+            np_array[i, :] = list_ensemble_truths[i]
+        return np_array
+    else:
+        raise NotImplementedError()
+
+def obs_operator_nature(time):
+    print('what are the true values in obs space of nature run')
+    prepare_nature_dart(time)
+    run_perfect_model_obs()
+    true, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
+    return true
+
+
+def link_nature_to_dart_truth(time):
+    # get wrfout_d01 from nature run
+    shutil.copy(time.strftime(cluster.nature_wrfout),
+                cluster.dartrundir+'/wrfout_d01')
+    # DART may need a wrfinput file as well, which serves as a template for dimension sizes
+    symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
+
+
+def prepare_nature_dart(time):
+    link_nature_to_dart_truth(time)
+    wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
+
+
+def calc_obserr_WV73(Hx_nature, Hx_prior):
+
+    n_obs = len(Hx_nature)
+    OEs = np.ones(n_obs)
+    for iobs in range(n_obs):
+
+        bt_y = Hx_nature[iobs]
+        bt_x_ens = Hx_prior[:, iobs]
+        CIs = [cloudimpact_73(bt_x, bt_y) for bt_x in bt_x_ens]
+        mean_CI = np.mean(CIs)
+
+        oe_nature = oe_73(mean_CI)
+        print('oe_nature:', oe_nature, ', bt_y:', bt_y, ', mean_CI:', mean_CI)
+        OEs[iobs] = oe_nature
+    return OEs
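Worked numbers for the Harnisch (2016) error model above, for one observation with nature brightness temperature 235 K and a prior member at 250 K (7.3 µm channel):

```python
ci_obs = max(0, 255 - 235)   # 20   (cloud impact of the observation)
ci_mod = max(0, 255 - 250)   # 5    (cloud impact of the model member)
ci = (ci_obs + ci_mod) / 2   # 12.5 (symmetric cloud impact)
# oe_73(12.5) interpolates between (10.5, 10 K) and (13, 12 K) -> 11.6 K;
# as coded above, any mean cloud impact >= 13 returns the fixed 16 K instead.
```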
+
+def run_perfect_model_obs():
+    os.chdir(cluster.dartrundir)
+    try_remove(cluster.dartrundir+'/obs_seq.out')
+    if not os.path.exists(cluster.dartrundir+'/obs_seq.in'):
+        raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir)
+    os.system('mpirun -np 12 ./perfect_model_obs > log.perfect_model_obs')
+    if not os.path.exists(cluster.dartrundir+'/obs_seq.out'):
+        raise RuntimeError('obs_seq.out does not exist in '+cluster.dartrundir,
+                           '\n look for '+cluster.dartrundir+'log.perfect_model_obs')
+
+def assimilate(nproc=96):
+    print('running filter')
+    os.chdir(cluster.dartrundir)
+    try_remove(cluster.dartrundir+'/obs_seq.final')
+    os.system('mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-'+str(int(nproc)-1)+' -np '+str(int(nproc))+' ./filter > log.filter')
+
+def archive_diagnostics(archive_dir, time):
+    print('archive obs space diagnostics')
+    mkdir(archive_dir)
+    fout = archive_dir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final')
+    copy(cluster.dartrundir+'/obs_seq.final', fout)
+    print(fout, 'saved.')
+
+    try:
+        copy(cluster.dartrundir+'/obs_coords.pkl', archive_stage+'/obs_coords.pkl')
+    except Exception as e:
+        warnings.warn(str(e))
+
+    # try:  # what are regression diagnostics?!
+    #     print('archive regression diagnostics')
+    #     copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics')
+    # except Exception as e:
+    #     warnings.warn(str(e))
+
+def recycle_output():
+    update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'QRAIN', 'U10', 'V10', 'T2', 'Q2', 'TSK', 'PSFC', 'CLDFRA']
+    updates = ','.join(update_vars)
+
+    print('recycle DART output to be used as input')
+    for iens in range(1, exp.n_ens+1):
+        dart_output = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
+        dart_input = cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01'
+
+        #print('check for non-monotonic vertical pressure')
+
+        # convert link to file in order to be able to update the content
+        if os.path.islink(dart_input):
+            l = os.readlink(dart_input)
+            os.remove(dart_input)
+            copy(l, dart_input)
+
+        # print('move DART output to input: '+dart_output+' -> '+dart_input)
+        # os.rename(dart_output, dart_input)  # probably doesn't work
+
+        print('updating', updates, 'in', dart_input, 'from', dart_output)
+        os.system(cluster.ncks+' -A -v '+updates+' '+dart_output+' '+dart_input)
+
+def archive_output(archive_stage):
+    print('archiving output')
+    mkdir(archive_stage)
+    copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml')
+
+    # single members
+    for iens in range(1, exp.n_ens+1):
+        #savedir = archive_stage+'/'+str(iens)
+        #mkdir(savedir)
+        # filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc'
+        filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
+        copy(filter_out, archive_stage+'/filter_restart_d01.'+str(iens).zfill(4))
+
+    # copy mean and sd to archive
+    for f in ['output_mean.nc', 'output_sd.nc']:
+        copy(cluster.dartrundir+'/'+f, archive_stage+'/'+f)
+
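`recycle_output()` splices the analysis fields back into the prior wrfout files with NCO rather than replacing the files, so grid and static fields stay intact. The command it builds for member 1:

```python
# Rendered command (rundir abbreviated):
#   ncks -A -v U,V,T,PH,MU,QVAPOR,QCLOUD,QICE,QRAIN,U10,V10,T2,Q2,TSK,PSFC,CLDFRA \
#        <rundir>/filter_restart_d01.0001 <rundir>/advance_temp1/wrfout_d01
# -A appends/overwrites in place; -v restricts the copy to the listed variables.
updates = 'U,V,T,PH,MU,QVAPOR,QCLOUD,QICE,QRAIN,U10,V10,T2,Q2,TSK,PSFC,CLDFRA'
```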
+
+if __name__ == "__main__":
+
+    """Generate observations (obs_seq.out file)
+    as defined in config/cfg.py
+    for a certain timestamp (argument) of the nature run (defined in config/clusters.py)
+
+    Workflow:
+    for each assimilation stage (one obs_seq.in and e.g. one observation type):
+    1) prepare nature run for DART
+    optional: 2) calculate obs-error from parametrization
+    3) create obs_seq.in with obs-errors from 2)
+    4) generate actual observations (obs_seq.out) with obs_seq.in from 3)
+
+    - calculate obs-error from parametrization
+        1) create obs_seq.in with obs error=0
+        2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
+        3) calculate obs error as function of y_nat, y_ensmean
+
+    Assumptions:
+    - x_ensemble is already linked for DART to advance_temp<iens>/wrfout_d01
+    """
+
+    time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
+    archive_time = cluster.archivedir+time.strftime('/%Y-%m-%d_%H:%M/')
+
+    os.chdir(cluster.dartrundir)
+    os.system('rm -f obs_seq.in obs_seq.out obs_seq.final')  # remove any existing observation files
+
+    n_stages = len(exp.observations)
+    for istage, obscfg in enumerate(exp.observations):
+        print('running observation stage', istage, obscfg)
+
+        archive_stage = archive_time + '/assim_stage'+str(istage)+'/'
+        n_obs = obscfg['n_obs']
+        n_obs_3d = n_obs * len(obscfg['heights'])
+        sat_channel = obscfg.get('sat_channel', False)
+        error_generate = obscfg['error_generate']
+        error_assimilate = obscfg['error_assimilate']
+
+        set_DART_nml(sat_channel=sat_channel,
+                     cov_loc_radius_km=obscfg['cov_loc_radius_km'],
+                     cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False))
+
+        # create obs template file, now with correct errors
+        osq.create_obsseq_in(time, obscfg, obs_errors=error_generate,
+                             archive_obs_coords=archive_stage+'/obs_coords.pkl')
+        prepare_nature_dart(time)  # link WRF files to DART directory
+        run_perfect_model_obs()  # actually create observations that are used to assimilate
+
+        # debug option: overwrite time in prior files
+        # for iens in range(1,41):
+        #     os.system('ncks -A -v Times '+cluster.dartrundir+'/wrfout_d01 '+cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')
+
+        if error_assimilate == False:
+            # use error parametrization for assimilation-obs.errors
+
+            if sat_channel != 6:
+                raise NotImplementedError('sat channel '+str(sat_channel))
+
+            # depends on obs_seq.out produced before by run_perfect_model_obs()
+            Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out')
+
+            Hx_prior = obs_operator_ensemble(istage)  # files are already linked to DART directory
+            error_assimilate = calc_obserr_WV73(Hx_nat, Hx_prior)
+
+        else:
+            # overwrite error values in obs_seq.out
+            error_assimilate = np.zeros(n_obs_3d) + error_assimilate  # ensure shape
+
+        replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
+        assimilate()
+        dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage)
+        archive_diagnostics(dir_obsseq, time)
+
+        if istage < n_stages-1:
+            # recirculation: filter output -> input
+            archive_output(archive_stage)
+            recycle_output()
+
+        elif istage == n_stages-1:
+            # last assimilation, continue integration now
+            archive_output(archive_stage)
+
+        else:
+            raise RuntimeError('this should never occur')
diff --git a/scripts/config b/scripts/config
new file mode 120000
index 0000000..8086613
--- /dev/null
+++ b/scripts/config
@@ -0,0 +1 @@
+/home/fs71386/lkugler/DART-WRF/scripts//../config
\ No newline at end of file
diff --git a/scripts/create_obsseq.py b/scripts/create_obsseq.py
index 400b50d..2b54b79 100755
--- a/scripts/create_obsseq.py
+++ b/scripts/create_obsseq.py
@@ -1,5 +1,6 @@
-"""Create obs_seq.in
-
+"""The functions in this module create obs_seq.in files.
+These are the template files defining obs locations and metadata,
+according to which observations are generated and subsequently assimilated.
 """
 import os, sys, warnings
 import numpy as np
@@ -43,10 +44,14 @@ def obskind_read():
 
 #####################
 # Global variables
 
-# position on earth to calculate solar angles
+# position on earth for RTTOV ray geometry
 lat0 = 45.
 lon0 = 0.
 
+# position on Earth for DART, domain center when coords_from_domaincenter=True
+lat0_center = 45.
+lon0_center = 0.
+
 sat_az = "180.0"
 sat_zen = "45.0"
@@ -77,8 +82,8 @@ def get_dart_date(time_dt):
     return dart_date_day, secs_thatday
 
 
-def calc_obs_locations(n_obs, coords_from_domaincenter=True,
-                       distance_between_obs_km=9999, fpath_coords=False):
+def calc_obs_locations(n_obs, coords_from_domaincenter=True,
+                       distance_between_obs_km=9999, cov_loc_radius_km=32, fpath_coords=False):
     """Calculate coordinate pairs for locations
 
     Args:
@@ -115,28 +120,50 @@ def calc_obs_locations(n_obs, coords_from_domaincenter=True,
 
         for iy in range(int(-ny/2), int(ny/2+1)):
             for ix in range(int(-nx/2), int(nx/2+1)):
-                lat = lat0 + iy*dy_4km_in_degree
+                lat = lat0_center + iy*dy_4km_in_degree
                 m_per_degree_x = 2*np.pi*radius_earth_meters*np.sin(lat/180*np.pi)/360
                 dx_4km_in_degree = distance_between_obs_meters/m_per_degree_x
-                lon = lon0 + ix*dx_4km_in_degree
+                lon = lon0_center + ix*dx_4km_in_degree
 
                 coords.append((lat, lon))
     else:
-        """Observations spread evenly over domain"""
-        fcoords = '/home/fs71386/lkugler/run_DART/geo_em.d01.nc'
+        """Observations spread evenly over the domain,
+        but leaving a margin to the domain boundary,
+        so that the assimilation increments are zero on the boundary.
+        margin = 1.5x localization radius
+        """
+        fcoords = cluster.dartrundir+'/geo_em.d01.nc'
         import xarray as xr
         ds = xr.open_dataset(fcoords)
 
         lons = ds.XLONG_M.isel(Time=0).values
         lats = ds.XLAT_M.isel(Time=0).values
         n_obs_x = int(np.sqrt(n_obs))
-        dx = int(len(ds.south_north)/n_obs_x)
-        skip = int(dx/2)
+        nx = len(ds.south_north)  # number of gridpoints in one direction
+        model_dx_km = exp.model_dx/1000
+
+        omit_covloc_radius_on_boundary = True
+        if omit_covloc_radius_on_boundary:  # in order to avoid an increment step on the boundary
+            skip_km = cov_loc_radius_km*1.5
+            skip_gridpoints = int(skip_km/model_dx_km)  # skip this many gridpoints on each side
+
+            gridpoints_left = nx - 2*skip_gridpoints
+            # now spread observations evenly across the space left
+            gridpoints_between_obs = int(gridpoints_left/(n_obs_x-1))
+        else:
+            gridpoints_between_obs = int(nx/n_obs_x)  # number of gridpoints / number of observations in one direction
+            skip_gridpoints = int(gridpoints_between_obs/2)  # to center the observations in the domain
+
+        km_between_obs = gridpoints_between_obs*model_dx_km
+        print('observation density: one observation every', km_between_obs, 'km /',
+              gridpoints_between_obs, 'gridpoints \n', 'leaving a domain boundary on each side of',
+              skip_gridpoints, 'gridpoints or', skip_gridpoints*model_dx_km, 'km')
+        # skip_gridpoints = space we have to leave out on the boundary of the domain
+        # in order to have the observations centered in the domain
 
         for i in range(n_obs_x):
             for j in range(n_obs_x):
-                coords.append((lats[skip+i*dx, skip+j*dx],
-                               lons[skip+i*dx, skip+j*dx]))
-
+                coords.append((lats[skip_gridpoints+i*gridpoints_between_obs, skip_gridpoints+j*gridpoints_between_obs],
+                               lons[skip_gridpoints+i*gridpoints_between_obs, skip_gridpoints+j*gridpoints_between_obs]))
     try:
         if fpath_coords:
             import pickle
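The spacing arithmetic above, worked through with this experiment's numbers (2 km grid, 32 km localization radius, 121 = 11x11 observations; ~200 gridpoints per direction is illustrative, taken from e_we = 201 in the namelist):

    import numpy as np

    nx, model_dx_km = 200, 2.0
    n_obs_x = int(np.sqrt(121))                      # 11 observations per direction
    skip_gridpoints = int(32 * 1.5 / model_dx_km)    # 24 gridpoints margin on each side
    gridpoints_left = nx - 2 * skip_gridpoints       # 152
    gridpoints_between_obs = int(gridpoints_left / (n_obs_x - 1))  # 15
    print(gridpoints_between_obs * model_dx_km, 'km between observations')  # 30.0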
@@ -184,6 +211,51 @@
 obs_kind_definitions
           first:            1  last:       """+n_obs_str
 
+
+def preamble_multi(n_obs_3d_total, list_kinds):
+    lines_obstypedef = ''
+    for kind in list_kinds:
+        lines_obstypedef += '\n         '+obs_kind_nrs[kind]+' '+kind
+
+    n_obs_str = str(n_obs_3d_total)
+    num_obstypes = str(len(list_kinds))
+    return """ obs_sequence
+obs_kind_definitions
+           """+num_obstypes+' '+lines_obstypedef+"""
+  num_copies:            0  num_qc:            0
+  num_obs: """+n_obs_str+"  max_num_obs: "+n_obs_str+"""
+  first:            1  last: """+n_obs_str
+
+
+def determine_vert_coords(sat_channel, kind, obscfg):
+    if not sat_channel:
+        if 'SYNOP' in kind:
+            vert_coord_sys = "-1"  # meters AGL
+            vert_coords = [2, ]
+        else:
+            vert_coord_sys = "3"  # meters AMSL
+            vert_coords = obscfg['heights']
+    else:
+        vert_coord_sys = "-2"  # undefined height
+        vert_coords = ["-888888.0000000000", ]
+    return vert_coord_sys, vert_coords
+
+
+def write_sat_angle_appendix(sat_channel, lat0, lon0, time_dt):
+    if sat_channel:
+        sun_az = str(get_azimuth(lat0, lon0, time_dt))
+        sun_zen = str(90. - get_altitude(lat0, lon0, time_dt))
+        print('sunzen', sun_zen, 'sunazi', sun_az)
+
+        return """visir
+   """+sat_az+"""        """+sat_zen+"""        """+sun_az+"""
+   """+sun_zen+"""
+          12           4          21           """+str(sat_channel)+"""
+  -888888.000000000
+           1"""
+    else:
+        return ''
+
+
 def write_section(obs, last=False):
     """
     Args:
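The sun angles fed into the `visir` appendix above come from the observation's timezone-aware timestamp; a sketch, assuming get_azimuth/get_altitude are pysolar's (consistent with the UTC-aware datetimes used in this module):

    import datetime as dt
    from pysolar.solar import get_azimuth, get_altitude

    time_dt = dt.datetime(2008, 7, 30, 9, 0, tzinfo=dt.timezone.utc)
    lat0, lon0 = 45., 0.
    sun_az = get_azimuth(lat0, lon0, time_dt)
    sun_zen = 90. - get_altitude(lat0, lon0, time_dt)
    print('sun azimuth:', sun_az, 'sun zenith:', sun_zen)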
@@ -211,9 +283,9 @@ kind
             """+str(obs['obserr_var'])
 
 
-def create_obsseq_in(time_dt, obscfg, obs_errors=False,
+def create_obsseq_in_separate_obs(time_dt, obscfg, obs_errors=False,
                      archive_obs_coords=False):
-    """Create obs_seq.in
+    """Create obs_seq.in of one obstype
 
     Args:
         time_dt (dt.datetime): time of observation
@@ -304,47 +376,120 @@ def create_obsseq_in(time_dt, obscfg, obs_errors=False,
     write_file(txt, output_path=cluster.dartrundir+'/obs_seq.in')
 
 
+def create_obsseqin_alltypes(time_dt, list_obscfg, obs_errors='error_assimilate', archive_obs_coords=False):
+    """Create obs_seq.in with multiple obs types in one file
+
+    Args:
+        time_dt (dt.datetime): time of observation
+        list_obscfg (list of dict)
+        obs_errors (False or str): key into each obs config dict, selecting the field which contains the error values
+        archive_obs_coords (bool, str): False or str (filepath where `obs_seq.in` will be saved)
+    """
+    print('creating obs_seq.in:')
+    time_dt = add_timezone_UTC(time_dt)
+    dart_date_day, secs_thatday = get_dart_date(time_dt)
+    # print('secs, days:', secs_thatday, dart_date_day)
+
+    txt = ''
+
+    i_obs_total = 0
+    for istage, obscfg in enumerate(list_obscfg):
+        n_obs = obscfg['n_obs']
+
+        coords = calc_obs_locations(n_obs,
+                                    coords_from_domaincenter=False,
+                                    distance_between_obs_km=obscfg.get('distance_between_obs_km', False),
+                                    cov_loc_radius_km=obscfg['cov_loc_radius_km'],
+                                    fpath_coords=archive_obs_coords)
+
+        kind = obscfg['kind']
+        print('obstype', kind)
+        sat_channel = obscfg.get('sat_channel', False)
+
+        vert_coord_sys, vert_coords = determine_vert_coords(sat_channel, kind, obscfg)
+        coords = append_hgt_to_coords(coords, vert_coords)
+        n_obs_3d_thistype = len(coords)
+
+        # define obs error
+        obserr_std = np.zeros(n_obs_3d_thistype)
+        if obs_errors:
+            obserr_std += obscfg[obs_errors]
+
+        sat_info = write_sat_angle_appendix(sat_channel, lat0, lon0, time_dt)
+
+        for i_obs in range(n_obs_3d_thistype):
+            i_obs_total += 1
+
+            is_last_obs_in_type = (i_obs == n_obs_3d_thistype-1)
+            is_last_obstype = (istage == len(list_obscfg)-1)
+            last = is_last_obs_in_type and is_last_obstype
+
+            txt += write_section(dict(i=i_obs_total,
+                                      kind_nr=obs_kind_nrs[kind],
+                                      dart_date_day=dart_date_day,
+                                      secs_thatday=secs_thatday,
+                                      lon=coords[i_obs][1],
+                                      lat=coords[i_obs][0],
+                                      vert_coord=coords[i_obs][2],
+                                      vert_coord_sys=vert_coord_sys,
+                                      obserr_var=obserr_std[i_obs]**2,
+                                      appendix=sat_info),
+                                 last=last)
+
+    n_obs_total = i_obs_total
+    list_kinds = [a['kind'] for a in list_obscfg]
+    pretxt = preamble_multi(n_obs_total, list_kinds)
+
+    alltxt = pretxt + txt
+    write_file(alltxt, output_path=cluster.dartrundir+'/obs_seq.in')
+
+
+
 if __name__ == '__main__':
     # for testing
 
     time_dt = dt.datetime(2008, 7, 30, 9, 0)
 
-    n_obs = 1600  # radar: n_obs for each observation height level
+    n_obs = 900  # radar: n_obs for each observation height level
 
     vis = dict(plotname='VIS 0.6µm', plotunits='[1]',
-               kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs,
-               err_std=0.03,
+               kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs, error_generate=0.03,
                error_assimilate=0.06,
                cov_loc_radius_km=32)
 
     wv73 = dict(plotname='Brightness temperature WV 7.3µm', plotunits='[K]',
-                kind='MSG_4_SEVIRI_TB', sat_channel=6, n_obs=n_obs,
-                err_std=False,
-                error_generate=False, error_assimilate=False,
+                kind='MSG_4_SEVIRI_TB', sat_channel=6, n_obs=n_obs,
+                error_generate=1., error_assimilate=False,
                 cov_loc_radius_km=32)
 
     ir108 = dict(plotname='Brightness temperature IR 10.8µm', plotunits='[K]',
-                 kind='MSG_4_SEVIRI_TB', sat_channel=9, n_obs=n_obs,
-                 err_std=5.,
+                 kind='MSG_4_SEVIRI_TB', sat_channel=9, n_obs=n_obs, error_generate=5.,
                  error_assimilate=10.,
                  cov_loc_radius_km=32)
 
     radar = dict(plotname='Radar reflectivity', plotunits='[dBz]',
-                 kind='RADAR_REFLECTIVITY', n_obs=n_obs,
+                 kind='RADAR_REFLECTIVITY', n_obs=n_obs, error_generate=2.5,
                  error_assimilate=5.,
                  heights=np.arange(1000, 15001, 1000),
-                 cov_loc_radius_km=30, cov_loc_vert_km=4)
+                 cov_loc_radius_km=32, cov_loc_vert_km=4)
 
     t2m = dict(plotname='SYNOP Temperature', plotunits='[K]',
-               kind='SYNOP_TEMPERATURE', n_obs=n_obs,
+               kind='SYNOP_TEMPERATURE', n_obs=n_obs, error_generate=0.1,
                error_assimilate=1.,
                cov_loc_radius_km=20, cov_loc_vert_km=3)
 
     psfc = dict(plotname='SYNOP Pressure', plotunits='[Pa]',
-                kind='SYNOP_SURFACE_PRESSURE', n_obs=n_obs,
+                kind='SYNOP_SURFACE_PRESSURE', n_obs=n_obs, error_generate=50.,
                 error_assimilate=100.,
                 cov_loc_radius_km=32, cov_loc_vert_km=5)
 
     #create_obsseq_in(time_dt, radar, archive_obs_coords=False)  #'./coords_stage1.pkl')
+    create_obsseqin_alltypes(time_dt, [vis, wv73], obs_errors='error_generate', archive_obs_coords='./obs_coords.pkl')
+
     error_assimilate = 5.*np.ones(n_obs*len(radar['heights']))
     import assim_synth_obs as aso
-    aso.replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
+    #aso.replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate)
+
+
diff --git a/scripts/create_wbubble_wrfinput.py b/scripts/create_wbubble_wrfinput.py
new file mode 100644
index 0000000..01732c8
--- /dev/null
+++ b/scripts/create_wbubble_wrfinput.py
@@ -0,0 +1,41 @@
+import os, sys, shutil
+import datetime as dt
+import numpy as np
+
+from config.cfg import exp, cluster
+import netCDF4 as nc
+
+dx_km = 2
+cr = 50  # km
+cz = 3000  # meters
+
+for iens in range(1, exp.n_ens+1):
+    print('inserting warm bubble into wrfinput of member', iens)
+
+    wrfin = cluster.wrf_rundir(iens)+'/wrfinput_d01'
+
+    # insert wbubble
+    with nc.Dataset(wrfin, 'r+') as ds:
+        t = ds.variables['T'][:]
+        nt, nz, ny, nx = t.shape
+        z = ds.variables['PHB'][0, :, 100, 100]/9.81
+        z = np.array(z)
+        z = z - z[0]
+        z = (z[1:]+z[:-1])/2
+        z = z[:, np.newaxis, np.newaxis]
+
+        cx = 100*dx_km  #(80 + 40*np.random.rand())*dx_km
+        cy = 100*dx_km  #(80 + 40*np.random.rand())*dx_km
+        print(cx, cy)
+        x = np.arange(0, nx)*dx_km
+        y = np.arange(0, ny)*dx_km
+        dx = x-cx
+        dy = y-cy
+        xx, yy = np.meshgrid(dx, dy)
+        dr = np.sqrt(xx**2 + yy**2)[np.newaxis, :, :]
+
+        pert = 3*np.exp(-(dr/cr)**2)*np.exp(-(z/cz)**2)
+
+        ds.variables['T'][0, ...] = ds.variables['T'][0, ...] + pert
+
+    print(wrfin, 'wbubble inserted.')
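The perturbation above is a 3 K Gaussian bump with 50 km horizontal and 3 km vertical e-folding scale, added to the potential temperature field. A minimal standalone check of its shape and peak amplitude, with an illustrative 201x201 grid at 2 km spacing:

    import numpy as np

    cr, cz = 50., 3000.  # half-widths in km / m, as in the script above
    z = np.arange(250., 15000., 500.)[:, None, None]  # mid-level heights (m)
    dr = np.linalg.norm(np.mgrid[-100:101, -100:101]*2., axis=0)[None, :, :]  # km from bubble center
    pert = 3*np.exp(-(dr/cr)**2)*np.exp(-(z/cz)**2)
    print(pert.max())  # ~3 K at the bubble center, near the surface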
diff --git a/scripts/link_rttov.py b/scripts/link_rttov.py
new file mode 100644
index 0000000..e93a00a
--- /dev/null
+++ b/scripts/link_rttov.py
@@ -0,0 +1,33 @@
+import os
+from config.cfg import exp, cluster
+from utils import symlink, copy_scp_srvx8, copy, sed_inplace
+
+joinp = os.path.join
+
+# DART executables
+bins = ['perfect_model_obs', 'filter', 'obs_diag', 'obs_seq_to_netcdf']
+for b in bins:
+    symlink(joinp(cluster.dart_srcdir, b),
+            joinp(cluster.dartrundir, b))
+
+
+rttov_files = ['rttov13pred54L/rtcoef_msg_4_seviri_o3.dat',
+               #'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.dat',
+               'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.H5',
+               'cldaer_visir/sccldcoef_msg_4_seviri.dat']
+
+rttov_src_dir = '/home/fs71386/lkugler/RTTOV13/rtcoef_rttov13/'
+
+
+for f_src in rttov_files:
+    destname = os.path.basename(f_src)
+    if 'rtcoef' in f_src:
+        destname = 'rtcoef_msg_4_seviri.dat'
+
+    symlink(rttov_src_dir+f_src, cluster.dartrundir+'/'+destname)
+
+symlink(cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri_deff.H5',
+        cluster.dartrundir+'/rttov_mfasis_cld_msg_4_seviri.H5')
+
+symlink(cluster.dart_srcdir+'/../../../observations/forward_operators/rttov_sensor_db.csv',
+        cluster.dartrundir+'/rttov_sensor_db.csv')
diff --git a/scripts/pre_assim.py b/scripts/pre_assim.py
index 6a5342e..9828417 100755
--- a/scripts/pre_assim.py
+++ b/scripts/pre_assim.py
@@ -57,6 +57,8 @@ def run(assim_time, background_init_time, exppath_firstguess):
     os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*')
     os.system('rm -rf '+cluster.dartrundir+'/obs_seq.fina*')
 
+    os.system(cluster.python+' '+cluster.scriptsdir+'/link_rttov.py')
+
 
 if __name__ == '__main__':
     assim_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
diff --git a/scripts/prepare_dart.py b/scripts/prepare_dart.py
deleted file mode 100755
index 2e5a14a..0000000
--- a/scripts/prepare_dart.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import os, sys
-from config.cfg import exp, cluster
-from utils import symlink
-joinp = os.path.join
-
-dart_bin_dir = '/home/fs71386/lkugler/DART/DART_WRF_RTTOV_early_access/models/wrf/work/'
-rttov_dir = '/home/fs71386/lkugler/RTTOV/'
-
-# DART executables
-bins = ['perfect_model_obs', 'filter', 'obs_diag']
-for b in bins:
-    symlink(joinp(dart_bin_dir, b),
-            joinp(cluster.dartrundir, b))
-
-# DART RTTOV capability
-symlink('/home/fs71386/lkugler/DART/DART_WRF_RTTOV_early_access/'
-        +'observations/forward_operators/rttov_sensor_db.csv',
-        joinp(cluster.dartrundir, 'rttov_sensor_db.csv'))
-
-# Basic MSG4 Seviri
-files = ['rtcoef_rttov12/rttov9pred54L/rtcoef_msg_4_seviri.dat',
-         'rtcoef_rttov12/cldaer_visir/sccldcoef_msg_4_seviri.dat',]
-for f in files:
-    symlink(joinp(rttov_dir, f),
-            joinp(cluster.dartrundir, os.path.basename(f)))
-
-# MFASIS
-deff = True
-if deff:
-    mfasis_tbl = 'rtcoef_rttov12/mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.H5'
-else:
-    mfasis_tbl = 'rtcoef_rttov12/mfasis_lut/rttov_mfasis_cld_msg_4_seviri_opac.H5'
-
-symlink(joinp(rttov_dir, mfasis_tbl),
-        joinp(cluster.dartrundir, 'rttov_mfasis_cld_msg_4_seviri.dat'))
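link_rttov.py replaces the deleted prepare_dart.py and switches the coefficient set to RTTOV v13. A sketch of a sanity check that all link targets resolved; the file names are the destination names produced by the linking code above, the run directory is the one from config/clusters.py:

    import os

    dartrundir = '/gpfs/data/fs71386/lkugler/run_DART'
    expected = ['perfect_model_obs', 'filter', 'obs_diag', 'obs_seq_to_netcdf',
                'rtcoef_msg_4_seviri.dat',
                'rttov_mfasis_cld_msg_4_seviri_deff.H5',
                'rttov_mfasis_cld_msg_4_seviri.H5',
                'sccldcoef_msg_4_seviri.dat',
                'rttov_sensor_db.csv']
    missing = [f for f in expected
               if not os.path.exists(os.path.join(dartrundir, f))]
    print('missing files:', missing or 'none')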
diff --git a/scripts/prepare_namelist.py b/scripts/prepare_namelist.py
index c01e60d..e352085 100755
--- a/scripts/prepare_namelist.py
+++ b/scripts/prepare_namelist.py
@@ -1,9 +1,6 @@
 import os, sys, shutil, warnings
 import datetime as dt
 from config.cfg import exp, cluster
-
-sys.path.append(cluster.scriptsdir)
-from config.cfg import exp, cluster
 from utils import sed_inplace, copy, symlink, mkdir
 
 def run(iens, begin, end, hist_interval=5, radt=5, archive=True):
@@ -22,7 +19,7 @@ def run(iens, begin, end, hist_interval=5, radt=5, archive=True):
     sed_inplace(rundir+'/namelist.input', '<radt>', str(int(radt)))
 
     if archive:
-        archdir = cluster.archivedir()+begin.strftime('/%Y-%m-%d_%H:%M/'+str(iens)+'/')
+        archdir = cluster.archivedir+begin.strftime('/%Y-%m-%d_%H:%M/'+str(iens)+'/')
         os.makedirs(archdir, exist_ok=True)
     else:
         archdir = './'
@@ -41,7 +38,7 @@ def run(iens, begin, end, hist_interval=5, radt=5, archive=True):
     #########################
     if archive:
-        init_dir = cluster.archivedir()+begin.strftime('/%Y-%m-%d_%H:%M/')+str(iens)
+        init_dir = cluster.archivedir+begin.strftime('/%Y-%m-%d_%H:%M/')+str(iens)
         os.makedirs(init_dir, exist_ok=True)
         try:
             print('copy wrfinput of this run to archive')
diff --git a/scripts/prepare_wrfinput.py b/scripts/prepare_wrfinput.py
index f3cb04f..41f8217 100755
--- a/scripts/prepare_wrfinput.py
+++ b/scripts/prepare_wrfinput.py
@@ -5,6 +5,8 @@ from utils import symlink, copy, link_contents
 import prepare_namelist
 
+init_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
+
 for iens in range(1, exp.n_ens+1):
     print('preparing ens', iens)
     input_prof = (cluster.input_profile).replace('<iens>', str(iens).zfill(3))
@@ -17,9 +19,8 @@ for iens in range(1, exp.n_ens+1):
     symlink(cluster.wrfexe, rundir+'/wrf.exe')
 
     # time not important, but general settings
-    prepare_namelist.run(iens, begin=dt.datetime(2008, 7, 30, 6, 0),
-                         end=dt.datetime(2008, 7, 30, 6, 30),
-                         archive=False)
+    prepare_namelist.run(iens, begin=init_time, end=dt.datetime(2008, 7, 30, 12),
+                         archive=False)
     symlink(input_prof, rundir+'/input_sounding')
 print('finished.')
diff --git a/scripts/run_ens.vsc.sh b/scripts/run_ens.vsc.sh
index d7936e7..059a25b 100755
--- a/scripts/run_ens.vsc.sh
+++ b/scripts/run_ens.vsc.sh
@@ -7,13 +7,13 @@ export SLURM_STEP_GRES=none
 echo "SLURM_ARRAY_TASK_ID:"$SLURM_ARRAY_TASK_ID
 EXPNAME=<expname>
 
-USERDIR=/home/fs71386/lkugler
+MAINDIR=/gpfs/data/fs71386/lkugler/run_WRF
 pinning=(0-11 12-23 24-35 36-47)
 
 for ((n=1; n<=4; n++))
 do
    IENS="$(((($SLURM_ARRAY_TASK_ID - 1)* 4) + $n))"
-   RUNDIR=$USERDIR/run_WRF/$EXPNAME/$IENS
+   RUNDIR=$MAINDIR/$EXPNAME/$IENS
    echo "ENSEMBLE NR: "$IENS" in "$RUNDIR
    cd $RUNDIR
    rm -rf wrfrst_d01_* wrfout_d01_* rsl.out.0*
@@ -27,7 +27,7 @@ wait
 for ((n=1; n<=4; n++))
 do
    IENS="$(((($SLURM_ARRAY_TASK_ID - 1)* 4) + $n))"
-   RUNDIR=$USERDIR/run_WRF/$EXPNAME/$IENS
+   RUNDIR=$MAINDIR/$EXPNAME/$IENS
    cd $RUNDIR
    line=`tail -n 1 rsl.out.0000`
    if [[ $line == *"SUCCESS COMPLETE WRF"* ]];
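run_ens.vsc.sh maps each SLURM array task to four ensemble members, one per 12-core pinning slot of a 48-core node. A sketch of that index arithmetic, mirroring the shell expression $(((($SLURM_ARRAY_TASK_ID - 1)* 4) + $n)):

    pinning = ['0-11', '12-23', '24-35', '36-47']  # core ranges, as in the script
    for task_id in (1, 2):                          # $SLURM_ARRAY_TASK_ID
        for n in range(1, 5):
            iens = (task_id - 1) * 4 + n
            print('task', task_id, 'runs member', iens, 'on cores', pinning[n - 1])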
diff --git a/scripts/update_wrfinput_from_filteroutput.py b/scripts/update_wrfinput_from_filteroutput.py
index 1f7a46b..e438c04 100755
--- a/scripts/update_wrfinput_from_filteroutput.py
+++ b/scripts/update_wrfinput_from_filteroutput.py
@@ -1,3 +1,4 @@
+print('loading modules')
 import os, sys, warnings
 import datetime as dt
 import netCDF4 as nc
@@ -10,12 +11,14 @@ background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
 exppath_firstguess = str(sys.argv[3])
 
 """
-sets initial condition data (wrfinput file) in run_WRF directories
-from a DART output state (set of filter_restart files)
+-) sets initial condition data (wrfinput file) in the run_WRF directory for each ensemble member
+   from a DART output state (set of filter_restart files)
+-) cycles (copies) some state variables from the prior ensemble to the wrfinput of the next run
 
 # assumes T = THM (dry potential temperature as prognostic variable)
 """
-update_vars = ['Times', 'U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'TSK', 'CLDFRA']
+update_vars = ['Times',]
+update_vars.extend(exp.update_vars)  # e.g. 'U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'TSK', 'CLDFRA'
 updates = ','.join(update_vars)
 
 print('move output to WRF dir as new initial conditions')
diff --git a/scripts/wrfout_add_geo.py b/scripts/wrfout_add_geo.py
index 14b9c17..08e6127 100755
--- a/scripts/wrfout_add_geo.py
+++ b/scripts/wrfout_add_geo.py
@@ -1,31 +1,33 @@
 """Add geogrid data to wrfinput
-this is needed for DART, but not provided by ideal.exe
+DART needs a georeference, but ideal.exe does not provide it
 
-take LAT,LON, mapfac from geogrid, so that they are consistent.
-do not change E, F, HGT_M as they would alter the dynamics and have no impact on assimilation
+Takes LAT, LON, mapfac from geogrid, so that they are consistent.
+Does not change E, F, HGT_M as they would alter the dynamics and have no impact on assimilation.
 
-example call:
+Example call:
     ./wrfout_add_geo.py geo_em.d01.nc wrfinput_d01
 """
 import os, sys
 import netCDF4 as nc
-
 from config.cfg import exp, cluster
 
+fields_old = ["XLAT_M", "XLONG_M", "CLAT",
+              "XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"]
+fields_new = ["XLAT", "XLONG", "CLAT",
+              "XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"]
+
 def run(geo_data_file, wrfout_file):
+    debug = False
+
+    print('updating geodata in', wrfout_file, 'from', geo_data_file)
     geo_ds = nc.Dataset(geo_data_file, 'r')
     wrfinp_ds = nc.Dataset(wrfout_file, 'r+')
 
-    fields_old = ["XLAT_M", "XLONG_M", "CLAT",
-                  "XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"]
-    fields_new = ["XLAT", "XLONG", "CLAT",
-                  "XLONG_U", "XLONG_V", "XLAT_U", "XLAT_V"]
-
     for old, new in zip(fields_old, fields_new):
-        print('moving old field', old, 'into new field', new)
-        #print(geo_ds.variables[old][:].shape, wrfinp_ds.variables[new][:].shape)
+        if debug:
+            print('moving old field', old, 'into new field', new)
+            print(geo_ds.variables[old][:].shape, wrfinp_ds.variables[new][:].shape)
         wrfinp_ds.variables[new][:] = geo_ds.variables[old][:]
-        #print(wrfinp_ds.variables[new][:])
 
     wrfinp_ds.close()
     geo_ds.close()
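The cycling step above appends the analysis variables into each member's WRF input file with `ncks -A -v`, the same call used in recycle_output(). A sketch for one member; the binary path is the one from config/clusters.py, the variable list is exp.update_vars from config/cfg.py:

    import os

    ncks = '/home/fs71386/lkugler/miniconda3/envs/DART/bin/ncks'
    updates = ','.join(['Times', 'T', 'QVAPOR', 'QCLOUD', 'QICE', 'CLDFRA'])
    src = 'filter_restart_d01.0001'    # DART analysis for member 1
    dst = 'advance_temp1/wrfout_d01'   # WRF input for member 1
    os.system(ncks + ' -A -v ' + updates + ' ' + src + ' ' + dst)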
diff --git a/templates/input.nml b/templates/input.nml
index 561461b..0c0c1ce 100644
--- a/templates/input.nml
+++ b/templates/input.nml
@@ -51,16 +51,16 @@
    output_timestamps        = .false.,
    trace_execution          = .false.,
 
-   stages_to_write          = 'preassim', 'output'
+   stages_to_write          = 'output'
    output_members           = .true.
    output_mean              = .true.
    output_sd                = .true.
    write_all_stages_at_end  = .false.
 
-   inf_flavor                  = 0,                       0,
+   inf_flavor                  = 0,                       4,
    inf_initial_from_restart    = .true.,                  .false.,
    inf_sd_initial_from_restart = .true.,                  .false.,
-   inf_initial                 = 1.0,                     1.00,
+   inf_initial                 = 1.00,                    0.90,
    inf_lower_bound             = 1.0,                     1.0,
    inf_upper_bound             = 1000000.0,               1000000.0,
    inf_damping                 = 0.9,                     1.0,
diff --git a/templates/namelist.input b/templates/namelist.input
index 911d0e4..b488b18 100644
--- a/templates/namelist.input
+++ b/templates/namelist.input
@@ -24,9 +24,16 @@
 /
 
 &domains
- time_step                           = <timestep>,
+ use_adaptive_time_step              = .true.,
+ target_cfl                          = 1.2, 1.2, 1.2,
+ max_step_increase_pct               = 5, 51, 51,
+ starting_time_step                  = 8,
+ max_time_step                       = 16,
+ min_time_step                       = 6,
+ step_to_output_time                 = .true.,
+ time_step                           = 8,
 time_step_fract_num                 = 0,
 time_step_fract_den                 = 1,
 max_dom                             = 1,
 s_we                                = 1, 1, 1,
 e_we                                = 201, 43, 43,
@@ -67,8 +74,8 @@
 &dynamics
 hybrid_opt                          = 0,
 rk_ord                              = 3,
- diff_opt                            = 2,      2,      2,
- km_opt                              = 2,      2,      2,
+ diff_opt                            = 1,      1,      1,
+ km_opt                              = 4,      4,      2,
 damp_opt                            = 2,
 zdamp                               = 5000.,  5000.,  5000.,
 dampcoef                            = 0.003,  0.003,  0.003
diff --git a/templates/obs_def_rttov.IR.nml b/templates/obs_def_rttov.IR.nml
index f529d88..26c0098 100644
--- a/templates/obs_def_rttov.IR.nml
+++ b/templates/obs_def_rttov.IR.nml
@@ -17,7 +17,6 @@
    addrefrac           = .false.
    plane_parallel      = .false.
    use_salinity        = .false.
-   apply_band_correction  = .true.
    cfrac_data          = .true.
    clw_data            = .true.
    rain_data           = .false.
diff --git a/templates/obs_def_rttov.VIS.nml b/templates/obs_def_rttov.VIS.nml
index 2fed46b..2b97f67 100644
--- a/templates/obs_def_rttov.VIS.nml
+++ b/templates/obs_def_rttov.VIS.nml
@@ -17,7 +17,6 @@
    addrefrac           = .true.
    plane_parallel      = .false.
    use_salinity        = .false.
-   apply_band_correction  = .true.
    cfrac_data          = .true.
    clw_data            = .true.
    rain_data           = .false.
@@ -54,10 +53,9 @@
    user_aer_opt_param  = .false.
    user_cld_opt_param  = .false.
    grid_box_avg_cloud  = .true.
-   cldstr_threshold    = -1.0
-   cldstr_simple       = .false.
-   cldstr_low_cloud_top = 750.0
-   ir_scatt_model      = 2
+   cldcol_threshold    = -1.0
+   cloud_overlap       = 1
+   ir_scatt_model      = 2
    vis_scatt_model     = 3
    dom_nstreams        = 8
    dom_accuracy        = 0.0
-- 
GitLab