From d727af2617458832402d7f9f6d928ebdc43f7fb7 Mon Sep 17 00:00:00 2001 From: Anne Philipp <anne.philipp@univie.ac.at> Date: Sun, 10 Mar 2019 17:48:16 +0100 Subject: [PATCH] updated rrint processes (added missing rr for ensembles and wrote all precip fields into the output files with different steps to distingush) --- source/python/classes/EcFlexpart.py | 324 +++++++++++++++++++--------- source/python/mods/checks.py | 2 +- source/python/mods/get_mars_data.py | 2 +- source/python/mods/tools.py | 31 ++- source/python/submit.py | 2 + 5 files changed, 254 insertions(+), 107 deletions(-) diff --git a/source/python/classes/EcFlexpart.py b/source/python/classes/EcFlexpart.py index c52aa68..8ac706c 100644 --- a/source/python/classes/EcFlexpart.py +++ b/source/python/classes/EcFlexpart.py @@ -72,12 +72,13 @@ from eccodes import (codes_index_select, codes_new_from_index, codes_get, # software specific classes and modules from flex_extract sys.path.append('../') import _config -from .GribUtil import GribUtil +from classes.GribUtil import GribUtil from mods.tools import (init128, to_param_id, silent_remove, product, my_error, make_dir, get_informations, get_dimensions, - execute_subprocess, to_param_id_with_tablenumber) -from .MarsRetrieval import MarsRetrieval -from .UioFiles import UioFiles + execute_subprocess, to_param_id_with_tablenumber, + generate_retrieval_period_boundary) +from classes.MarsRetrieval import MarsRetrieval +from classes.UioFiles import UioFiles import mods.disaggregation as disaggregation # ------------------------------------------------------------------------------ @@ -585,7 +586,7 @@ class EcFlexpart(object): index_vals = [] for key in index_keys: key_vals = codes_index_get(iid, key) - # have to sort the key values for correct disaggregation, + # have to sort the key values for correct order, # therefore convert to int first key_vals = [int(k) for k in key_vals] key_vals.sort() @@ -921,18 +922,35 @@ class EcFlexpart(object): ''' table128 = init128(_config.PATH_GRIBTABLE) + # get ids from the flux parameter names pars = to_param_id(self.params['OG_acc_SL'][0], table128) iid = None index_vals = None # get the values of the keys which are used for distinct access - # of grib messages via product + # of grib messages via product and save the maximum number of + # ensemble members if there is more than one if '/' in self.number: # more than one ensemble member is selected index_keys = ["number", "date", "time", "step"] + # maximum ensemble number retrieved + # + 1 for the control run (ensemble number 0) + maxnum = int(self.number.split('/')[-1]) + 1 + # remember the index of the number values + index_number = index_keys.index('number') + # empty set to save ensemble numbers which were already processed + ens_numbers = set() + # index for the ensemble number + inumb = 0 else: index_keys = ["date", "time", "step"] + # maximum ensemble number + maxnum = None + + # get sorted lists of the index values + # this is very important for disaggregating + # the flux data in correct order iid, index_vals = self._mk_index_values(c.inputdir, inputfiles, index_keys) @@ -942,6 +960,7 @@ class EcFlexpart(object): # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange if c.rrint: + # set start and end timestamps for retrieval period if not c.purefc: start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H') end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H') @@ -952,19 +971,30 @@ class EcFlexpart(object): end_date = datetime.strptime(edate_str, '%Y%m%d%H') end_date = end_date + timedelta(hours=c.maxstep) + # get necessary grid dimensions from grib files for storing the + # precipitation fields info = get_informations(os.path.join(c.inputdir, inputfiles.files[0])) dims = get_dimensions(info, c.purefc, c.dtime, index_vals, start_date, end_date) - # create numpy array - lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) - cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) + + # create empty numpy arrays + if not maxnum: + lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) + cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) + else: + lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64) + cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64) + + # index counter for time line it_lsp = 0 it_cp = 0 + + # store the order of date and step date_list = [] step_list = [] - # initialize dictionaries to store values + # initialize dictionaries to store flux values per parameter orig_vals = {} deac_vals = {} for p in pars: @@ -975,10 +1005,26 @@ class EcFlexpart(object): # values of the index keys for prod in product(*index_vals): # e.g. prod = ('20170505', '0', '12') - # ( date ,time, step) + # ( date ,time, step) print('CURRENT PRODUCT: ', prod) + # the whole process has to be done for each seperate ensemble member + # therefore, for each new ensemble member we delete old flux values + # and start collecting flux data from the beginning time step + if maxnum and prod[index_number] not in ens_numbers: + ens_numbers.add(prod[index_number]) + inumb = len(ens_numbers) - 1 + # re-initialize dictionaries to store flux values per parameter + # for the next ensemble member + it_lsp = 0 + it_cp = 0 + orig_vals = {} + deac_vals = {} + for p in pars: + orig_vals[p] = [] + deac_vals[p] = [] + for i in range(len(index_keys)): codes_index_select(iid, index_keys[i], prod[i]) @@ -1009,10 +1055,8 @@ class EcFlexpart(object): # if necessary, add ensemble member number to filename suffix # otherwise, add empty string - if 'number' in index_keys: - index_number = index_keys.index('number') - if len(index_vals[index_number]) > 1: - numbersuffix = '.N{:0>3}'.format(int(prod[index_number])) + if maxnum: + numbersuffix = '.N{:0>3}'.format(int(prod[index_number])) else: numbersuffix = '' @@ -1091,11 +1135,18 @@ class EcFlexpart(object): date_list.append(t_date) if step not in step_list: step_list.append(step) - if c.rrint and parId == 142: - lsp_np[:,it_lsp] = deac_vals[parId][-1][:] + # store precipitation values + if maxnum and parId == 142: + lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:] + it_lsp += 1 + elif not maxnum and parId == 142: + lsp_np[:, it_lsp] = deac_vals[parId][-1][:] it_lsp += 1 - elif c.rrint and parId == 143: - cp_np[:,it_cp] = deac_vals[parId][-1][:] + elif maxnum and parId == 143: + cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:] + it_cp += 1 + elif not maxnum and parId == 143: + cp_np[:, it_cp] = deac_vals[parId][-1][:] it_cp += 1 # information printout @@ -1194,17 +1245,18 @@ class EcFlexpart(object): if c.rrint: self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir) - self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np, - cp_np, date_list, step_list, c) + self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np, + cp_np, maxnum, index_keys, index_vals, c) return - def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c): + def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c): '''Calculates and writes out the disaggregated precipitation fields. Disaggregation is done in time and original times are written to the flux files, while the additional subgrid times are written to - extra files. + extra files output files. They are named like the original files with + suffixes "_1" and "_2" for the first and second subgrid point. Parameters ---------- @@ -1225,12 +1277,20 @@ class EcFlexpart(object): The convective precipitation fields for each time step. Shape (ni * nj, nt). - date_list : list of datetime - The list of dates for which the disaggregation is to be done. + maxnum : int + The maximum number of ensemble members. It is None + if there are no or just one ensemble. - step_list : list of int - The list of steps for a single forecast time. - Only necessary for pure forecasts. + index_keys : dictionary + List of parameter names which serves as index. + + index_vals : list of list of str + Contains the values from the keys used for a distinct selection + of grib messages in processing the grib files. + Content looks like e.g.: + index_vals[0]: ('20171106', '20171107', '20171108') ; date + index_vals[1]: ('0', '1200', '1800', '600') ; time + index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange c : ControlFile Contains all the parameters of CONTROL file and @@ -1241,84 +1301,135 @@ class EcFlexpart(object): ''' print('... disaggregation of precipitation with new method.') - lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64) - cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64) + + tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb') + + # initialize new numpy arrays for disaggregated fields + if maxnum: + lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64) + cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64) + else: + lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64) + cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64) # do the disaggregation, but neglect the last value of the - # time series. This one corresponds for example to 24 hour, - # which we don't need. - for ix in range(ni*nj): - lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1] - cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1] + # original time series. This one corresponds for example to + # 24 hour, which we don't need. we use 0 - 23 UTC for a day. + if maxnum: + for inum in range(maxnum): + for ix in range(ni*nj): + lsp_new_np[inum,ix,:] = disaggregation.IA3(lsp_np[inum,ix,:])[:-1] + cp_new_np[inum,ix,:] = disaggregation.IA3(cp_np[inum,ix,:])[:-1] + else: + for ix in range(ni*nj): + lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[0,ix,:])[:-1] + cp_new_np[ix,:] = disaggregation.IA3(cp_np[0,ix,:])[:-1] # write to grib files (full/orig times to flux file and inbetween # times into seperate end files) print('... write disaggregated precipitation to files.') + + if maxnum: + # remember the index of the number values + index_number = index_keys.index('number') + # empty set to save unique ensemble numbers which were already processed + ens_numbers = set() + # index for the ensemble number + inumb = 0 + else: + inumb = 0 + # index variable of disaggregated fields it = 0 - # loop over times and write original time step and the two newly - # generated sub time steps for each loop - for date in date_list: - for step in step_list: - tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb') - - if c.purefc: - fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \ - '.{:0>3}'.format(step) - filename1 = c.prefix + date.strftime('%y%m%d.%H') + \ - '.{:0>3}'.format(step) + '_1' - filename2 = c.prefix + date.strftime('%y%m%d.%H') + \ - '.{:0>3}'.format(step) + '_2' - else: - fluxfilename = 'flux' + date.strftime('%Y%m%d%H') - filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1' - filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2' - - # write original time step to flux file as usual - fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename)) - fluxfile.set_keys(tmpfile, filemode='a', strict=True, - wherekeynames=['paramId'], wherekeyvalues=[142], - keynames=['date','time','stepRange','values'], - keyvalues=[int(date.strftime('%Y%m%d')), - date.hour*100, step, lsp_new_np[:,it]], - ) - fluxfile.set_keys(tmpfile, filemode='a', strict=True, - wherekeynames=['paramId'], wherekeyvalues=[143], - keynames=['date','time','stepRange','values'], - keyvalues=[int(date.strftime('%Y%m%d')), - date.hour*100, step, cp_new_np[:,it]] - ) - - # write first subgrid time step - endfile1 = GribUtil(os.path.join(c.inputdir, filename1)) - endfile1.set_keys(tmpfile, filemode='w', strict=True, - wherekeynames=['paramId'], wherekeyvalues=[142], - keynames=['date','time','stepRange','values'], - keyvalues=[int(date.strftime('%Y%m%d')), - date.hour*100, step, lsp_new_np[:,it+1]] - ) - endfile1.set_keys(tmpfile, filemode='a', strict=True, - wherekeynames=['paramId'], wherekeyvalues=[143], - keynames=['date','time','stepRange','values'], - keyvalues=[int(date.strftime('%Y%m%d')), - date.hour*100, step, cp_new_np[:,it+1]] - ) - - # write second subgrid time step - endfile2 = GribUtil(os.path.join(c.inputdir, filename2)) - endfile2.set_keys(tmpfile, filemode='w', strict=True, - wherekeynames=['paramId'], wherekeyvalues=[142], - keynames=['date','time','stepRange','values'], - keyvalues=[int(date.strftime('%Y%m%d')), - date.hour*100, step, lsp_new_np[:,it+2]] - ) - endfile2.set_keys(tmpfile, filemode='a', strict=True, - wherekeynames=['paramId'], wherekeyvalues=[143], - keynames=['date','time','stepRange','values'], - keyvalues=[int(date.strftime('%Y%m%d')), - date.hour*100, step, cp_new_np[:,it+2]] - ) - it = it + 3 # jump to next original time step + + # "product" genereates each possible combination between the + # values of the index keys + for prod in product(*index_vals): + # e.g. prod = ('20170505', '0', '12') + # ( date ,time, step) + # or prod = ('0' , '20170505', '0', '12') + # (number, date ,time, step) + + cdate = prod[index_keys.index('date')] + ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100) + cstep = '{:0>3}'.format(int(prod[index_keys.index('step')])) + + date = datetime.strptime(cdate + ctime, '%Y%m%d%H') + date += timedelta(hours=int(cstep)) + + start_period, end_period = generate_retrieval_period_boundary(c) + # skip all temporary times + # which are outside the retrieval period + if date < start_period or \ + date > end_period: + continue + + # the whole process has to be done for each seperate ensemble member + # therefore, for each new ensemble member we delete old flux values + # and start collecting flux data from the beginning time step + if maxnum and prod[index_number] not in ens_numbers: + ens_numbers.add(prod[index_number]) + inumb = int(prod[index_number]) + it = 0 + + # if necessary, add ensemble member number to filename suffix + # otherwise, add empty string + if maxnum: + numbersuffix = '.N{:0>3}'.format(int(prod[index_number])) + else: + numbersuffix = '' + + # per original time stamp: write original time step and + # the two newly generated sub time steps + if c.purefc: + fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep + else: + fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix + + # write original time step to flux file as usual + fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename)) + fluxfile.set_keys(tmpfile, filemode='a', strict=True, + wherekeynames=['paramId'], wherekeyvalues=[142], + keynames=['perturbationNumber','date','time','stepRange','values'], + keyvalues=[inumb, int(date.strftime('%Y%m%d')), + date.hour*100, 0, lsp_new_np[inumb,:,it]], + ) + fluxfile.set_keys(tmpfile, filemode='a', strict=True, + wherekeynames=['paramId'], wherekeyvalues=[143], + keynames=['perturbationNumber','date','time','stepRange','values'], + keyvalues=[inumb,int(date.strftime('%Y%m%d')), + date.hour*100, 0, cp_new_np[inumb,:,it]] + ) + + # rr for first subgrid point is identified by step = 1 + fluxfile.set_keys(tmpfile, filemode='a', strict=True, + wherekeynames=['paramId'], wherekeyvalues=[142], + keynames=['perturbationNumber','date','time','stepRange','values'], + keyvalues=[inumb,int(date.strftime('%Y%m%d')), + date.hour*100, '1', lsp_new_np[inumb,:,it+1]] + ) + fluxfile.set_keys(tmpfile, filemode='a', strict=True, + wherekeynames=['paramId'], wherekeyvalues=[143], + keynames=['perturbationNumber','date','time','stepRange','values'], + keyvalues=[inumb,int(date.strftime('%Y%m%d')), + date.hour*100, '1', cp_new_np[inumb,:,it+1]] + ) + + # rr for second subgrid point is identified by step = 2 + fluxfile.set_keys(tmpfile, filemode='a', strict=True, + wherekeynames=['paramId'], wherekeyvalues=[142], + keynames=['perturbationNumber','date','time','stepRange','values'], + keyvalues=[inumb,int(date.strftime('%Y%m%d')), + date.hour*100, '2', lsp_new_np[inumb,:,it+2]] + ) + fluxfile.set_keys(tmpfile, filemode='a', strict=True, + wherekeynames=['paramId'], wherekeyvalues=[143], + keynames=['perturbationNumber','date','time','stepRange','values'], + keyvalues=[inumb,int(date.strftime('%Y%m%d')), + date.hour*100, '2', cp_new_np[inumb,:,it+2]] + ) + + it = it + 3 # jump to next original time step in rr fields return def _create_rr_grib_dummy(self, ifile, inputdir): @@ -1577,10 +1688,10 @@ class EcFlexpart(object): print("outputfile = " + fnout) # collect for final processing self.outputfilelist.append(os.path.basename(fnout)) - # get additional precipitation subgrid data if available - if c.rrint: - self.outputfilelist.append(os.path.basename(fnout + '_1')) - self.outputfilelist.append(os.path.basename(fnout + '_2')) + # # get additional precipitation subgrid data if available + # if c.rrint: + # self.outputfilelist.append(os.path.basename(fnout + '_1')) + # self.outputfilelist.append(os.path.basename(fnout + '_2')) # ============================================================================================ # create outputfile and copy all data from intermediate files # to the outputfile (final GRIB input files for FLEXPART) @@ -1613,6 +1724,9 @@ class EcFlexpart(object): def calc_extra_elda(self, path, prefix): ''' Calculates extra ensemble members for ELDA - Stream. + This is a specific feature which doubles the number of ensemble members + for the ELDA Stream. + Parameters ---------- path : str @@ -1642,7 +1756,7 @@ class EcFlexpart(object): codes_release(fid) filename = cffile.split('N000')[0] - for i in range(1, maxnum+1): # max number nehmen + for i in range(1, maxnum + 1): # read an ensemble member g = open(filename + 'N{:0>3}'.format(i), 'rb') @@ -1655,6 +1769,8 @@ class EcFlexpart(object): if gid is None: break values = codes_get_array(gid, 'values') + # generate a new ensemble member by subtracting + # 2 * ( current time step value - last time step value ) codes_set_array(gid, 'values', values-2*(values-cfvalues[j])) codes_set(gid, 'number', i+maxnum) @@ -1703,7 +1819,7 @@ class EcFlexpart(object): .format(c.ectrans, c.gateway, c.destination)) print('Output filelist: ') - print(self.outputfilelist) + print(sorted(self.outputfilelist)) for ofile in self.outputfilelist: ofile = os.path.join(self.inputdir, ofile) diff --git a/source/python/mods/checks.py b/source/python/mods/checks.py index 434a42a..58c66d2 100644 --- a/source/python/mods/checks.py +++ b/source/python/mods/checks.py @@ -32,7 +32,7 @@ except ImportError: import builtins as exceptions from datetime import datetime import numpy as np -from .tools import my_error, silent_remove +from mods.tools import my_error, silent_remove # ------------------------------------------------------------------------------ # FUNCTIONS # ------------------------------------------------------------------------------ diff --git a/source/python/mods/get_mars_data.py b/source/python/mods/get_mars_data.py index 56e6b3c..f862f80 100755 --- a/source/python/mods/get_mars_data.py +++ b/source/python/mods/get_mars_data.py @@ -66,7 +66,7 @@ from datetime import datetime, timedelta sys.path.append(os.path.dirname(os.path.abspath( inspect.getfile(inspect.currentframe()))) + '/../') import _config -from .tools import (setup_controldata, my_error, normal_exit, get_cmdline_args, +from mods.tools import (setup_controldata, my_error, normal_exit, get_cmdline_args, read_ecenv, make_dir) from classes.EcFlexpart import EcFlexpart from classes.UioFiles import UioFiles diff --git a/source/python/mods/tools.py b/source/python/mods/tools.py index f8eb0ab..a29d15c 100644 --- a/source/python/mods/tools.py +++ b/source/python/mods/tools.py @@ -68,7 +68,7 @@ try: import exceptions except ImportError: import builtins as exceptions -from datetime import datetime +from datetime import datetime, timedelta from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter @@ -868,3 +868,32 @@ def execute_subprocess(cmd_list, error_msg='SUBPROCESS FAILED!'): sys.exit('... ' + error_msg) return + + +def generate_retrieval_period_boundary(c): + '''Generates retrieval period boundary datetimes from CONTROL information. + + Parameters + ---------- + c : ControlFile + Contains all the parameters of CONTROL file and + command line. + + Return + ------ + start_period : datetime + The first timestamp of the actual retrieval period disregarding + the temporary times which were used for processing reasons. + + end_period : datetime + The last timestamp of the actual retrieval period disregarding + the temporary times which were used for processing reasons. + ''' + # generate start and end timestamp of the retrieval period + start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H') + start_period = start_period + timedelta(hours=int(c.step[0])) + end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H') + end_period = end_period + timedelta(hours=int(c.step[-1])) + + + return start_period, end_period diff --git a/source/python/submit.py b/source/python/submit.py index 488ac5e..cd528a0 100755 --- a/source/python/submit.py +++ b/source/python/submit.py @@ -72,6 +72,7 @@ from mods.get_mars_data import get_mars_data from mods.prepare_flexpart import prepare_flexpart from classes.ControlFile import ControlFile + # ------------------------------------------------------------------------------ # METHODS # ------------------------------------------------------------------------------ @@ -85,6 +86,7 @@ def main(): Parameters ---------- + Return ------ -- GitLab