From ec82840e00b0c69187b1ac75b812dc3a156716e8 Mon Sep 17 00:00:00 2001
From: lkugler <lukas.kugler@gmail.com>
Date: Tue, 5 Jul 2022 10:43:22 +0200
Subject: [PATCH] update, includes preproc

---
 dartwrf/assim_synth_obs.py | 140 +++++++++++++++++++++----------------
 1 file changed, 78 insertions(+), 62 deletions(-)

diff --git a/dartwrf/assim_synth_obs.py b/dartwrf/assim_synth_obs.py
index 4a43f4a..d9d9608 100755
--- a/dartwrf/assim_synth_obs.py
+++ b/dartwrf/assim_synth_obs.py
@@ -1,3 +1,4 @@
+from multiprocessing.sharedctypes import Value
 import os, sys, shutil, warnings
 import time as time_module
 import datetime as dt
@@ -5,7 +6,7 @@ import numpy as np
 from scipy.interpolate import interp1d
 
 from config.cfg import exp, cluster
-from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print
+from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print, shell
 import dartwrf.create_obsseq as osq
 from dartwrf import wrfout_add_geo
 from dartwrf import obsseq
@@ -141,6 +142,8 @@ def set_DART_nml(just_prior_values=False):
 
     # options keys are replaced in input.nml with values
     options = {
+        "<sampling_error_correction>": '.true.' if exp.sec else '.false.',
+        "<post_inflation>": '4' if exp.inflation else '0',
         "<n_ens>": str(int(exp.n_ens)),
         "<cov_loc_radian>": "0.00000001",  # dummy value, used for types not mentioned below
         "<list_obstypes>": "'" + "','".join(list_obstypes) + "'",
@@ -173,7 +176,7 @@ def run_Hx(time, obscfg):
     """
     # existing obs_seq.out is a precondition for filter
     # needs to know where to take observations
-    osq.create_obsseqin_alltypes(time, [obscfg, ], 0)
+    osq.create_obsseqin_alltypes(time, [obscfg, ])
     run_perfect_model_obs()
     
     print("running H(x) : obs operator on ensemble prior")
@@ -182,7 +185,16 @@ def run_Hx(time, obscfg):
     set_DART_nml(just_prior_values=True)
 
     # run filter to calculate prior Hx 
-    os.system("mpirun -np 12 ./filter &> log.filter.preassim")
+    shell("mpirun -np 12 ./filter &> log.filter.preassim")
+
+    osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final")
+
+    if hasattr(exp, "superob_km"):
+        df = osf.df
+        print("superobbing to", exp.superob_km, "km")
+        osf.df = osf.df.superob(window_km=exp.superob_km)
+        osf.to_dart(cluster.dartrundir + "/obs_seq.final")
+    return osf
 
 
 def obs_operator_nature(time):
@@ -243,13 +255,8 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
         # ensure prior time matches assim time (can be off intentionally)
         if assim_time != prior_valid_time:
             print("overwriting time in prior from nature wrfout")
-            os.system(
-                cluster.ncks
-                + " -A -v XTIME,Times "
-                + cluster.dartrundir
-                + "/wrfout_d01 "
-                + wrfout_dart
-            )
+            shell(cluster.ncks+ " -A -v XTIME,Times "+ 
+                    cluster.dartrundir+"/wrfout_d01 "+ wrfout_dart)
 
         # this seems to be necessary (else wrong level selection)
         wrfout_add_geo.run(cluster.dartrundir + "/../geo_em.d01.nc", wrfout_dart)
@@ -304,29 +311,28 @@ def calc_obserr_WV73(Hx_nature, Hx_prior):
         OEs[iobs] = oe_model
     return OEs
 
-
-def run_perfect_model_obs(nproc=12):
+def run_perfect_model_obs(nproc=6):
     print("generating observations - running ./perfect_model_obs")
     os.chdir(cluster.dartrundir)
 
     try_remove(cluster.dartrundir + "/obs_seq.out")
     if not os.path.exists(cluster.dartrundir + "/obs_seq.in"):
         raise RuntimeError("obs_seq.in does not exist in " + cluster.dartrundir)
-    os.system("mpirun -np "+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs")
+    print(shell("mpirun -np "+str(nproc)+" ./perfect_model_obs > log.perfect_model_obs"))
     if not os.path.exists(cluster.dartrundir + "/obs_seq.out"):
         raise RuntimeError(
             "obs_seq.out does not exist in " + cluster.dartrundir,
             "\n look for " + cluster.dartrundir + "/log.perfect_model_obs")
 
 
-def assimilate(nproc=96):
+def assimilate(nproc=12):
     print("time now", dt.datetime.now())
     print("running filter")
     os.chdir(cluster.dartrundir)
     try_remove(cluster.dartrundir + "/obs_seq.final")
     t = time_module.time()
-    #os.system('mpirun -np 12 ./filter &> log.filter')
-    os.system("".join([
+    #shell('mpirun -np 12 ./filter &> log.filter')
+    shell("".join([
         "mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-",
         str(int(nproc) - 1)," -np ",str(int(nproc))," ./filter > log.filter"]))
     print("./filter took", int(time_module.time() - t), "seconds")
@@ -443,22 +449,21 @@ def get_parametrized_error(obscfg):
 
     # run obs operator (through filter program)
     # creates obs_seq.final containing truth & prior Hx
-    run_Hx(time, obscfg)
-    osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final")
+    osf = run_Hx(time, obscfg)
     df_obs = osf.df
 
-    if hasattr(exp, "superob_km"):
-        print("superobbing to", exp.superob_km, "km")
-        t = time_module.time()
-        df_obs = df_obs.superob(window_km=exp.superob_km)
-        print("superob took", int(time_module.time() - t), "seconds")
-
     Hx_prior = df_obs.get_prior_Hx().T
     Hx_truth = df_obs.get_truth_Hx()
 
     # compute the obs error for assimilation on the averaged grid
     # since the assimilation is done on the averaged grid
-    return calc_obserr_WV73(Hx_truth, Hx_prior)
+    if obscfg.get("sat_channel") == 6:
+        return calc_obserr_WV73(Hx_truth, Hx_prior)
+    elif obscfg.get("sat_channel") == 1:
+        return calc_obserr_VIS06(Hx_truth, Hx_prior)
+    else:
+        NotImplementedError('error_assimilate if False and sat_channel', obscfg.get("sat_channel"))
+
 
 def set_obserr_assimilate_in_obsseqout(obsseqout, outfile="./obs_seq.out"):
     for obscfg in exp.observations:
@@ -468,39 +473,56 @@ def set_obserr_assimilate_in_obsseqout(obsseqout, outfile="./obs_seq.out"):
         # modify each kind separately, one after each other
         mask_kind = obsseqout.df.kind == kind
 
-        if is_assim_error_parametrized(obscfg):
-            assim_err = get_parametrized_error(obscfg)            
+        if obscfg["error_assimilate"] == False:
+            assim_err = get_parametrized_error(obscfg)
             obsseqout.df.loc[mask_kind, 'variance'] = assim_err**2
-            #assert np.allclose(assim_err, obsseqout.df['variance']**2)
-
+            #assert np.allclose(assim_err, obsseqout.df['variance']**2)  # check
         else:
             # overwrite with user-defined values
             obsseqout.df.loc[mask_kind, 'variance'] = obscfg["error_assimilate"]**2
 
     obsseqout.to_dart(outfile)
 
+def qc_obs(oso, outfile='./obs_seq.out'):
+    # obs should be superobbed already!
+    for i, obscfg in enumerate(exp.observations): 
+        if i > 0:
+            raise NotImplementedError()
+
+        # run obs operator (through filter program)
+        # creates obs_seq.final containing truth & prior Hx
+        osf = run_Hx(time, obscfg)
+        df_final = osf.df
+        #from IPython import embed; embed()
+
+        Hx_prior = df_final.get_prior_Hx().T
+        Hx_prior_mean = np.mean(Hx_prior, axis=0)
+        #Hx_prior_spread = df_final['prior ensemble spread'].values
+        #Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9
+        obs = oso.df.observations.values
+        n_obs_orig = len(obs)
+
+        obs_dist_to_priormean = abs(obs - Hx_prior_mean)  # /Hx_prior_spread
+        oso.df = oso.df[obs_dist_to_priormean > 0.1]
+        print('removed', n_obs_orig-len(oso.df), 'observations with fgd smaller than .1') 
+
+        # obs_dist_to_priormean = abs(obs - Hx_prior_mean)
+        # oso.df = oso.df[obs_dist_to_priormean > 5]
+        # print('removed', n_obs_orig-len(oso.df), 'observations with abs(FGD) smaller than 5')
+
+        oso.to_dart(outfile)
+        print('saved', outfile)
 
 if __name__ == "__main__":
-    """Assimilate observations (different obs types)
+    """Assimilate observations
     as defined in config/cfg.py
     for a certain timestamp (argument) of the nature run (defined in config/clusters.py)
 
     Workflow:
-    for each assimilation stage (one obs_seq.in and e.g. one observation type):
-    1) create obs_seq.in with obs-errors
-    2) prepare nature run for DART
-    3) create obs from nature (obs_seq.out) using pre-defined obs-errors
-    4) Assimilate
-      - adapt obs errors for assimilation
-        - calculate assim obs error from parametrization
-            1) create obs_seq.in with obs error=0
-            2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
-            3) calculate obs error as function of y_nat, y_ensmean
-        - or get assim obs error from config
-        - edit obs error in obs_seq.out
-      - assimilate
-      - write state to archive
-
+    1) prepare nature run & prior ensemble for DART
+    2) create obs_seq.in
+    3) create obs from nature (obs_seq.out) with user-defined obs-errors
+    4) Assimilate with assigned errors
 
     Note:
         assim_time (dt.datetime):           time of output
@@ -533,30 +555,22 @@ if __name__ == "__main__":
     prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp)
 
     ################################################
-    print(" 1) generate observations with specified obs-error")
-
-    # the obs-error we use for generating obs is user-defined
-    error_generate = []
-    for i, obscfg in enumerate(exp.observations):
-        levels = obscfg.get("heights", [1,])
-        n_obs_z = len(levels)
-        n_obs_3d = obscfg["n_obs"] * n_obs_z
-        error_generate.extend(np.zeros(n_obs_3d) + obscfg["error_generate"])
-
-    osq.create_obsseqin_alltypes(time, exp.observations, error_generate)
+    print(" 1) create observations with specified obs-error")
+    osq.create_obsseqin_alltypes(time, exp.observations)
     set_DART_nml()
 
-    run_perfect_model_obs()  # actually create observations that are used to assimilate
+    run_perfect_model_obs()  # create observations
+
+    print(" 2) obs preprocessing")
 
-    # read in the actual observations
     oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
 
     if False:  # only refl < 6 dBz
         # oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
         oso.df = oso.df[oso.df['truth'].values < 6]
         oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
-    if True:
-        oso.df.loc[oso.df['observations'].values < 0.293, ('observations')] = 0.293  # set reflectance to sky clear
+    if False:  # set reflectance < surface albedo to surface albedo
+        oso.df.loc[oso.df['observations'].values < 0.293, ('observations')] = 0.293
         oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
 
     if hasattr(exp, "superob_km"):
@@ -565,11 +579,13 @@ if __name__ == "__main__":
         copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig")
         oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
 
+    # qc_obs(oso, outfile=cluster.dartrundir + "/obs_seq.out")
+
     ################################################
-    print(" 2) assign observation-errors for assimilation ")
+    print(" 3) assign observation-errors for assimilation ")
     set_obserr_assimilate_in_obsseqout(oso, outfile=cluster.dartrundir + "/obs_seq.out")
 
-    print(" 3) assimilate ")
+    print(" 4) assimilate ")
     archive_osq_out(time)
     
     set_DART_nml()
-- 
GitLab