diff --git a/source/python/classes/EcFlexpart.py b/source/python/classes/EcFlexpart.py index 3ba408c5981dab4559f4a5ce5e47da1cd54d820c..790cb709759fcb9f51d8c034c111660437a01918 100644 --- a/source/python/classes/EcFlexpart.py +++ b/source/python/classes/EcFlexpart.py @@ -36,6 +36,8 @@ # - applied minor code changes (style) # - removed "dead code" , e.g. retrieval of Q since it is not needed # - removed "times" parameter from retrieve-method since it is not used +# - seperated function "retrieve" into smaller functions (less code +# duplication, easier testing) # # @License: # (C) Copyright 2014-2018. @@ -109,15 +111,8 @@ class EcFlexpart(object): The current object of the class. c: instance of class ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. @@ -296,6 +291,33 @@ class EcFlexpart(object): return + def _mk_targetname(self, ftype, param, date): + ''' + @Description: + Creates the filename for the requested grib data to be stored in. + This name is passed as the "target" parameter in the request. + + @Input: + ftype: string + Shortcut name of the type of the field. E.g. AN, FC, PF, ... + + param: string + Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML, + GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL + + date: string + The date period of the grib data to be stored in this file. + + @Return: + targetname: string + The target filename for the grib data. + ''' + targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' + + str(os.getppid()) + '.' + str(os.getpid()) + '.grb') + + return targetname + + def _start_retrievement(self, request, par_dict): ''' @Description: @@ -359,97 +381,6 @@ class EcFlexpart(object): return - def _mk_targetname(self, ftype, param, date): - ''' - @Description: - Creates the filename for the requested grib data to be stored in. - This name is passed as the "target" parameter in the request. - - @Input: - ftype: string - Shortcut name of the type of the field. E.g. AN, FC, PF, ... - - param: string - Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML, - GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL - - date: string - The date period of the grib data to be stored in this file. - - @Return: - targetname: string - The target filename for the grib data. - ''' - targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' + - str(os.getppid()) + '.' + str(os.getpid()) + '.grb') - - return targetname - - def write_namelist(self, c, filename): - ''' - @Description: - Creates a namelist file in the temporary directory and writes - the following values to it: maxl, maxb, mlevel, - mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1, - momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta - - @Input: - self: instance of EcFlexpart - The current object of the class. - - c: instance of class ControlFile - Contains all the parameters of CONTROL files, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - - For more information about format and content of the parameter - see documentation. - - filename: string - Name of the namelist file. - - @Return: - <nothing> - ''' - - self.inputdir = c.inputdir - area = np.asarray(self.area.split('/')).astype(float) - grid = np.asarray(self.grid.split('/')).astype(float) - - if area[1] > area[3]: - area[1] -= 360 - maxl = int((area[3] - area[1]) / grid[1]) + 1 - maxb = int((area[0] - area[2]) / grid[0]) + 1 - - with open(self.inputdir + '/' + filename, 'w') as f: - f.write('&NAMGEN\n') - f.write(',\n '.join(['maxl = ' + str(maxl), 'maxb = ' + str(maxb), - 'mlevel = ' + str(self.level), - 'mlevelist = ' + '"' + str(self.levelist) - + '"', - 'mnauf = ' + str(self.resol), - 'metapar = ' + '77', - 'rlo0 = ' + str(area[1]), - 'rlo1 = ' + str(area[3]), - 'rla0 = ' + str(area[2]), - 'rla1 = ' + str(area[0]), - 'momega = ' + str(c.omega), - 'momegadiff = ' + str(c.omegadiff), - 'mgauss = ' + str(c.gauss), - 'msmooth = ' + str(c.smooth), - 'meta = ' + str(c.eta), - 'metadiff = ' + str(c.etadiff), - 'mdpdeta = ' + str(c.dpdeta)])) - - f.write('\n/\n') - - return - def retrieve(self, server, dates, request, inputdir='.'): ''' @Description: @@ -655,166 +586,73 @@ class EcFlexpart(object): return - def process_output(self, c): + def write_namelist(self, c, filename): ''' @Description: - The grib files are postprocessed depending on the selection in - CONTROL file. The resulting files are moved to the output - directory if its not equla to the input directory. - The following modifications might be done if - properly switched in CONTROL file: - GRIB2 - Conversion to GRIB2 - ECTRANS - Transfer of files to gateway server - ECSTORAGE - Storage at ECMWF server - GRIB2FLEXPART - Conversion of GRIB files to FLEXPART binary format + Creates a namelist file in the temporary directory and writes + the following values to it: maxl, maxb, mlevel, + mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1, + momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta @Input: self: instance of EcFlexpart The current object of the class. c: instance of class ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. + filename: string + Name of the namelist file. + @Return: <nothing> - ''' - print('\n\nPostprocessing:\n Format: {}\n'.format(c.format)) - - if not c.ecapi: - print('ecstorage: {}\n ecfsdir: {}\n'. - format(c.ecstorage, c.ecfsdir)) - #if not hasattr(c, 'gateway'): - # c.gateway = os.getenv('GATEWAY') - #if not hasattr(c, 'destination'): - # c.destination = os.getenv('DESTINATION') - print('ectrans: {}\n gateway: {}\n destination: {}\n ' - .format(c.ectrans, c.gateway, c.destination)) - - print('Output filelist: \n') - print(self.outputfilelist) - - if c.format.lower() == 'grib2': - for ofile in self.outputfilelist: - p = subprocess.check_call(['grib_set', '-s', 'edition=2, \ - productDefinitionTemplateNumber=8', - ofile, ofile + '_2']) - p = subprocess.check_call(['mv', ofile + '_2', ofile]) - - if c.ectrans and not c.ecapi: - for ofile in self.outputfilelist: - p = subprocess.check_call(['ectrans', '-overwrite', '-gateway', - c.gateway, '-remote', c.destination, - '-source', ofile]) - #print('ectrans:', p) - - if c.ecstorage and not c.ecapi: - for ofile in self.outputfilelist: - p = subprocess.check_call(['ecp', '-o', ofile, - os.path.expandvars(c.ecfsdir)]) - - if c.outputdir != c.inputdir: - for ofile in self.outputfilelist: - p = subprocess.check_call(['mv', ofile, c.outputdir]) - - # prepare environment for the grib2flexpart run - # to convert grib to flexpart binary - if c.grib2flexpart: - - # generate AVAILABLE file - # Example of AVAILABLE file data: - # 20131107 000000 EN13110700 ON DISC - clist = [] - for ofile in self.outputfilelist: - fname = ofile.split('/') - if '.' in fname[-1]: - l = fname[-1].split('.') - timestamp = datetime.strptime(l[0][-6:] + l[1], - '%y%m%d%H') - timestamp += timedelta(hours=int(l[2])) - cdate = datetime.strftime(timestamp, '%Y%m%d') - chms = datetime.strftime(timestamp, '%H%M%S') - else: - cdate = '20' + fname[-1][-8:-2] - chms = fname[-1][-2:] + '0000' - clist.append(cdate + ' ' + chms + ' '*6 + - fname[-1] + ' '*14 + 'ON DISC') - clist.sort() - with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f: - f.write('\n'.join(clist) + '\n') - - # generate pathnames file - pwd = os.path.abspath(c.outputdir) - with open(pwd + '/pathnames', 'w') as f: - f.write(pwd + '/Options/\n') - f.write(pwd + '/\n') - f.write(pwd + '/\n') - f.write(pwd + '/AVAILABLE\n') - f.write(' = == = == = == = == = == == = \n') - - # create Options dir if necessary - if not os.path.exists(pwd + '/Options'): - os.makedirs(pwd+'/Options') - - # read template COMMAND file - with open(os.path.expandvars(os.path.expanduser( - c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f: - lflist = f.read().split('\n') - - # find index of list where to put in the - # date and time information - # usually after the LDIRECT parameter - i = 0 - for l in lflist: - if 'LDIRECT' in l.upper(): - break - i += 1 + self.inputdir = c.inputdir + area = np.asarray(self.area.split('/')).astype(float) + grid = np.asarray(self.grid.split('/')).astype(float) - # insert the date and time information of run start and end - # into the list of lines of COMMAND file - lflist = lflist[:i+1] + \ - [clist[0][:16], clist[-1][:16]] + \ - lflist[i+3:] + if area[1] > area[3]: + area[1] -= 360 + maxl = int((area[3] - area[1]) / grid[1]) + 1 + maxb = int((area[0] - area[2]) / grid[0]) + 1 - # write the new COMMAND file - with open(pwd + '/Options/COMMAND', 'w') as g: - g.write('\n'.join(lflist) + '\n') + with open(self.inputdir + '/' + filename, 'w') as f: + f.write('&NAMGEN\n') + f.write(',\n '.join(['maxl = ' + str(maxl), 'maxb = ' + str(maxb), + 'mlevel = ' + str(self.level), + 'mlevelist = ' + '"' + str(self.levelist) + + '"', + 'mnauf = ' + str(self.resol), + 'metapar = ' + '77', + 'rlo0 = ' + str(area[1]), + 'rlo1 = ' + str(area[3]), + 'rla0 = ' + str(area[2]), + 'rla1 = ' + str(area[0]), + 'momega = ' + str(c.omega), + 'momegadiff = ' + str(c.omegadiff), + 'mgauss = ' + str(c.gauss), + 'msmooth = ' + str(c.smooth), + 'meta = ' + str(c.eta), + 'metadiff = ' + str(c.etadiff), + 'mdpdeta = ' + str(c.dpdeta)])) - # change to outputdir and start the grib2flexpart run - # afterwards switch back to the working dir - os.chdir(c.outputdir) - p = subprocess.check_call([ - os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts)) - + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']) - os.chdir(pwd) + f.write('\n/\n') return - def create(self, inputfiles, c): + + def deacc_fluxes(self, inputfiles, c): ''' @Description: - This method is based on the ECMWF example index.py - https://software.ecmwf.int/wiki/display/GRIB/index.py - - An index file will be created which depends on the combination - of "date", "time" and "stepRange" values. This is used to iterate - over all messages in each grib file which were passed through the - parameter "inputfiles" to seperate specific parameters into fort.* - files. Afterwards the FORTRAN program is called to convert - the data fields all to the same grid and put them in one file - per unique time step (combination of "date", "time" and - "stepRange"). + Goes through all flux fields in ordered time and de-accumulate + the fields. Afterwards the fields are disaggregated in time. + Different versions of disaggregation is provided for rainfall + data (darain, modified linear) and the surface fluxes and + stress data (dapoly, cubic polynomial). @Input: self: instance of EcFlexpart @@ -824,15 +662,8 @@ class EcFlexpart(object): Contains a list of files. c: instance of class ControlFile - Contains all the parameters of CONTROL files, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. @@ -841,10 +672,7 @@ class EcFlexpart(object): ''' table128 = init128(_config.PATH_GRIBTABLE) - wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\ - stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4', - table128) - + pars = to_param_id(self.params['OG_acc_SL'][0], table128) index_keys = ["date", "time", "step"] indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX) silent_remove(indexfile) @@ -855,28 +683,36 @@ class EcFlexpart(object): # read values of index keys index_vals = [] for key in index_keys: - index_vals.append(grib_index_get(iid, key)) - print(index_vals[-1]) + key_vals = grib_index_get(iid, key) + print(key_vals) + # have to sort the steps for disaggregation, + # therefore convert to int first + if key == 'step': + key_vals = [int(k) for k in key_vals] + key_vals.sort() + key_vals = [str(k) for k in key_vals] + index_vals.append(key_vals) # index_vals looks for example like: # index_vals[0]: ('20171106', '20171107', '20171108') ; date - # index_vals[1]: ('0', '1200', '1800', '600') ; time - # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange + # index_vals[1]: ('0', '1200') ; time + # index_vals[2]: (3', '6', '9', '12') ; stepRange - fdict = {'10':None, '11':None, '12':None, '13':None, '16':None, - '17':None, '19':None, '21':None, '22':None, '20':None} + valsdict = {} + svalsdict = {} + stepsdict = {} + for p in pars: + valsdict[str(p)] = [] + svalsdict[str(p)] = [] + stepsdict[str(p)] = [] + + print('maxstep: ', c.maxstep) for prod in product(*index_vals): # e.g. prod = ('20170505', '0', '12') # ( date ,time, step) # per date e.g. time = 0, 1200 # per time e.g. step = 3, 6, 9, 12 - # flag for Fortran program and file merging - convertFlag = False print('current prod: ', prod) - # e.g. prod = ('20170505', '0', '12') - # ( date ,time, step) - # per date e.g. time = 0, 600, 1200, 1800 - # per time e.g. step = 0, 3, 6, 9, 12 for i in range(len(index_keys)): grib_index_select(iid, index_keys[i], prod[i]) @@ -886,11 +722,247 @@ class EcFlexpart(object): # if there is data for this product combination # prepare some date and time parameter before reading the data if gid is not None: - # Combine all temporary data files into final grib file if - # gid is at least one time not None. Therefore set convertFlag - # to save information. The Fortran program is also - # only executed if convertFlag is True - convertFlag = True + cdate = grib_get(gid, 'date') + time = grib_get(gid, 'time') + step = grib_get(gid, 'step') + # date+time+step-2*dtime + # (since interpolated value valid for step-2*dtime) + sdate = datetime(year=cdate/10000, + month=(cdate % 10000)/100, + day=(cdate % 100), + hour=time/100) + fdate = sdate + timedelta(hours=step-2*int(c.dtime)) + sdates = sdate + timedelta(hours=step) + elimit = None + else: + break + + if c.maxstep > 12: + fnout = os.path.join(c.inputdir, 'flux' + + sdate.strftime('%Y%m%d') + + '.{:0>2}'.format(time/100) + + '.{:0>3}'.format(step-2*int(c.dtime))) + gnout = os.path.join(c.inputdir, 'flux' + + sdate.strftime('%Y%m%d') + + '.{:0>2}'.format(time/100) + + '.{:0>3}'.format(step-int(c.dtime))) + hnout = os.path.join(c.inputdir, 'flux' + + sdate.strftime('%Y%m%d') + + '.{:0>2}'.format(time/100) + + '.{:0>3}'.format(step)) + else: + fnout = os.path.join(c.inputdir, 'flux' + + fdate.strftime('%Y%m%d%H')) + gnout = os.path.join(c.inputdir, 'flux' + + (fdate + timedelta(hours=int(c.dtime))). + strftime('%Y%m%d%H')) + hnout = os.path.join(c.inputdir, 'flux' + + sdates.strftime('%Y%m%d%H')) + + print("outputfile = " + fnout) + f_handle = open(fnout, 'w') + g_handle = open(gnout, 'w') + h_handle = open(hnout, 'w') + + # read message for message and store relevant data fields + # data keywords are stored in pars + while 1: + if gid is None: + break + cparamId = str(grib_get(gid, 'paramId')) + step = grib_get(gid, 'step') + atime = grib_get(gid, 'time') + ni = grib_get(gid, 'Ni') + nj = grib_get(gid, 'Nj') + if cparamId in valsdict.keys(): + values = grib_get_values(gid) + vdp = valsdict[cparamId] + svdp = svalsdict[cparamId] + sd = stepsdict[cparamId] + + if cparamId == '142' or cparamId == '143': + fak = 1. / 1000. + else: + fak = 3600. + + values = (np.reshape(values, (nj, ni))).flatten() / fak + vdp.append(values[:]) # save the accumulated values + if step <= int(c.dtime): + svdp.append(values[:] / int(c.dtime)) + else: # deaccumulate values + svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime)) + + print(cparamId, atime, step, len(values), + values[0], np.std(values)) + # save the 1/3-hourly or specific values + # svdp.append(values[:]) + sd.append(step) + # len(svdp) correspond to the time + if len(svdp) >= 3: + if len(svdp) > 3: + if cparamId == '142' or cparamId == '143': + values = disaggregation.darain(svdp) + else: + values = disaggregation.dapoly(svdp) + + if not (step == c.maxstep and c.maxstep > 12 \ + or sdates == elimit): + vdp.pop(0) + svdp.pop(0) + else: + if c.maxstep > 12: + values = svdp[1] + else: + values = svdp[0] + + grib_set_values(gid, values) + if c.maxstep > 12: + grib_set(gid, 'step', max(0, step-2*int(c.dtime))) + else: + grib_set(gid, 'step', 0) + grib_set(gid, 'time', fdate.hour*100) + grib_set(gid, 'date', fdate.year*10000 + + fdate.month*100+fdate.day) + grib_write(gid, f_handle) + + if c.basetime: + elimit = datetime.strptime(c.end_date + + c.basetime, '%Y%m%d%H') + else: + elimit = sdate + timedelta(2*int(c.dtime)) + + # squeeze out information of last two steps contained + # in svdp + # if step+int(c.dtime) == c.maxstep and c.maxstep>12 + # or sdates+timedelta(hours = int(c.dtime)) + # >= elimit: + # Note that svdp[0] has not been popped in this case + + if step == c.maxstep and c.maxstep > 12 or \ + sdates == elimit: + + values = svdp[3] + grib_set_values(gid, values) + grib_set(gid, 'step', 0) + truedatetime = fdate + timedelta(hours= + 2*int(c.dtime)) + grib_set(gid, 'time', truedatetime.hour * 100) + grib_set(gid, 'date', truedatetime.year * 10000 + + truedatetime.month * 100 + + truedatetime.day) + grib_write(gid, h_handle) + + #values = (svdp[1]+svdp[2])/2. + if cparamId == '142' or cparamId == '143': + values = disaggregation.darain(list(reversed(svdp))) + else: + values = disaggregation.dapoly(list(reversed(svdp))) + + grib_set(gid, 'step', 0) + truedatetime = fdate + timedelta(hours=int(c.dtime)) + grib_set(gid, 'time', truedatetime.hour * 100) + grib_set(gid, 'date', truedatetime.year * 10000 + + truedatetime.month * 100 + + truedatetime.day) + grib_set_values(gid, values) + grib_write(gid, g_handle) + + grib_release(gid) + + gid = grib_new_from_index(iid) + + f_handle.close() + g_handle.close() + h_handle.close() + + grib_index_release(iid) + + return + + + def create(self, inputfiles, c): + ''' + @Description: + This method is based on the ECMWF example index.py + https://software.ecmwf.int/wiki/display/GRIB/index.py + + An index file will be created which depends on the combination + of "date", "time" and "stepRange" values. This is used to iterate + over all messages in each grib file which were passed through the + parameter "inputfiles" to seperate specific parameters into fort.* + files. Afterwards the FORTRAN program is called to convert + the data fields all to the same grid and put them in one file + per unique time step (combination of "date", "time" and + "stepRange"). + + @Input: + self: instance of EcFlexpart + The current object of the class. + + inputfiles: instance of UioFiles + Contains a list of files. + + c: instance of class ControlFile + Contains all the parameters of CONTROL file and + command line. + For more information about format and content of the parameter + see documentation. + + @Return: + <nothing> + ''' + + table128 = init128(_config.PATH_GRIBTABLE) + wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\ + stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4', + table128) + + index_keys = ["date", "time", "step"] + indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX) + silent_remove(indexfile) + grib = GribTools(inputfiles.files) + # creates new index file + iid = grib.index(index_keys=index_keys, index_file=indexfile) + + # read values of index keys + index_vals = [] + for key in index_keys: + index_vals.append(grib_index_get(iid, key)) + print(index_vals[-1]) + # index_vals looks for example like: + # index_vals[0]: ('20171106', '20171107', '20171108') ; date + # index_vals[1]: ('0', '1200', '1800', '600') ; time + # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange + + fdict = {'10':None, '11':None, '12':None, '13':None, '16':None, + '17':None, '19':None, '21':None, '22':None, '20':None} + + for prod in product(*index_vals): + # e.g. prod = ('20170505', '0', '12') + # ( date ,time, step) + # per date e.g. time = 0, 1200 + # per time e.g. step = 3, 6, 9, 12 + # flag for Fortran program and file merging + convertFlag = False + print('current prod: ', prod) + # e.g. prod = ('20170505', '0', '12') + # ( date ,time, step) + # per date e.g. time = 0, 600, 1200, 1800 + # per time e.g. step = 0, 3, 6, 9, 12 + for i in range(len(index_keys)): + grib_index_select(iid, index_keys[i], prod[i]) + + # get first id from current product + gid = grib_new_from_index(iid) + + # if there is data for this product combination + # prepare some date and time parameter before reading the data + if gid is not None: + # Combine all temporary data files into final grib file if + # gid is at least one time not None. Therefore set convertFlag + # to save information. The Fortran program is also + # only executed if convertFlag is True + convertFlag = True # remove old fort.* files and open new ones # they are just valid for a single product for k, f in fdict.iteritems(): @@ -1059,246 +1131,151 @@ class EcFlexpart(object): return - def deacc_fluxes(self, inputfiles, c): + + def process_output(self, c): ''' @Description: - Goes through all flux fields in ordered time and de-accumulate - the fields. Afterwards the fields are disaggregated in time. - Different versions of disaggregation is provided for rainfall - data (darain, modified linear) and the surface fluxes and - stress data (dapoly, cubic polynomial). + The grib files are postprocessed depending on the selection in + CONTROL file. The resulting files are moved to the output + directory if its not equal to the input directory. + The following modifications might be done if + properly switched in CONTROL file: + GRIB2 - Conversion to GRIB2 + ECTRANS - Transfer of files to gateway server + ECSTORAGE - Storage at ECMWF server @Input: self: instance of EcFlexpart The current object of the class. - inputfiles: instance of UioFiles - Contains a list of files. - c: instance of class ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. @Return: <nothing> - ''' - - table128 = init128(_config.PATH_GRIBTABLE) - pars = to_param_id(self.params['OG_acc_SL'][0], table128) - index_keys = ["date", "time", "step"] - indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX) - silent_remove(indexfile) - grib = GribTools(inputfiles.files) - # creates new index file - iid = grib.index(index_keys=index_keys, index_file=indexfile) - - # read values of index keys - index_vals = [] - for key in index_keys: - key_vals = grib_index_get(iid, key) - print(key_vals) - # have to sort the steps for disaggregation, - # therefore convert to int first - if key == 'step': - key_vals = [int(k) for k in key_vals] - key_vals.sort() - key_vals = [str(k) for k in key_vals] - index_vals.append(key_vals) - # index_vals looks for example like: - # index_vals[0]: ('20171106', '20171107', '20171108') ; date - # index_vals[1]: ('0', '1200') ; time - # index_vals[2]: (3', '6', '9', '12') ; stepRange - - valsdict = {} - svalsdict = {} - stepsdict = {} - for p in pars: - valsdict[str(p)] = [] - svalsdict[str(p)] = [] - stepsdict[str(p)] = [] - - print('maxstep: ', c.maxstep) - - for prod in product(*index_vals): - # e.g. prod = ('20170505', '0', '12') - # ( date ,time, step) - # per date e.g. time = 0, 1200 - # per time e.g. step = 3, 6, 9, 12 - print('current prod: ', prod) - for i in range(len(index_keys)): - grib_index_select(iid, index_keys[i], prod[i]) - - # get first id from current product - gid = grib_new_from_index(iid) - - # if there is data for this product combination - # prepare some date and time parameter before reading the data - if gid is not None: - cdate = grib_get(gid, 'date') - time = grib_get(gid, 'time') - step = grib_get(gid, 'step') - # date+time+step-2*dtime - # (since interpolated value valid for step-2*dtime) - sdate = datetime(year=cdate/10000, - month=(cdate % 10000)/100, - day=(cdate % 100), - hour=time/100) - fdate = sdate + timedelta(hours=step-2*int(c.dtime)) - sdates = sdate + timedelta(hours=step) - elimit = None - else: - break - - if c.maxstep > 12: - fnout = os.path.join(c.inputdir, 'flux' + - sdate.strftime('%Y%m%d') + - '.{:0>2}'.format(time/100) + - '.{:0>3}'.format(step-2*int(c.dtime))) - gnout = os.path.join(c.inputdir, 'flux' + - sdate.strftime('%Y%m%d') + - '.{:0>2}'.format(time/100) + - '.{:0>3}'.format(step-int(c.dtime))) - hnout = os.path.join(c.inputdir, 'flux' + - sdate.strftime('%Y%m%d') + - '.{:0>2}'.format(time/100) + - '.{:0>3}'.format(step)) - else: - fnout = os.path.join(c.inputdir, 'flux' + - fdate.strftime('%Y%m%d%H')) - gnout = os.path.join(c.inputdir, 'flux' + - (fdate + timedelta(hours=int(c.dtime))). - strftime('%Y%m%d%H')) - hnout = os.path.join(c.inputdir, 'flux' + - sdates.strftime('%Y%m%d%H')) - - print("outputfile = " + fnout) - f_handle = open(fnout, 'w') - g_handle = open(gnout, 'w') - h_handle = open(hnout, 'w') - # read message for message and store relevant data fields - # data keywords are stored in pars - while 1: - if gid is None: - break - cparamId = str(grib_get(gid, 'paramId')) - step = grib_get(gid, 'step') - atime = grib_get(gid, 'time') - ni = grib_get(gid, 'Ni') - nj = grib_get(gid, 'Nj') - if cparamId in valsdict.keys(): - values = grib_get_values(gid) - vdp = valsdict[cparamId] - svdp = svalsdict[cparamId] - sd = stepsdict[cparamId] - - if cparamId == '142' or cparamId == '143': - fak = 1. / 1000. - else: - fak = 3600. - - values = (np.reshape(values, (nj, ni))).flatten() / fak - vdp.append(values[:]) # save the accumulated values - if step <= int(c.dtime): - svdp.append(values[:] / int(c.dtime)) - else: # deaccumulate values - svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime)) + ''' - print(cparamId, atime, step, len(values), - values[0], np.std(values)) - # save the 1/3-hourly or specific values - # svdp.append(values[:]) - sd.append(step) - # len(svdp) correspond to the time - if len(svdp) >= 3: - if len(svdp) > 3: - if cparamId == '142' or cparamId == '143': - values = disaggregation.darain(svdp) - else: - values = disaggregation.dapoly(svdp) + print('\n\nPostprocessing:\n Format: {}\n'.format(c.format)) - if not (step == c.maxstep and c.maxstep > 12 \ - or sdates == elimit): - vdp.pop(0) - svdp.pop(0) - else: - if c.maxstep > 12: - values = svdp[1] - else: - values = svdp[0] + if not c.ecapi: + print('ecstorage: {}\n ecfsdir: {}\n'. + format(c.ecstorage, c.ecfsdir)) + print('ectrans: {}\n gateway: {}\n destination: {}\n ' + .format(c.ectrans, c.gateway, c.destination)) - grib_set_values(gid, values) - if c.maxstep > 12: - grib_set(gid, 'step', max(0, step-2*int(c.dtime))) - else: - grib_set(gid, 'step', 0) - grib_set(gid, 'time', fdate.hour*100) - grib_set(gid, 'date', fdate.year*10000 + - fdate.month*100+fdate.day) - grib_write(gid, f_handle) + print('Output filelist: \n') + print(self.outputfilelist) - if c.basetime: - elimit = datetime.strptime(c.end_date + - c.basetime, '%Y%m%d%H') - else: - elimit = sdate + timedelta(2*int(c.dtime)) + if c.format.lower() == 'grib2': + for ofile in self.outputfilelist: + p = subprocess.check_call(['grib_set', '-s', 'edition=2, \ + productDefinitionTemplateNumber=8', + ofile, ofile + '_2']) + p = subprocess.check_call(['mv', ofile + '_2', ofile]) - # squeeze out information of last two steps contained - # in svdp - # if step+int(c.dtime) == c.maxstep and c.maxstep>12 - # or sdates+timedelta(hours = int(c.dtime)) - # >= elimit: - # Note that svdp[0] has not been popped in this case + if c.ectrans and not c.ecapi: + for ofile in self.outputfilelist: + p = subprocess.check_call(['ectrans', '-overwrite', '-gateway', + c.gateway, '-remote', c.destination, + '-source', ofile]) - if step == c.maxstep and c.maxstep > 12 or \ - sdates == elimit: + if c.ecstorage and not c.ecapi: + for ofile in self.outputfilelist: + p = subprocess.check_call(['ecp', '-o', ofile, + os.path.expandvars(c.ecfsdir)]) - values = svdp[3] - grib_set_values(gid, values) - grib_set(gid, 'step', 0) - truedatetime = fdate + timedelta(hours= - 2*int(c.dtime)) - grib_set(gid, 'time', truedatetime.hour * 100) - grib_set(gid, 'date', truedatetime.year * 10000 + - truedatetime.month * 100 + - truedatetime.day) - grib_write(gid, h_handle) + if c.outputdir != c.inputdir: + for ofile in self.outputfilelist: + p = subprocess.check_call(['mv', ofile, c.outputdir]) - #values = (svdp[1]+svdp[2])/2. - if cparamId == '142' or cparamId == '143': - values = disaggregation.darain(list(reversed(svdp))) - else: - values = disaggregation.dapoly(list(reversed(svdp))) + return - grib_set(gid, 'step', 0) - truedatetime = fdate + timedelta(hours=int(c.dtime)) - grib_set(gid, 'time', truedatetime.hour * 100) - grib_set(gid, 'date', truedatetime.year * 10000 + - truedatetime.month * 100 + - truedatetime.day) - grib_set_values(gid, values) - grib_write(gid, g_handle) - grib_release(gid) + def prepare_fp_files(self, c): + ''' + @Description: + Conversion of GRIB files to FLEXPART binary format. - gid = grib_new_from_index(iid) + @Input: + c: instance of class ControlFile + Contains all the parameters of CONTROL file and + command line. + For more information about format and content of the parameter + see documentation. - f_handle.close() - g_handle.close() - h_handle.close() - grib_index_release(iid) + @Return: + <nothing> + ''' + # generate AVAILABLE file + # Example of AVAILABLE file data: + # 20131107 000000 EN13110700 ON DISC + clist = [] + for ofile in self.outputfilelist: + fname = ofile.split('/') + if '.' in fname[-1]: + l = fname[-1].split('.') + timestamp = datetime.strptime(l[0][-6:] + l[1], + '%y%m%d%H') + timestamp += timedelta(hours=int(l[2])) + cdate = datetime.strftime(timestamp, '%Y%m%d') + chms = datetime.strftime(timestamp, '%H%M%S') + else: + cdate = '20' + fname[-1][-8:-2] + chms = fname[-1][-2:] + '0000' + clist.append(cdate + ' ' + chms + ' '*6 + + fname[-1] + ' '*14 + 'ON DISC') + clist.sort() + with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f: + f.write('\n'.join(clist) + '\n') + + # generate pathnames file + pwd = os.path.abspath(c.outputdir) + with open(pwd + '/pathnames', 'w') as f: + f.write(pwd + '/Options/\n') + f.write(pwd + '/\n') + f.write(pwd + '/\n') + f.write(pwd + '/AVAILABLE\n') + f.write(' = == = == = == = == = == == = \n') + + # create Options dir if necessary + if not os.path.exists(pwd + '/Options'): + os.makedirs(pwd+'/Options') + + # read template COMMAND file + with open(os.path.expandvars(os.path.expanduser( + c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f: + lflist = f.read().split('\n') + + # find index of list where to put in the + # date and time information + # usually after the LDIRECT parameter + i = 0 + for l in lflist: + if 'LDIRECT' in l.upper(): + break + i += 1 + + # insert the date and time information of run start and end + # into the list of lines of COMMAND file + lflist = lflist[:i+1] + \ + [clist[0][:16], clist[-1][:16]] + \ + lflist[i+3:] + + # write the new COMMAND file + with open(pwd + '/Options/COMMAND', 'w') as g: + g.write('\n'.join(lflist) + '\n') + + # change to outputdir and start the grib2flexpart run + # afterwards switch back to the working dir + os.chdir(c.outputdir) + p = subprocess.check_call([ + os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts)) + + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']) + os.chdir(pwd) return - - - diff --git a/source/python/install.py b/source/python/install.py index da9ac582c58c3025d37650fac780a53ccab6a405..b94ae31907dc481c80fccb03f5565ea63dda6674 100755 --- a/source/python/install.py +++ b/source/python/install.py @@ -146,14 +146,10 @@ def install_via_gateway(c): @Input: c: instance of class ControlFile - Contains all necessary information of a CONTROL file. The parameters - are: DAY1, DAY2, DTIME, MAXSTEP, TYPE, TIME, STEP, CLASS, STREAM, - NUMBER, EXPVER, GRID, LEFT, LOWER, UPPER, RIGHT, LEVEL, LEVELIST, - RESOL, GAUSS, ACCURACY, OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, - SMOOTH, FORMAT, ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, - ECFSDIR, MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR - For more information about format and content of the parameter see - documentation. + Contains all the parameters of CONTROL file and + command line. + For more information about format and content of the parameter + see documentation. @Return: <nothing> diff --git a/source/python/mods/get_mars_data.py b/source/python/mods/get_mars_data.py index 4659b172525ec906671abf87562358e4ed1a92c9..a5456a1b2db24ce8b5acf63407521cc91dd9b523 100755 --- a/source/python/mods/get_mars_data.py +++ b/source/python/mods/get_mars_data.py @@ -101,15 +101,8 @@ def get_mars_data(c): @Input: c: instance of class ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. @@ -218,16 +211,9 @@ def do_retrievement(c, server, start, end, delta_t, fluxes=False): retrieves the data from MARS. @Input: - c: instance of ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + c: instance of class ControlFile + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. diff --git a/source/python/mods/prepare_flexpart.py b/source/python/mods/prepare_flexpart.py index 1e9a442b1a522d58cb9be7ce48ca6c0580f50b25..9015ec12479cfa54402e9e1c3630bcbe33fd96f3 100755 --- a/source/python/mods/prepare_flexpart.py +++ b/source/python/mods/prepare_flexpart.py @@ -114,15 +114,8 @@ def prepare_flexpart(ppid, c): from this script, it is "None". c: instance of class ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. @@ -175,10 +168,14 @@ def prepare_flexpart(ppid, c): flexpart = EcFlexpart(c, fluxes=False) flexpart.create(inputfiles, c) flexpart.process_output(c) + if c.grib2flexpart: + # prepare environment for a FLEXPART run + # to convert grib to flexpart binary format + flexpart.prepare_fp_files(c) # check if in debugging mode, then store all files # otherwise delete temporary files - if int(c.debug) != 0: + if c.debug: print('\nTemporary files left intact') else: clean_up(c) diff --git a/source/python/mods/tools.py b/source/python/mods/tools.py index b2cd1dc33dfcadd2dbf20a908998b87d12a2472a..17c74024926de902bf35b10b5c2f926681c81995 100644 --- a/source/python/mods/tools.py +++ b/source/python/mods/tools.py @@ -163,15 +163,8 @@ def clean_up(c): @Input: c: instance of class ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation. diff --git a/source/python/submit.py b/source/python/submit.py index 75f81376d0bb58c5f3b2bcd382a7ff087df84395..37342d341a9bb2ecbe5cc7fc742c2f9f6f7b4f23 100755 --- a/source/python/submit.py +++ b/source/python/submit.py @@ -117,15 +117,8 @@ def submit(jtemplate, c, queue): Default is "job.temp". c: instance of class ControlFile - Contains all the parameters of CONTROL file, which are e.g.: - DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, - STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, - LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, - OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, - ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, - MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME - DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS - + Contains all the parameters of CONTROL file and + command line. For more information about format and content of the parameter see documentation.