From 45b99e6bc60707545341917e1e34642b1e354c6b Mon Sep 17 00:00:00 2001
From: Anne Philipp <anne.philipp@univie.ac.at>
Date: Fri, 15 Feb 2019 16:48:31 +0100
Subject: [PATCH] added possibility to extract ensemble members with ELDA
 stream (and ENFO)

---
 source/python/classes/EcFlexpart.py    | 138 +++++++++++++++++++++----
 source/python/mods/get_mars_data.py    |  14 +++
 source/python/mods/prepare_flexpart.py |   2 +
 3 files changed, 134 insertions(+), 20 deletions(-)

diff --git a/source/python/classes/EcFlexpart.py b/source/python/classes/EcFlexpart.py
index 5d0e32b..9835199 100644
--- a/source/python/classes/EcFlexpart.py
+++ b/source/python/classes/EcFlexpart.py
@@ -66,7 +66,8 @@ import numpy as np
 from eccodes import (codes_index_select, codes_new_from_index, codes_get,
                      codes_get_values, codes_set_values, codes_set,
                      codes_write, codes_release, codes_new_from_index,
-                     codes_index_release, codes_index_get)
+                     codes_index_release, codes_index_get, codes_get_array,
+                     codes_set_array, codes_grib_new_from_file)
 
 # software specific classes and modules from flex_extract
 sys.path.append('../')
@@ -76,6 +77,7 @@ from mods.tools import (init128, to_param_id, silent_remove, product,
                         my_error, make_dir, get_informations, get_dimensions,
                         execute_subprocess)
 from MarsRetrieval import MarsRetrieval
+from UioFiles import UioFiles
 import mods.disaggregation as disaggregation
 
 # ------------------------------------------------------------------------------
@@ -662,10 +664,8 @@ class EcFlexpart(object):
                            'param':None}
 
         for ftype in self.types:
-            # fk contains field types such as
+            # ftype contains field types such as
             #     [AN, FC, PF, CV]
-            # fv contains all of the items of the belonging key
-            #     [times, steps]
             for pk, pv in self.params.iteritems():
                 # pk contains one of these keys of params
                 #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
@@ -706,6 +706,23 @@ class EcFlexpart(object):
                 if pk == 'GG__SL' and pv[0] == 'Q':
                     retr_param_dict['area'] = ""
                     retr_param_dict['gaussian'] = 'reduced'
+                if ftype.upper() == 'FC' and \
+                    'acc' not in retr_param_dict['target']:
+                    if (int(retr_param_dict['time'][0]) +
+                        int(retr_param_dict['step'][0])) > 23:
+                        dates = retr_param_dict['date'].split('/')
+                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
+                        sdate = sdate - timedelta(days=1)
+                        retr_param_dict['date'] = '/'.join(
+                            [sdate.strftime("%Y%m%d")] +
+                            retr_param_dict['date'][1:])
+
+                        print('CHANGED FC start date to ' +
+                              sdate.strftime("%Y%m%d") +
+                              ' to accomodate TIME=' +
+                              retr_param_dict['time'][0] +
+                              ', STEP=' +
+                              retr_param_dict['time'][0])
 
     # ------  on demand path  --------------------------------------------------
                 if self.basetime is None:
@@ -759,21 +776,14 @@ class EcFlexpart(object):
                             'acc' not in pk ) :
                             times = retr_param_dict['time'].split('/')
                             steps = retr_param_dict['step'].split('/')
-                            print 'times', times, int(times[0]), times[1:]
-                            print 'steps', steps, int(steps[0])
+
                             while int(times[0]) + int(steps[0]) <= 12:
-                                print 'HELLO'
                                 times = times[1:]
-                                print 'in while 1 ', times
-
                                 if len(times) > 1:
                                     retr_param_dict['time'] = '/'.join(times)
                                 else:
                                     retr_param_dict['time'] = times[0]
 
-                                print 'in while 2 ', times
-                                print retr_param_dict['time']
-
                         if (pk != 'OG_OROLSM__SL' and
                             int(retr_param_dict['step'].split('/')[0]) == 0 and
                             int(timesave.split('/')[0]) == 0):
@@ -915,7 +925,11 @@ class EcFlexpart(object):
 
         # get the values of the keys which are used for distinct access
         # of grib messages via product
-        index_keys = ["date", "time", "step"]
+        if '/' in self.number:
+            # more than one ensemble member is selected
+            index_keys = ["number", "date", "time", "step"]
+        else:
+            index_keys = ["date", "time", "step"]
         iid, index_vals = self._mk_index_values(c.inputdir,
                                                 inputfiles,
                                                 index_keys)
@@ -990,23 +1004,35 @@ class EcFlexpart(object):
             else:
                 t_enddate = t_date + timedelta(2*int(c.dtime))
 
+            # if necessary, add ensemble member number to filename suffix
+            # otherwise, add empty string
+            if 'number' in index_keys:
+                index_number = index_keys.index('number')
+                if len(index_vals[index_number]) > 1:
+                    numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
+            else:
+                numbersuffix = ''
+
             if c.purefc:
                 fnout = os.path.join(c.inputdir, 'flux' +
                                      t_date.strftime('%Y%m%d.%H') +
-                                     '.{:0>3}'.format(step-2*int(c.dtime)))
+                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
+                                     numbersuffix)
                 gnout = os.path.join(c.inputdir, 'flux' +
                                      t_date.strftime('%Y%m%d.%H') +
-                                     '.{:0>3}'.format(step-int(c.dtime)))
+                                     '.{:0>3}'.format(step-int(c.dtime)) +
+                                     numbersuffix)
                 hnout = os.path.join(c.inputdir, 'flux' +
                                      t_date.strftime('%Y%m%d.%H') +
-                                     '.{:0>3}'.format(step))
+                                     '.{:0>3}'.format(step) +
+                                     numbersuffix)
             else:
                 fnout = os.path.join(c.inputdir, 'flux' +
-                                     t_m2dt.strftime('%Y%m%d%H'))
+                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
                 gnout = os.path.join(c.inputdir, 'flux' +
-                                     t_m1dt.strftime('%Y%m%d%H'))
+                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
                 hnout = os.path.join(c.inputdir, 'flux' +
-                                     t_dt.strftime('%Y%m%d%H'))
+                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
 
             print("outputfile = " + fnout)
             f_handle = open(fnout, 'w')
@@ -1369,7 +1395,11 @@ class EcFlexpart(object):
 
         # get the values of the keys which are used for distinct access
         # of grib messages via product
-        index_keys = ["date", "time", "step"]
+        if '/' in self.number:
+            # more than one ensemble member is selected
+            index_keys = ["number", "date", "time", "step"]
+        else:
+            index_keys = ["date", "time", "step"]
         iid, index_vals = self._mk_index_values(c.inputdir,
                                                 inputfiles,
                                                 index_keys)
@@ -1523,6 +1553,13 @@ class EcFlexpart(object):
                 suffix = cdate[2:8] + '.' + ctime + '.' + cstep
             else:
                 suffix = cdate_hour[2:10]
+
+            # if necessary, add ensemble member number to filename suffix
+            if 'number' in index_keys:
+                index_number = index_keys.index('number')
+                if len(index_vals[index_number]) > 1:
+                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
+
             fnout = os.path.join(c.inputdir, c.prefix + suffix)
             print("outputfile = " + fnout)
             # collect for final processing
@@ -1556,6 +1593,67 @@ class EcFlexpart(object):
         return
 
 
+    def calc_extra_elda(self, path, prefix):
+        ''' Calculates extra ensemble members for ELDA - Stream.
+
+        Parameters
+        ----------
+        path : str
+            Path to the output files.
+
+        prefix : str
+            The prefix of the output filenames as defined in Control file.
+
+        Return
+        ------
+
+        '''
+        # max number
+        maxnum = int(self.number.split('/')[-1])
+
+        # get a list of all prepared output files with control forecast (CF)
+        CF_filelist = UioFiles(path, prefix + '*.N000')
+
+        for cffile in CF_filelist.files:
+            with open(cffile, 'rb') as f:
+                cfvalues=[]
+                while True:
+                    fid = codes_grib_new_from_file(f)
+                    if fid is None:
+                        break
+                    cfvalues.append(codes_get_array(fid, 'values'))
+                    codes_release(fid)
+
+            filename = cffile.split('N000')[0]
+            for i in range(1, maxnum+1): # max number nehmen
+
+                # read an ensemble member
+                g = open(filename + 'N{:0>3}'.format(i), 'rb')
+                # create file for newly calculated ensemble member
+                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
+                # number of message in grib file
+                j = 0
+                while True:
+                    gid = codes_grib_new_from_file(g)
+                    if gid is None:
+                        break
+                    values = codes_get_array(gid, 'values')
+                    codes_set_array(gid, 'values',
+                                    values-2*(values-cfvalues[j]))
+                    codes_set(gid, 'number', i+maxnum)
+                    codes_write(gid, h)
+                    codes_release(gid)
+                    j += 1
+
+                g.close()
+                h.close()
+                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
+                self.outputfilelist.append(
+                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
+
+        return
+
+
     def process_output(self, c):
         '''Postprocessing of FLEXPART input files.
 
diff --git a/source/python/mods/get_mars_data.py b/source/python/mods/get_mars_data.py
index 4601787..4d8aa5e 100755
--- a/source/python/mods/get_mars_data.py
+++ b/source/python/mods/get_mars_data.py
@@ -208,6 +208,15 @@ def mk_server(c):
     return server
 
 
+def check_dates_for_nonflux_fc_times(types, times):
+    '''
+    '''
+    for ty, ti in zip(types,times):
+        if ty.upper() == 'FC' and int(ti) == 18:
+            return True
+    return False
+
+
 def mk_dates(c, fluxes):
     '''Prepares start and end date depending on flux or non flux data.
 
@@ -258,6 +267,11 @@ def mk_dates(c, fluxes):
         start = start - timedelta(days=1)
         end = end + timedelta(days=1)
 
+    # if we have non-flux forecast data starting at 18 UTC
+    # we need to start retrieving data one day in advance
+    if not fluxes and check_dates_for_nonflux_fc_times(c.type, c.time):
+        start = start - timedelta(days=1)
+
     return start, end, chunk
 
 def remove_old(pattern, inputdir):
diff --git a/source/python/mods/prepare_flexpart.py b/source/python/mods/prepare_flexpart.py
index 26a4aeb..2269b9a 100755
--- a/source/python/mods/prepare_flexpart.py
+++ b/source/python/mods/prepare_flexpart.py
@@ -176,6 +176,8 @@ def prepare_flexpart(ppid, c):
     # copy/transfer/interpolate them or make them GRIB2
     flexpart = EcFlexpart(c, fluxes=False)
     flexpart.create(inputfiles, c)
+    if c.stream.lower() == 'elda':
+        flexpart.calc_extra_elda(os.path.join(c.inputdir, c.prefix))
     flexpart.process_output(c)
 
     # make use of a possible conversion to a
-- 
GitLab