assimilate.py
    import os
    import sys
    import shutil
    import warnings
    import glob
    import time as time_module
    import datetime as dt
    import numpy as np
    
    from dartwrf.utils import Config, symlink, copy, try_remove, print, shell, write_txt, obskind_read
    from dartwrf import wrfout_add_geo
    from dartwrf.obs import error_models as err
    from dartwrf.obs import obsseq
    from dartwrf import dart_nml
    
    
    def prepare_DART_grid_template(cfg):
        """Prepare DART grid template wrfinput_d01 file from a prior file
        
        DART needs a wrfinput file as a template for the grid information.
        No data except grid info will be read from this file.
        The grid information must exactly match that of the prior file "wrfout_d01"
        """
        f_wrfout_dummy = cfg.dir_dart_run+'/prior_ens1/wrfout_d01'
        if os.path.exists(f_wrfout_dummy):
            copy(f_wrfout_dummy, cfg.dir_dart_run + "/wrfinput_d01")

            if cfg.geo_em_forecast:
                wrfout_add_geo.run(cfg, cfg.geo_em_forecast, cfg.dir_dart_run + "/wrfinput_d01")
        else:
            raise FileNotFoundError(
                f_wrfout_dummy + ' does not exist, cannot create the wrfinput_d01 grid template')
    
    def prepare_prior_ensemble(cfg, assim_time, prior_init_time, prior_valid_time, prior_path_exp):
        """Prepares DART files for running filter
        i.e.
        - links first guess state to DART first guess filenames
        - creates wrfinput_d01 files (grid file, not a real wrfinput file)
        - adds geo-reference (xlat,xlon) coords so that DART can deal with the files
        - writes txt files so DART knows what input and output is
        - removes any pre-existing files which could lead to problems
        """
        print("prepare prior ensemble")
        for iens in range(1, cfg.ensemble_size + 1):
    
            print("link wrfout file to DART background file")
            f_wrfout = (
                prior_path_exp
                + prior_init_time.strftime(cfg.pattern_init_time)
                + str(iens)
                + prior_valid_time.strftime(cfg.wrfout_format)
            )
            dart_ensdir = cfg.dir_dart_run + "/prior_ens" + str(iens)
            wrfout_dart = dart_ensdir + "/wrfout_d01"
    
            # copy prior ens file from archived wrfout files (prior_path_exp)
            print("link", f_wrfout, "to", wrfout_dart)
            if not os.path.isfile(f_wrfout):
                raise FileNotFoundError(f_wrfout + " does not exist")
            symlink(f_wrfout, wrfout_dart)
    
            # ensure prior time matches assim time
            # can be intentionally different, e.g. by using a prior for a different time
            if assim_time != prior_valid_time:
                copy(f_wrfout, wrfout_dart)
                print("overwriting time in prior from nature wrfout")
                shell(cfg.ncks + " -A -v XTIME,Times " +
                      cfg.dir_dart_run+"/wrfout_d01 " + wrfout_dart)
    
            # this seems to be necessary (else wrong level selection)
            #if cluster.geo_em_forecast:
            #    wrfout_add_geo.run(cluster.geo_em_forecast, wrfout_dart)
    
        use_linked_files_as_prior(cfg)
        write_list_of_outputfiles(cfg)
    
        print("removing preassim and filter_restart")
        os.system("rm -rf " + cfg.dir_dart_run + "/preassim_*")
        os.system("rm -rf " + cfg.dir_dart_run + "/filter_restart*")
        os.system("rm -rf " + cfg.dir_dart_run + "/output_mean*")
        os.system("rm -rf " + cfg.dir_dart_run + "/output_sd*")
        os.system("rm -rf " + cfg.dir_dart_run + "/perfect_output_*")
        os.system("rm -rf " + cfg.dir_dart_run + "/obs_seq.fina*")
    
    
    def use_linked_files_as_prior(cfg):
        """Instruct DART to use the prior ensemble as input
        """
        files = []
        for iens in range(1, cfg.ensemble_size+1):
            files.append("./prior_ens" + str(iens) + "/wrfout_d01")
        write_txt(files, cfg.dir_dart_run+'/input_list.txt')
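    # For illustration: with ensemble_size = 3, the input_list.txt written above contains
    # one relative path per line (the list file is referenced from input.nml):
    #   ./prior_ens1/wrfout_d01
    #   ./prior_ens2/wrfout_d01
    #   ./prior_ens3/wrfout_d01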
    
    
    def use_filter_output_as_prior(cfg):
        """Use the last posterior as input for DART, e.g. to evaluate the analysis in observation space
        """
        files = []
        for iens in range(1, cfg.ensemble_size+1):
            f_new = cfg.dir_dart_run+'/prior_ens'+str(iens)+'/wrfout_d01'
            try:
                os.remove(f_new)
            except FileNotFoundError:
                pass
            os.rename(cfg.dir_dart_run+'/filter_restart_d01.' +
                      str(iens).zfill(4), f_new)
            files.append(f_new)
    
        write_txt(files, cfg.dir_dart_run+'/input_list.txt')
    
    
    def write_list_of_outputfiles(cfg):
        files = []
        for iens in range(1, cfg.ensemble_size+1):
            files.append("./filter_restart_d01." + str(iens).zfill(4))
        write_txt(files, cfg.dir_dart_run+'/output_list.txt')
    
    
    def filter(cfg):
        """Calls the DART ./filter program

        Args:
            cfg (Config): experiment configuration; cfg.max_nproc sets the number of cores for the ./filter call

        Returns:
            None    (writes to file)
        """
        nproc = cfg.max_nproc
            
        print("time now", dt.datetime.now())
        print("running filter")
        os.chdir(cfg.dir_dart_run)
        try_remove(cfg.dir_dart_run + "/obs_seq.final")
    
        t = time_module.time()
        if nproc > 1:
            # -genv I_MPI_PIN_PROCESSOR_LIST=0-"+str(int(nproc) - 1)
            shell(cfg.dart_modules+"; mpirun -np " +
                  str(int(nproc))+" ./filter > log.filter")
        else:
            shell(cfg.dart_modules+"; ./filter > log.filter")
        print("./filter took", int(time_module.time() - t), "seconds")
    
        if not os.path.isfile(cfg.dir_dart_run + "/obs_seq.final"):
            raise RuntimeError(
                "obs_seq.final does not exist in the run_DART directory. "
                "Check the log file at " + cfg.dir_dart_run + "/log.filter")
    
    
    def archive_filteroutput(cfg, time):
        """Archive filter output files (filter_restart, preassim, postassim, output_mean, output_sd)
        """
        # archive diagnostics
        dir_out = cfg.dir_archive + time.strftime(cfg.pattern_init_time)
        os.makedirs(dir_out, exist_ok=True)
    
        # copy input.nml to archive
        copy(cfg.dir_dart_run + "/input.nml", dir_out + "/input.nml")
    
        # copy filter_restart files to archive (initial condition for next run)
        for iens in range(1, cfg.ensemble_size + 1):  # single members
            copy(
                cfg.dir_dart_run + "/filter_restart_d01." + str(iens).zfill(4),
                dir_out + "/filter_restart_d01." + str(iens).zfill(4),
            )
    
        # copy preassim/postassim files to archive (not necessary for next forecast run)
        for f in ["preassim_mean.nc", "preassim_sd.nc",
                  "postassim_mean.nc", "postassim_sd.nc",
                  "output_mean.nc", "output_sd.nc"]:
            try:
                copy(cfg.dir_dart_run + "/" + f, dir_out + "/" + f)
            except Exception:
                warnings.warn(f + " not found")
    
        if False:  # save disk space, don't do this by default
            try:
                ftypes = ['preassim', 'postassim']
                for ftype in ftypes:
                    for iens in range(1, cfg.ensemble_size + 1):
                        fname = "/"+ftype+"_member_" + str(iens).zfill(4) + ".nc"
                        copy(cfg.dir_dart_run + fname, dir_out + fname)
            except Exception as e:
                warnings.warn(str(e))
    
    
    def get_parametrized_error(obscfg, osf_prior) -> np.ndarray:
        """Calculate the parametrized error for an ObsConfig (one obs type)

        Args:
            obscfg (dict): Configuration of one observation type
            osf_prior (obsseq.ObsRecord): Contains truth and prior values from obs_seq.final
                                            (output of ./filter in evaluate-mode (no posterior))

        Returns:
            np.ndarray: observation error std-dev for assimilation
        """
        Hx_prior = osf_prior.get_prior_Hx().T
        Hx_truth = osf_prior.get_truth_Hx()
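        # shape convention (as implied by the usage in reject_small_FGD below):
        # get_prior_Hx() returns (n_obs, ensemble_size), so Hx_prior here is (ensemble_size, n_obs);
        # get_truth_Hx() returns one truth value per observation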
    
        # compute the obs error for assimilation on the averaged grid
        # since the assimilation is done on the averaged grid
        channel = obscfg.get("sat_channel")
    
        if channel == 5:
            return err.calc_obserr_WV('WV62', Hx_truth, Hx_prior)
        elif channel == 6:
            return err.calc_obserr_WV('WV73', Hx_truth, Hx_prior)
        else:
            raise NotImplementedError('sat_channel not implemented: '
                                      + str(obscfg.get("sat_channel")))
        
    
    def set_obserr_assimilate_in_obsseqout(cfg, oso, outfile="./obs_seq.out"):
        """Overwrite the existing variance values in an obs_seq.out file

        Args:
            oso (ObsSeq): python representation of the obs_seq.out file; will be modified and written to `outfile`

        Returns:
            None    (writes to file)

        Variables:
            osf_prior (ObsSeq): python representation of obs_seq.final (output of filter in evaluate-mode without posterior);
                            contains prior values; used for parametrized errors
        """
    
        for obscfg in cfg.assimilate_these_observations:
            kind_str = obscfg['kind']  # e.g. 'RADIOSONDE_TEMPERATURE'
            kind = cfg.obs_kind_nrs[kind_str]  # e.g. 263
    
            # modify observation error of each kind sequentially
            where_oso_iskind = oso.df.kind == kind
    
            if "error_assimilate" in obscfg:
                if obscfg["error_assimilate"]  == False:
                    print("error_assimilate is False, will compute dynamic obs-errors")
                    # get a parametrized error for this observation type
                    
                    # parametrization is state dependent => need states
                    use_external_FO = obscfg.get("external_FO", False)
                    if use_external_FO:
                        # TODO not implemented yet:
                        # read the prior from obs_seq.out, then modify the obs error (OE) in obs_seq.out
                        pass
                    else:
                        f_osf = cfg.dir_dart_run + "/obs_seq.final"
                        if not os.path.isfile(f_osf):
                            print('computing prior as input for dynamic obs errors')
                            # generates obs_seq.final
                            evaluate(cfg, cfg.time, f_out_pattern=f_osf)
    
                        # read prior (obs_seq.final)
                        osf_prior = obsseq.ObsSeq(f_osf)
                        where_osf_iskind = osf_prior.df.kind == kind
    
                        assim_err = get_parametrized_error(
                            obscfg, osf_prior.df[where_osf_iskind])
                        oso.df.loc[where_oso_iskind, 'variance'] = assim_err**2
                        # assert np.allclose(assim_err, oso.df['variance']**2)  # check
                else:
                    # overwrite with user-defined values
                    oso.df.loc[where_oso_iskind,
                                'variance'] = obscfg["error_assimilate"]**2
    
        oso.to_dart(outfile)
    
    
    def reject_small_FGD(cfg, time, oso):
        """Quality control of observations
        We assume that the prior values have been evaluated and are in `run_DART/obs_seq.final`
    
        Args:
            time (datetime): time of the assimilation
            oso (ObsSeq): python representation of obs_seq.out file, will be modified and written to file
    
        Returns:
            None    (writes to file)
            The pre-existing obs_seq.out will be archived.
            The new obs_seq.out will be written to the DART run directory.
        """
        osf_prior = obsseq.ObsSeq(cfg.dir_dart_run + "/obs_seq.final")
    
        # obs should be superobbed already!
        for i, obscfg in enumerate(cfg.assimilate_these_observations):
            if i > 0:
                raise NotImplementedError(
                    'Multiple observation types -> might not work')
    
            obs = oso.df.observations.values
            Hx_prior_mean = osf_prior.df['prior ensemble mean']
            n_obs_orig = len(obs)
    
            if obscfg.get("sat_channel") == 1:
    
                if True:
                    print('removing obs with abs(FGD) < 0.03')
                    Hx_prior = osf_prior.df.get_prior_Hx().T
                    Hx_prior_mean = np.mean(Hx_prior, axis=0)
                    # Hx_prior_spread = osf_prior.df['prior ensemble spread'].values
                    # Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9
    
                    abs_FGD = abs(obs - Hx_prior_mean)  # /Hx_prior_spread
                    oso.df = oso.df[abs_FGD > 0.03]  # Hx_prior_spread]
    
                    # obs_dist_to_priormean = abs(obs - Hx_prior_mean)
                    # oso.df = oso.df[obs_dist_to_priormean > 5]
                    # print('removed', n_obs_orig-len(oso.df), 'observations with abs(FGD) smaller than 5')
    
                else:
                    # set obs to prior mean => no change in mean but spread is adjusted.
                    abs_FGD = abs(obs - Hx_prior_mean)  # /Hx_prior_spread
                    Hx_prior_median = np.median(
                        osf_prior.df.get_prior_Hx(), axis=1)
                    oso.df.loc[abs_FGD <= 0.05,
                               'observations'] = Hx_prior_median[abs_FGD <= 0.05]
    
            elif obscfg.get("sat_channel") == 6:  # WV73
    
                print('removing obs with abs(FGD) smaller than 5')
                obs = oso.df.observations.values
                n_obs_orig = len(obs)
    
                abs_FGD = abs(obs - Hx_prior_mean)  # /Hx_prior_spread
                oso.df = oso.df[abs_FGD > 5]
            else:
                raise NotImplementedError(
                    'no obs QC implemented for this obs type')
    
            print('QC removed', n_obs_orig-len(oso.df), 'observations')
    
            # archive obs_seq.out before QC (contains all observations, including later removed ones)
            f_out_archive = time.strftime(cfg.pattern_obs_seq_out)+"-beforeQC"
            os.makedirs(os.path.dirname(f_out_archive), exist_ok=True)
            copy(cfg.dir_dart_run + "/obs_seq.out", f_out_archive)
    
            # for assimilation later
            f_out_dart = cfg.dir_dart_run+'/obs_seq.out'
            oso.to_dart(f_out_dart)
            print('saved', f_out_dart)
    
    
    def evaluate(cfg, assim_time,
                 obs_seq_out=False,
                 prior_is_filter_output=False,
                 f_out_pattern: str = './obs_seq.final'):
        """Calculates either prior or posterior obs space values.
    
        Note: Depends on a prepared input_list.txt, which defines the ensemble (prior or posterior).
    
        Output file: Saves obs_seq.final to `/archivedir/obs_seq_final/`
    
        Args:
            assim_time (datetime): time of assimilation
            obs_seq_out (str, optional):    path to the obs_seq.out file defining the observations at which to evaluate;
                                            defaults to the obs_seq.out file already present in the run_DART directory
            f_out_pattern (str, optional):  output filename pattern (strftime placeholders allowed)
            prior_is_filter_output (bool, optional): if True, use the filter output as prior, else use the already linked ensemble files

        Returns:
            None (writes file)
        """
        prepare_run_DART_folder(cfg)
    
        if prior_is_filter_output:
            print('using filter_restart files in run_DART as prior')
            use_filter_output_as_prior(cfg)
        else:
            print('using files linked to `run_DART/<exp>/prior_ens*/wrfout_d01` as prior')
            use_linked_files_as_prior(cfg)
    
        # the observations at which to evaluate the ensemble (prior or posterior)
        if obs_seq_out:
            copy(obs_seq_out, cfg.dir_dart_run + '/obs_seq.out')  # user defined file
        else:
            # use existing obs_seq.out file currently present in the run_DART directory
            if not os.path.isfile(cfg.dir_dart_run+'/obs_seq.out'):
                raise RuntimeError(cfg.dir_dart_run +
                                   '/obs_seq.out does not exist')
    
        dart_nml.write_namelist(cfg, just_prior_values=True)
        filter(cfg)
        archive_filter_diagnostics(cfg, assim_time, f_out_pattern)
    
    
    def archive_filter_diagnostics(cfg, time, f_out_pattern):
        """Copy the filter output txt to the archive
        """
        f_archive = time.strftime(f_out_pattern)
    
        dir_out = os.path.dirname(f_archive)
        os.makedirs(dir_out, exist_ok=True)
        copy(cfg.dir_dart_run + "/obs_seq.final", f_archive)
        print(f_archive, "saved.")
    
    
    def txtlink_to_prior(cfg, time, prior_init_time, prior_path_exp):
        """For reproducibility, write the path of the prior to a txt file
        """
        dir_out = cfg.dir_archive + time.strftime('/%Y-%m-%d_%H:%M/')
        os.makedirs(dir_out, exist_ok=True)
        write_txt([prior_path_exp,
                   prior_init_time.strftime('/%Y-%m-%d_%H:%M/'),
                   time.strftime('/wrfrst_d01_%Y-%m-%d_%H:%M:%S')],
                  dir_out + 'link_to_prior.txt')
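    # The resulting link_to_prior.txt contains three lines, e.g. (hypothetical values):
    #   /jetfs/exp1
    #   /2008-07-30_12:00/
    #   /wrfrst_d01_2008-07-30_13:00:00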
    
    
    def prepare_adapt_inflation(cfg, time, prior_init_time):
        """Prepare inflation files (spatially varying)

        Recycles inflation files from previous assimilations
        or takes default files from archive.

        Args:
            time (datetime): time of assimilation
            prior_init_time (datetime): time of prior assimilation
        """
        dir_priorinf = cfg.dir_archive + \
            prior_init_time.strftime(cfg.pattern_init_time)

        for inf_kind in ['mean', 'sd']:
            f_prior = dir_priorinf + \
                time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_" + inf_kind + ".nc")
            f_new = cfg.dir_dart_run + '/input_priorinf_' + inf_kind + '.nc'

            if os.path.isfile(f_prior):
                copy(f_prior, f_new)
                print(f_prior, 'copied to', f_new)
            else:
                # no prior inflation file exists at the first assimilation
                warnings.warn(f_prior + ' does not exist. Using default file instead.')
                f_default = cfg.dir_archive + "/../input_priorinf_" + inf_kind + ".nc"
                copy(f_default, f_new)
    
    
    def archive_adapt_inflation(cfg, time):
        """Archive the output (adaptive) inflation files of this assimilation
        """
        dir_output = cfg.dir_archive + time.strftime(cfg.pattern_init_time)
        os.makedirs(dir_output, exist_ok=True)
    
        f_output = cfg.dir_dart_run + '/output_priorinf_sd.nc'
        f_archive = dir_output + \
            time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_sd.nc")
        copy(f_output, f_archive)
        print(f_archive, 'saved')
    
        f_output = cfg.dir_dart_run + '/output_priorinf_mean.nc'
        f_archive = dir_output + \
            time.strftime("/%Y-%m-%d_%H:%M_output_priorinf_mean.nc")
        copy(f_output, f_archive)
        print(f_archive, 'saved')
    
    
    def prepare_run_DART_folder(cfg: Config):
        # create directory to run DART in
    
        def __link_RTTOV_files():
            """Link required files for running RTTOV to run_DART
            """
            if cfg.dir_rttov_src != False:
                rttov_files = ['rttov13pred54L/rtcoef_msg_4_seviri_o3.dat',
                            'mfasis_lut/rttov_mfasis_cld_msg_4_seviri_deff.H5',
                            'cldaer_visir/sccldcoef_msg_4_seviri.dat']
    
                for f_src in rttov_files:
                    destname = os.path.basename(f_src)
                    if 'rtcoef' in f_src:
                        destname = 'rtcoef_msg_4_seviri.dat'
    
                    symlink(cfg.dir_rttov_src + f_src,
                            cfg.dir_dart_run+'/'+destname)
    
                symlink(cfg.dir_dart_run+'/rttov_mfasis_cld_msg_4_seviri_deff.H5',
                        cfg.dir_dart_run+'/rttov_mfasis_cld_msg_4_seviri.H5')  # use deff, not OPAC
    
                symlink(cfg.dir_dart_src+'/../../../observations/forward_operators/rttov_sensor_db.csv',
                        cfg.dir_dart_run+'/rttov_sensor_db.csv')
                
        def __link_DART_exe():
            """Link the DART executables to the run_DART directory
            """
            bins = ['perfect_model_obs', 'filter', 'obs_diag', 'obs_seq_to_netcdf']
            for b in bins:
                symlink(os.path.join(cfg.dir_dart_src, b),
                        os.path.join(cfg.dir_dart_run, b))
    
            symlink(cfg.dir_dart_src+'/../../../assimilation_code/programs/gen_sampling_err_table/'
                    + 'work/sampling_error_correction_table.nc',
                    cfg.dir_dart_run+'/sampling_error_correction_table.nc')
        
        #########################
        # remove any remains of a previous run
        os.makedirs(cfg.dir_dart_run, exist_ok=True)
        os.chdir(cfg.dir_dart_run)
        os.system("rm -f input.nml obs_seq.in obs_seq.out-orig obs_seq.final")
        
        __link_DART_exe()
        
        for obscfg in cfg.assimilate_these_observations:
            if 'sat_channel' in obscfg:
                __link_RTTOV_files()
                break  # only need to link RTTOV files once
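    # After prepare_run_DART_folder() (together with prepare_prior_ensemble() and
    # prepare_DART_grid_template()), cfg.dir_dart_run typically contains: the linked executables
    # (filter, perfect_model_obs, obs_diag, obs_seq_to_netcdf), sampling_error_correction_table.nc,
    # optionally the RTTOV coefficient files, plus input.nml, obs_seq.out, wrfinput_d01 and
    # prior_ens<iens>/wrfout_d01 created by the other preparation functions.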
    
    
    def get_obsseq_out(cfg, time, prior_path_exp, prior_init_time, prior_valid_time):
        """Prepares an obs_seq.out file in the run_DART folder
    
        Options:
        1) Use existing obs_seq.out file
        2) Use precomputed FO (e.g. cloud fraction)
        3) Generate new observations from nature run with new noise
    
        Args:
            time (datetime): time of assimilation
            prior_path_exp (str): path to the experiment folder
            prior_init_time (datetime): time of the prior forecast init
            prior_valid_time (datetime): time of the prior
    
        Returns:
            obsseq.ObsSeq
        """
        use_ACF = False
        if 'assimilate_cloudfractions' in cfg:
            if cfg.assimilate_cloudfractions:
                use_ACF = True
    
        oso = None
        if isinstance(cfg.assimilate_existing_obsseq, str):
            # assume that the user wants to use an existing obs_seq.out file
    
            f_obsseq = time.strftime(cfg.assimilate_existing_obsseq)
            if os.path.isfile(f_obsseq):
                # copy to run_DART folder
                copy(f_obsseq, cfg.dir_dart_run+'/obs_seq.out')
                print(f_obsseq, 'copied to', cfg.dir_dart_run+'/obs_seq.out')
    
            else:
                # explain the error if the file does not exist
                raise IOError('cfg.assimilate_existing_obsseq is not False. \n'
                              + 'In this case, assimilate_existing_obsseq should be a file path (wildcards %H, %M allowed)!\n'
                              + 'But there is no file with this pattern: '+str(cfg.assimilate_existing_obsseq))
    
        elif use_ACF:
            # prepare observations with precomputed FO
            CF_config = cfg.CF_config.copy()
            f_prior_pattern = CF_config.pop('first_guess_pattern')
            f_obs_pattern = time.strftime(CF_config.pop('f_obs_pattern'))
            f_obs = glob.glob(f_obs_pattern)
            if len(f_obs) == 0:
                raise FileNotFoundError(f_obs_pattern + ' not found')
            f_obs = f_obs[0]
            
            pattern_prior = '/'.join([prior_path_exp,
                                      prior_init_time.strftime('/%Y-%m-%d_%H:%M/'),
                                      '<iens>/',
                                      prior_valid_time.strftime(f_prior_pattern),
                                      ])
    
            from CloudFractionDA import obsseqout as cfoso
            cfoso.write_obsseq(time, pattern_prior, f_obs,
                               cfg.obs_kind_nrs,
                               path_output=cfg.dir_dart_run + "/obs_seq.out",
                               **CF_config)
    
        else:
            # do NOT use an existing obs_seq.out file
            # but generate observations with new observation noise
            from dartwrf.obs import create_obsseq_out as osq_out
            oso = osq_out.generate_new_obsseq_out(cfg)
    
        # copy to sim_archive
        f_obsseq_archive = time.strftime(cfg.pattern_obs_seq_out)
        os.makedirs(os.path.dirname(f_obsseq_archive), exist_ok=True)
        copy(cfg.dir_dart_run+'/obs_seq.out', f_obsseq_archive)
    
        # read so that we can return it
        if oso is None:
            oso = obsseq.ObsSeq(cfg.dir_dart_run + "/obs_seq.out")
        return oso
    
    
    def main(cfg: Config):
        """Assimilate observations
        as defined in config/cfg.py
        for a certain timestamp (argument) of the nature run (defined in config/clusters.py)
    
        Workflow:
        1) prepare nature run & prior ensemble for DART
        2) create obs_seq.in
        3) create obs from nature (obs_seq.out) or use existing one
        optional: quality-control
        4) Assimilate with assigned errors
        5) Evaluate posterior (after DART clamping, e.g. setting negative cloud water to zero)
    
        Args:
            cfg (Config): experiment configuration, providing among others:
                cfg.time (dt.datetime):             time of assimilation (output time)
                cfg.prior_init_time (dt.datetime):  forecast start of the prior
                cfg.prior_valid_time (dt.datetime): valid time of the prior (may differ from cfg.time)
                cfg.prior_path_exp (str):           path to the prior experiment
    
        Returns:
            None
        """
        # read config
        time = cfg.time
        prior_init_time = cfg.prior_init_time
        prior_valid_time = cfg.prior_valid_time
        prior_path_exp = cfg.prior_path_exp
        
        cfg.obs_kind_nrs = obskind_read(cfg.dir_dart_src)
        
        # do_reject_smallFGD: triggers additional evaluations of prior & posterior
        do_reject_smallFGD = getattr(cfg, "do_reject_smallFGD", False)
        prepare_run_DART_folder(cfg)
        nml = dart_nml.write_namelist(cfg)
    
        print(" get observations with specified obs-error")
        oso = get_obsseq_out(cfg, time, prior_path_exp, prior_init_time, prior_valid_time)
        
        # prepare for assimilation
        prepare_prior_ensemble(cfg, time, prior_init_time, prior_valid_time, prior_path_exp)
        prepare_DART_grid_template(cfg)
    
        # additional evaluation of prior (in case some observations are rejected)
        if do_reject_smallFGD:
            print(" evaluate prior for all observations (incl rejected) ")
            evaluate(cfg, time, f_out_pattern=cfg.pattern_obs_seq_final+"-evaluate_prior")
    
        print(" assign observation-errors for assimilation ")
        set_obserr_assimilate_in_obsseqout(cfg, oso, outfile=cfg.dir_dart_run + "/obs_seq.out")
    
        if do_reject_smallFGD:
            print(" reject observations? ")
            reject_small_FGD(cfg, time, oso)
    
        prior_inflation_type = nml['&filter_nml']['inf_flavor'][0][0]
        if prior_inflation_type != '0':
            prepare_adapt_inflation(cfg, time, prior_init_time)
    
        print(" run filter ")
        dart_nml.write_namelist(cfg)
        filter(cfg)
        archive_filteroutput(cfg, time)
        archive_filter_diagnostics(cfg, time, cfg.pattern_obs_seq_final)
        txtlink_to_prior(cfg, time, prior_init_time, prior_path_exp)
    
        if prior_inflation_type != '0':
            archive_adapt_inflation(cfg, time)
    
        if 'evaluate_posterior_in_obs_space' in cfg:
            if cfg.evaluate_posterior_in_obs_space:
                print(" evaluate posterior in observation-space")
                f_oso = time.strftime(cfg.pattern_obs_seq_out)
                if do_reject_smallFGD:
                    # includes all observations (including rejected ones in reject_small_FGD())
                    f_oso += '-beforeQC'
    
                # evaluate() separately after ./filter is crucial when assimilating cloud variables
                # as the obs_seq.final returned by ./filter was produced using un-clamped cloud variables
                evaluate(cfg, time,
                        obs_seq_out=f_oso,
                        prior_is_filter_output=True,
                        f_out_pattern=cfg.pattern_obs_seq_final+"-evaluate")
    
    
    if __name__ == "__main__":
        """Assimilate observations

        Example:
            python assimilate.py <path to config file>
        """
        cfg = Config.from_file(sys.argv[1])
        main(cfg)