From 64235dd4da597dae473c082efa8c38cd7262c1d5 Mon Sep 17 00:00:00 2001
From: Kugler Lukas <lukas.kugler@univie.ac.at>
Date: Fri, 20 Nov 2020 16:05:43 +0100
Subject: [PATCH] fwd_op_1min capability

---
 scheduler.py                 | 74 +++++++++++++++-------------
 scripts/apply_obs_op_dart.py | 72 ++++++++++++++++++++++++++++
 scripts/create_obsseq.py     | 21 ++++----
 scripts/pre_assim.py         | 93 +++++++++++++++++++-----------------
 4 files changed, 173 insertions(+), 87 deletions(-)
 create mode 100644 scripts/apply_obs_op_dart.py

diff --git a/scheduler.py b/scheduler.py
index 576641d..240aa3c 100755
--- a/scheduler.py
+++ b/scheduler.py
@@ -66,8 +66,8 @@ do
     mv $rundir/rsl.out.0000 $rundir/rsl.out.input
 done
 """
-    s = my_Slurm("prep_wrfinput", cfg_update={"ntasks": str(exp.n_ens),
-                                              "time": "10", "mem-per-cpu": "2G"})
+    s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens),
+                                      "time": "10", "mem-per-cpu": "2G"})
     id = s.run(cmd, depends_on=[id])
     return id
 
@@ -88,43 +88,51 @@ def run_ENS(begin, end, depends_on=None):
     """Run forecast for 1 minute, save output. 
     Then run whole timespan with 5 minutes interval.
     """
-    prev_id = depends_on
+    id = depends_on
 
     # first minute forecast (needed for validating an assimilation)
     hist_interval = 1
+    begin_plus1 = begin+dt.timedelta(minutes=1)
     s = my_Slurm("preWRF1", cfg_update=dict(time="2"))
     id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_namelist.py '
-               + begin.strftime('%Y-%m-%d_%H:%M')+' '
-               + (begin+dt.timedelta(minutes=1)).strftime('%Y-%m-%d_%H:%M')+' '
-               + str(hist_interval), 
-               depends_on=[prev_id])
+            + begin.strftime('%Y-%m-%d_%H:%M')+' '
+            + begin_plus1.strftime('%Y-%m-%d_%H:%M')+' '
+            + str(hist_interval), 
+            depends_on=[id])
 
     s = my_Slurm("runWRF1", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
-                 "time": "5", "mem-per-cpu": "2G"})
+                "time": "2", "mem-per-cpu": "2G"})
     cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
-    id2 = s.run(cmd, depends_on=[id])
+    id = s.run(cmd, depends_on=[id])
+
+    # apply forward operator (DART filter without assimilation)
+    s = my_Slurm("fwOP-1m", cfg_update=dict(time="2"))
+    id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py '
+               + begin.strftime('%Y-%m-%d_%H:%M')+' '
+               + begin_plus1.strftime('%Y-%m-%d_%H:%M'),
+               depends_on=[id])
 
     # whole forecast timespan
     hist_interval = 5
     s = my_Slurm("preWRF2", cfg_update=dict(time="2"))
     id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_namelist.py '
-               + begin.strftime('%Y-%m-%d_%H:%M')+' '
-               + end.strftime('%Y-%m-%d_%H:%M')+' '
-               + str(hist_interval), 
-               depends_on=[prev_id])
+            + begin.strftime('%Y-%m-%d_%H:%M')+' '
+            + end.strftime('%Y-%m-%d_%H:%M')+' '
+            + str(hist_interval), 
+            depends_on=[id])
 
     time_in_simulation_hours = (end-begin).total_seconds()/3600
-    runtime_wallclock_mins_expected = int(5+time_in_simulation_hours*9)  # usually below 8 min/hour
+    runtime_wallclock_mins_expected = int(4+time_in_simulation_hours*9)  # usually below 8 min/hour
     s = my_Slurm("runWRF2", cfg_update={"nodes": "1", "array": "1-"+str(exp.n_nodes),
-                 "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
+                "time": str(runtime_wallclock_mins_expected), "mem-per-cpu": "2G"})
     cmd = script_to_str(cluster.run_WRF).replace('<expname>', exp.expname)
-    id2 = s.run(cmd, depends_on=[id])
+    id = s.run(cmd, depends_on=[id])
 
     # not needed, since wrf.exe writes directly to archive folder
     #s = my_Slurm("archiveWRF", cfg_update=dict(nodes="1", ntasks="1", time="10"))
     #id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_wrf.py '
     #           + begin.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
-    return id2
+    return id
 
 def assimilate(assim_time, background_init_time,
                prior_from_different_exp=False, depends_on=None):
@@ -199,7 +207,6 @@ timedelta_integrate = dt.timedelta(minutes=45)
 timedelta_btw_assim = dt.timedelta(minutes=30)
 
 clear_logs(backup_existing_to_archive=False)
-
 id = None
 
 start_from_existing_state = True
@@ -221,28 +228,29 @@ elif start_from_existing_state:
     id = prepare_wrfinput()  # create initial conditions
     
     # get initial conditions from archive
-    background_init_time = dt.datetime(2008, 7, 30, 6)
-    time = dt.datetime(2008, 7, 30, 10)
-    exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.11_LMU_filter'
+    background_init_time = dt.datetime(2008, 7, 30, 11, 30)
+    time = dt.datetime(2008, 7, 30, 12)
+    exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.12_LMU_so_VIS2'
     first_guess = exppath_arch
-    id = update_wrfinput_from_archive(time, background_init_time, exppath_arch,
-                                      depends_on=id)
+    id = update_wrfinput_from_archive(time, background_init_time, exppath_arch, depends_on=id)
 
-while time <= dt.datetime(2008, 7, 30, 16):
-     assim_time = time
+while time <= dt.datetime(2008, 7, 30, 15):
+    assim_time = time
      id = assimilate(assim_time,
                      background_init_time,
                      prior_from_different_exp=first_guess,
                      depends_on=id)
-     first_guess = False
+    first_guess = False
+
+    prev_forecast_init = assim_time - timedelta_btw_assim
+    this_forecast_init = assim_time  # start integration from here
+    this_forecast_end = assim_time + timedelta_btw_assim
 
-     background_init_time = assim_time  # start integration from here
-     integration_end_time = assim_time + timedelta_integrate
-     id = run_ENS(begin=background_init_time,
-                  end=integration_end_time,
-                  depends_on=id)
-     time += timedelta_btw_assim
+    id = run_ENS(begin=this_forecast_init,
+                end=this_forecast_end,
+                depends_on=id)
+    time += timedelta_btw_assim
 
-     create_satimages(depends_on=id)
+    create_satimages(depends_on=id)
 
 mailme(id)
diff --git a/scripts/apply_obs_op_dart.py b/scripts/apply_obs_op_dart.py
new file mode 100644
index 0000000..ba4d18e
--- /dev/null
+++ b/scripts/apply_obs_op_dart.py
@@ -0,0 +1,72 @@
+import os, sys, shutil
+import datetime as dt
+import numpy as np
+from scipy.interpolate import interp1d
+
+from config.cfg import exp, cluster
+from utils import symlink, copy, mkdir, sed_inplace, append_file
+import create_obsseq as osq
+from gen_synth_obs import read_prior_obs, set_input_nml
+import pre_assim
+
+def run_operator(time):
+    """
+    time_for_dart (dt.datetime) : needs to be consistent with wrfout files!
+    """
+
+    # assume only 1 obstype for now
+    obscfg = exp.observations[0]
+
+    # get observation file (obs not important, but their locations)
+    # this should correspond to configuration to have same locations as in real assim
+    obs_seq_all_out = cluster.dartrundir + '/obs_seq_all.out'
+    os.chdir(cluster.dartrundir)
+    n_obs = obscfg['n_obs']
+    error_var = (obscfg['err_std'])**2
+    sat_channel = obscfg.get('channel', False)
+    cov_loc = obscfg['cov_loc_radius_km']
+    dist_obs = obscfg.get('distance_between_obs_km', False)
+    obs_coords = osq.calc_obs_locations(n_obs, coords_from_domaincenter=False, 
+                                        distance_between_obs_km=dist_obs, 
+                                        fpath_obs_locations=None)
+    osq.sat(time, sat_channel, obs_coords, error_var,
+            output_path=cluster.dartrundir)
+    assert os.path.exists(cluster.dartrundir + '/obs_seq.in')
+
+    # prepare dummy nature
+    os.system('cp ./advance_temp1/wrfout_d01 ./wrfout_d01')
+    import wrfout_add_geo
+    wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', 
+                        cluster.dartrundir+'/wrfout_d01')
+
+    print('running perfect model obs', flush=True)
+    os.system('mpirun -np 12 ./perfect_model_obs')
+
+    # set namelist for filter (calc only forward op)
+    set_input_nml(sat_channel=obscfg.get('channel', False),
+                  just_prior_values=True)
+
+    # run filter
+    os.system('mv obs_seq.out obs_seq_all.out')
+    assert os.path.exists(obs_seq_all_out)
+    print('running filter', flush=True)
+    os.system('mpirun -np 40 ./filter')
+
+    # copy output to archive
+    savedir = cluster.archivedir()+'/obs_seq_final_1min/'
+    mkdir(savedir)
+    copy(cluster.dartrundir+'/obs_seq.final', 
+         savedir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final'))
+
+
+if __name__ == '__main__':
+    prev_forecast_init = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
+    time_to_run_fw_op = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
+    exppath_firstguess = cluster.archivedir()
+    print(prev_forecast_init, time_to_run_fw_op)
+
+    # link ensemble states to run_DART directory
+    pre_assim.run(time_to_run_fw_op, prev_forecast_init, exppath_firstguess)
+
+    # run fwd operator, save to archive
+    run_operator(time_to_run_fw_op)
diff --git a/scripts/create_obsseq.py b/scripts/create_obsseq.py
index d574098..267af7d 100755
--- a/scripts/create_obsseq.py
+++ b/scripts/create_obsseq.py
@@ -317,19 +317,20 @@ def generic_obs(obs_type, time_dt, coords, error_var, output_path='./'):
 
 
 if __name__ == '__main__':
-    time_dt = dt.datetime(2008, 7, 30, 15, 30)
+    time_dt = dt.datetime(2008, 7, 30, 16, 31)
     n_obs = 100
-    channel_id = 1
+    sat_channel = 1
 
     distance_between_obs_meters = 10000
-
-    # error_var = 0.001
-    # sat(time_dt, channel_id, n_obs, error_var, distance_between_obs_meters,
-    #        output_path='./', fpath_obs_locations='./domain.pkl')
-
-    error_var = (5.)**2
-    generic_obs('RADAR', time_dt, n_obs, error_var, distance_between_obs_meters,
-                output_path='./', fpath_obs_locations='./domain.pkl')
+    error_var = 0.001
+    obs_coords = calc_obs_locations(n_obs, coords_from_domaincenter=False, 
+                                            distance_between_obs_km=distance_between_obs_meters, 
+                                            fpath_obs_locations=None)
+    sat(time_dt, sat_channel, obs_coords, error_var, output_path='./')
+
+    # error_var = (5.)**2
+    # generic_obs('RADAR', time_dt, n_obs, error_var, distance_between_obs_meters,
+    #             output_path='./', fpath_obs_locations='./domain.pkl')
 
     # error_var = (0.5)**2
     # generic_obs('RASO_T', time_dt, n_obs, error_var, distance_between_obs_meters,
diff --git a/scripts/pre_assim.py b/scripts/pre_assim.py
index 9f3b35d..24cfa20 100755
--- a/scripts/pre_assim.py
+++ b/scripts/pre_assim.py
@@ -3,49 +3,54 @@ import datetime as dt
 from config.cfg import exp, cluster
 from utils import symlink, copy_scp_srvx8, copy, sed_inplace
 
-assim_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
-background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
-exppath_firstguess = str(sys.argv[3])
-
-#if cluster.name != 'srvx8':
-#    copy = copy_scp_srvx8  # use scp
-
-print('prepare prior state estimate')
-for iens in range(1, exp.n_ens+1):
-    #wrfout_run = cluster.wrf_rundir(iens) + time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
-
-    print('link wrfout file to DART background file')
-    wrfout_run = exppath_firstguess+background_init_time.strftime('/%Y-%m-%d_%H:%M/')  \
-                 +str(iens)+assim_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
-    dart_ensdir = cluster.dartrundir+'/advance_temp'+str(iens)
-    wrfout_dart = dart_ensdir+'/wrfout_d01'
-
-    os.makedirs(dart_ensdir, exist_ok=True)
-    print('linking', wrfout_run, 'to', wrfout_dart)
-    symlink(wrfout_run, wrfout_dart)
-    symlink(wrfout_dart, dart_ensdir+'/wrfinput_d01')
-
-fpath = cluster.dartrundir+'/input_list.txt'
-print('writing', fpath)
-os.remove(fpath)
-with open(fpath, 'w') as f:
-    for iens in range(1, exp.n_ens+1):
-        f.write('./advance_temp'+str(iens)+'/wrfout_d01')
-        f.write('\n')
+def run(assim_time, background_init_time, exppath_firstguess):
+    #if cluster.name != 'srvx8':
+    #    copy = copy_scp_srvx8  # use scp
 
-fpath = cluster.dartrundir+'/output_list.txt'
-print('writing', fpath)
-os.remove(fpath)
-with open(fpath, 'w') as f:
+    print('prepare prior state estimate')
     for iens in range(1, exp.n_ens+1):
-        f.write('./filter_restart_d01.'+str(iens).zfill(4))
-        f.write('\n')
-
-
-print('removing preassim and filter_restart')
-os.system('rm -rf '+cluster.dartrundir+'/preassim_*')
-os.system('rm -rf '+cluster.dartrundir+'/filter_restart*')
-os.system('rm -rf '+cluster.dartrundir+'/output_mean*')
-os.system('rm -rf '+cluster.dartrundir+'/output_sd*')
-os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*')
-
+        #wrfout_run = cluster.wrf_rundir(iens) + time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
+
+        print('link wrfout file to DART background file')
+        wrfout_run = exppath_firstguess+background_init_time.strftime('/%Y-%m-%d_%H:%M/')  \
+                    +str(iens)+assim_time.strftime('/wrfout_d01_%Y-%m-%d_%H:%M:%S')
+        dart_ensdir = cluster.dartrundir+'/advance_temp'+str(iens)
+        wrfout_dart = dart_ensdir+'/wrfout_d01'
+
+        os.makedirs(dart_ensdir, exist_ok=True)
+        print('linking', wrfout_run, 'to', wrfout_dart)
+        symlink(wrfout_run, wrfout_dart)
+        symlink(wrfout_dart, dart_ensdir+'/wrfinput_d01')
+
+    fpath = cluster.dartrundir+'/input_list.txt'
+    print('writing', fpath)
+    os.remove(fpath)
+    with open(fpath, 'w') as f:
+        for iens in range(1, exp.n_ens+1):
+            f.write('./advance_temp'+str(iens)+'/wrfout_d01')
+            f.write('\n')
+
+    fpath = cluster.dartrundir+'/output_list.txt'
+    print('writing', fpath)
+    os.remove(fpath)
+    with open(fpath, 'w') as f:
+        for iens in range(1, exp.n_ens+1):
+            f.write('./filter_restart_d01.'+str(iens).zfill(4))
+            f.write('\n')
+
+
+    print('removing preassim and filter_restart')
+    os.system('rm -rf '+cluster.dartrundir+'/preassim_*')
+    os.system('rm -rf '+cluster.dartrundir+'/filter_restart*')
+    os.system('rm -rf '+cluster.dartrundir+'/output_mean*')
+    os.system('rm -rf '+cluster.dartrundir+'/output_sd*')
+    os.system('rm -rf '+cluster.dartrundir+'/perfect_output_*')
+    os.system('rm -rf '+cluster.dartrundir+'/obs_seq.fina*')
+
+
+if __name__ == '__main__':
+    assim_time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
+    background_init_time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
+    exppath_firstguess = str(sys.argv[3])
+
+    run(assim_time, background_init_time, exppath_firstguess)
\ No newline at end of file
-- 
GitLab