From eed5163475d27b7b8ca0761296b01e183355f775 Mon Sep 17 00:00:00 2001
From: lkugler <lukas.kugler@gmail.com>
Date: Tue, 20 Jun 2023 11:04:21 +0200
Subject: [PATCH] history_interval_s, prep_nature, docs

---
 dartwrf/assim_synth_obs.py | 123 +++++++++++++++++++++----------------
 1 file changed, 71 insertions(+), 52 deletions(-)

diff --git a/dartwrf/assim_synth_obs.py b/dartwrf/assim_synth_obs.py
index 6c51da0..c89d07f 100755
--- a/dartwrf/assim_synth_obs.py
+++ b/dartwrf/assim_synth_obs.py
@@ -12,7 +12,12 @@ from dartwrf import dart_nml
 
 from dartwrf.exp_config import exp
 from dartwrf.server_config import cluster
-wrfout_format = 'wrfout_d01_%Y-%m-%d_%H:%M:%S'
+
+
+wrfout_format = '/wrfout_d01_%Y-%m-%d_%H:%M:%S'  # WRF file format, will only change if WRF changes
+pattern_init_time = "/%Y-%m-%d_%H:%M/"  # pattern for the init_timme folder in sim_archive
+pattern_obs_seq_final = "/%Y-%m-%d_%H:%M:%S_obs_seq.final"  # how an obs_seq.final file is archived
+pattern_obs_seq_out = "/%Y-%m-%d_%H:%M:%S_obs_seq.out"  # how an obs_seq.out file is archived
 
 
 def _prepare_DART_grid_template():
@@ -21,8 +26,8 @@ def _prepare_DART_grid_template():
     symlink(cluster.dart_rundir + "/prior_ens1/wrfout_d01", 
             cluster.dart_rundir + "/wrfinput_d01")
 
-def _copy_nature_to_dart(time):
-    """Copies wrfout_d01 from nature run to DART directory
+def _find_nature(time):
+    """Find the path to the nature file for the given time
     """
     glob_pattern = time.strftime(exp.nature_wrfout_pattern)  # replace time in pattern
     print('searching for nature in pattern:', glob_pattern)
@@ -30,14 +35,9 @@ def _copy_nature_to_dart(time):
 
     # check user input
     if not 'wrfout' in f_nat.split('/')[-1]:
-        warnings.warn(f+" does not contain 'wrfout' in filename, are you sure this is a valid nature file?")
-
-    # copy nature wrfout to DART directory
-    shutil.copy(f_nat, cluster.dart_rundir + "/wrfout_d01")
-
-    # add coordinates if necessary
-    if cluster.geo_em_for_WRF_ideal:
-        wrfout_add_geo.run(cluster.geo_em_for_WRF_ideal, cluster.dart_rundir + "/wrfout_d01")
+        warnings.warn(f_nat+" does not contain 'wrfout' in filename, are you sure this is a valid nature file?")
+    assert os.path.exists(f_nat), f_nat+" does not exist"
+    return f_nat
 
 def prepare_nature_dart(time):
     """Prepares DART nature (wrfout_d01) if available
@@ -46,16 +46,22 @@ def prepare_nature_dart(time):
         time (dt.datetime): Time at which observations will be made
     """
     try:
-        _copy_nature_to_dart(time)
-    except (FileNotFoundError, AttributeError) as e:  # if nature is not available due to any reason
-        print('-> has no nature, not copying nature')
+        f_nat = _find_nature(time)
+    except:
+        print('-> no nature available')
+        return
+    
+    shutil.copy(f_nat, cluster.dart_rundir + "/wrfout_d01")  # copy nature wrfout to DART directory
 
+    # add coordinates if necessary
+    if cluster.geo_em_for_WRF_ideal:
+        wrfout_add_geo.run(cluster.geo_em_for_WRF_ideal, cluster.dart_rundir + "/wrfout_d01")
 
 def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp):
     """Prepares DART files for running filter
     i.e.
     - links first guess state to DART first guess filenames
-    - creates wrfinput_d01 files
+    - creates wrfinput_d01 files (grid file, not a real wrfinput file)
     - adds geo-reference (xlat,xlon) coords so that DART can deal with the files
     - writes txt files so DART knows what input and output is
     - removes probably pre-existing files which could lead to problems
@@ -66,19 +72,23 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
         print("link wrfout file to DART background file")
         wrfout_run = (
             prior_path_exp
-            + prior_init_time.strftime("/%Y-%m-%d_%H:%M/")
+            + prior_init_time.strftime(pattern_init_time)
             + str(iens)
-            + prior_valid_time.strftime("/"+wrfout_format)
+            + prior_valid_time.strftime(wrfout_format)
         )
         dart_ensdir = cluster.dart_rundir + "/prior_ens" + str(iens)
         wrfout_dart = dart_ensdir + "/wrfout_d01"
 
+        # copy prior ens file from archived wrfout files (prior_path_exp)
         os.makedirs(dart_ensdir, exist_ok=True)
         print("copy", wrfout_run, "to", wrfout_dart)
         copy(wrfout_run, wrfout_dart)
+
+        # DART needs a grid file for each ensemble member (this is no real wrfinput file)
         symlink(wrfout_dart, dart_ensdir + "/wrfinput_d01")
 
-        # ensure prior time matches assim time (can be intentionally different)
+        # ensure prior time matches assim time
+        # can be intentionally different, e.g. by using a prior for a different time
         if assim_time != prior_valid_time:
             print("overwriting time in prior from nature wrfout")
             shell(cluster.ncks+ " -A -v XTIME,Times "+ 
@@ -88,7 +98,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
         if cluster.geo_em_for_WRF_ideal:
             wrfout_add_geo.run(cluster.geo_em_for_WRF_ideal, wrfout_dart)
 
-    write_list_of_inputfiles_prior()
+    use_linked_files_as_prior()
     write_list_of_outputfiles()
 
     print("removing preassim and filter_restart")
@@ -99,7 +109,7 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
     os.system("rm -rf " + cluster.dart_rundir + "/perfect_output_*")
     os.system("rm -rf " + cluster.dart_rundir + "/obs_seq.fina*")
 
-def write_list_of_inputfiles_prior():
+def use_linked_files_as_prior():
     """Instruct DART to use the prior ensemble as input
     """
     files = []
@@ -107,10 +117,10 @@ def write_list_of_inputfiles_prior():
         files.append("./prior_ens" + str(iens) + "/wrfout_d01")
     write_txt(files, cluster.dart_rundir+'/input_list.txt')
 
-def write_list_of_inputfiles_posterior(assim_time):
+def use_filter_output_as_prior(assim_time):
     """Use posterior as input for DART, e.g. to evaluate the analysis in observation space
     """
-    filedir = cluster.archivedir+assim_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
+    filedir = cluster.archivedir+assim_time.strftime(pattern_init_time+"/assim_stage0/")
 
     files = []
     for iens in range(1, exp.n_ens+1):
@@ -150,11 +160,11 @@ def archive_filteroutput(time):
 
     archive_dir = cluster.archivedir + "/obs_seq_final/"
     mkdir(archive_dir)
-    fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final")
+    fout = archive_dir + time.strftime(pattern_obs_seq_final)
     copy(cluster.dart_rundir + "/obs_seq.final", fout)
     print(fout, "saved.")
 
-    archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
+    archive_assim = cluster.archivedir + time.strftime(pattern_init_time+"/assim_stage0/")
     mkdir(archive_assim)
     copy(cluster.dart_rundir + "/input.nml", archive_assim + "/input.nml")
 
@@ -240,12 +250,24 @@ def set_obserr_assimilate_in_obsseqout(oso, outfile="./obs_seq.out"):
     oso.to_dart(outfile)
 
 def qc_obs(time, oso):
+    """Quality control of observations
+    We assume that the prior values have been evaluated and are in `run_DART/obs_seq.final`
+    
+    Args:
+        time (datetime): time of the assimilation
+        oso (ObsSeq): python representation of obs_seq.out file, will be modified and written to file
+    
+    Returns:
+        None    (writes to file)
+        The pre-existing obs_seq.out will be archived.
+        The new obs_seq.out will be written to the DART run directory.
+    """
     osf_prior = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.final")
 
     # obs should be superobbed already!
     for i, obscfg in enumerate(exp.observations): 
         if i > 0:
-            raise NotImplementedError()
+            raise NotImplementedError('Multiple observation types -> might not work')
 
         obs = oso.df.observations.values
         Hx_prior_mean = osf_prior.df['prior ensemble mean']
@@ -253,15 +275,15 @@ def qc_obs(time, oso):
 
         if obscfg.get("sat_channel") == 1:
 
-            if False:
-                print('removing obs with abs(FGD) < 0.05')
+            if True:
+                print('removing obs with abs(FGD) < 0.03')
                 Hx_prior = osf_prior.df.get_prior_Hx().T
-                # Hx_prior_mean = np.mean(Hx_prior, axis=0)
+                Hx_prior_mean = np.mean(Hx_prior, axis=0)
                 #Hx_prior_spread = osf_prior.df['prior ensemble spread'].values
                 #Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9
 
                 abs_FGD = abs(obs - Hx_prior_mean)  # /Hx_prior_spread
-                oso.df = oso.df[abs_FGD > 0.05]  # Hx_prior_spread]
+                oso.df = oso.df[abs_FGD > 0.03]  # Hx_prior_spread]
 
                 # obs_dist_to_priormean = abs(obs - Hx_prior_mean)
                 # oso.df = oso.df[obs_dist_to_priormean > 5]
@@ -286,8 +308,8 @@ def qc_obs(time, oso):
         
         print('QC removed', n_obs_orig-len(oso.df), 'observations') 
 
-        # for archiving
-        f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
+        # archive obs_seq.out before QC (contains all observations, including later removed ones)
+        f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime(pattern_obs_seq_out+"-beforeQC")
         os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True)
         copy(cluster.dart_rundir + "/obs_seq.out", f_out_archive)
 
@@ -299,9 +321,8 @@ def qc_obs(time, oso):
 
 def evaluate(assim_time, 
              obs_seq_out=False,
-             prior=True, 
-             posterior=False,
-             output_format="%Y-%m-%d_%H:%M_obs_seq.final-evaluate"):
+             prior_is_filter_output=False,
+             output_format=pattern_obs_seq_final+"-evaluate"):
     """Calculates either prior or posterior obs space values.
 
     Note: Depends on a prepared input_list.txt, which defines the ensemble (prior or posterior).
@@ -316,9 +337,6 @@ def evaluate(assim_time,
     Returns:
         obsseq.ObsSeq
     """
-    if prior == posterior:
-        raise ValueError('either prior or posterior must be True, the other must be False')
-
     os.makedirs(cluster.dart_rundir, exist_ok=True)  # create directory to run DART in
     os.chdir(cluster.dart_rundir)
 
@@ -330,10 +348,12 @@ def evaluate(assim_time,
     print("prepare nature")
     prepare_nature_dart(assim_time)  # link WRF files to DART directory
 
-    if posterior:
-        write_list_of_inputfiles_posterior(time)
-    if prior:
-        write_list_of_inputfiles_prior()
+    if prior_is_filter_output:
+        print('using filter_restart files from last assimilation as prior')
+        use_filter_output_as_prior(time)
+    else:
+        print('using files linked to `run_DART/<exp>/prior_ens*/wrfout_d01` as prior')
+        use_linked_files_as_prior()
 
     if obs_seq_out:
         copy(obs_seq_out, cluster.dart_rundir+'/obs_seq.out')
@@ -368,7 +388,7 @@ def get_obsseq_out(time):
 
         # copy to sim_archive
         os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True)
-        copy(f_obsseq,  time.strftime(cluster.archivedir+'/obs_seq_out/%Y-%m-%d_%H:%M_obs_seq.out'))
+        copy(f_obsseq,  time.strftime(cluster.archivedir+'/obs_seq_out/'+pattern_obs_seq_out))
 
         oso = obsseq.ObsSeq(cluster.dart_rundir + "/obs_seq.out")  # read the obs_seq.out file
     else: 
@@ -387,7 +407,7 @@ def prepare_inflation_2(time, prior_init_time):
         time (datetime): time of assimilation
         prior_init_time (datetime): time of prior assimilation
     """
-    dir_priorinf = cluster.archivedir + prior_init_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/") 
+    dir_priorinf = cluster.archivedir + prior_init_time.strftime(pattern_init_time+"/assim_stage0/") 
 
     f_default = cluster.archive_base + "/input_priorinf_mean.nc"
     f_prior = dir_priorinf + time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
@@ -412,7 +432,7 @@ def prepare_inflation_2(time, prior_init_time):
         copy(f_default, f_new)
 
 def archive_inflation_2(time):
-    dir_output = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
+    dir_output = cluster.archivedir + time.strftime(pattern_init_time+"/assim_stage0/")
     os.makedirs(dir_output, exist_ok=True)
 
     f_output = cluster.dart_rundir + '/output_priorinf_sd.nc'
@@ -496,7 +516,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
         None
     """
     nproc = cluster.max_nproc
-    do_QC = getattr(exp, "reject_smallFGD", False)  # True: triggers additional evaluations of prior & posterior
+    do_QC = getattr(exp, "do_quality_control", False)  # True: triggers additional evaluations of prior & posterior
 
     # for which observation type do we have a parametrized observation error?
     error_is_parametrized = [obscfg["error_assimilate"] == False for obscfg in exp.observations]
@@ -517,8 +537,7 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
     # is any observation error parametrized?
     if any(error_is_parametrized) or do_QC:
         print(" (optional) evaluate prior for all observations (incl rejected) ")
-        evaluate(time, prior=True,
-                 output_format="%Y-%m-%d_%H:%M_obs_seq.final-evaluate_prior")
+        evaluate(time, output_format=pattern_obs_seq_final+"-evaluate_prior")
 
     print(" assign observation-errors for assimilation ")
     set_obserr_assimilate_in_obsseqout(oso, outfile=cluster.dart_rundir + "/obs_seq.out")
@@ -539,15 +558,15 @@ def main(time, prior_init_time, prior_valid_time, prior_path_exp):
         archive_inflation_2(time)
 
     print(" evaluate posterior in observation-space")
+    f_oso = pattern_obs_seq_out
     if do_QC:
-        f_oso = '%Y-%m-%d_%H:%M_obs_seq.out-beforeQC'  # includes all observations (including rejected ones in qc_obs())
-    else:
-        f_oso = '%Y-%m-%d_%H:%M_obs_seq.out'
+        f_oso += '-beforeQC'  # includes all observations (including rejected ones in qc_obs())
+
     # evaluate() separately after ./filter is crucial when assimilating cloud variables
     # as the obs_seq.final returned by ./filter was produced using un-clamped cloud variables
     evaluate(time, obs_seq_out=cluster.archivedir+'/obs_seq_out/'+time.strftime(f_oso),
-             prior=False, posterior=True,
-             output_format="%Y-%m-%d_%H:%M_obs_seq.final-evaluate_posterior")
+             prior_is_filter_output=True,
+             output_format=pattern_obs_seq_final+"-evaluate")
 
 
 if __name__ == "__main__":
-- 
GitLab