From ca867de340aa450bfbc241b4cae4e578464d6633 Mon Sep 17 00:00:00 2001 From: Anne Philipp <anne.philipp@univie.ac.at> Date: Fri, 5 Oct 2018 17:35:18 +0200 Subject: [PATCH] refactored functions in EcFlexpart and did some minor changes --- run/control/CONTROL.req.test | 37 ++ run/control/CONTROL.temp | 2 +- run/jobscripts/job.ksh | 31 +- run/run.sh | 100 ++++- source/python/classes/ControlFile.py | 18 +- source/python/classes/EcFlexpart.py | 528 +++++++++++++------------ source/python/classes/MarsRetrieval.py | 5 +- source/python/classes/UioFiles.py | 2 + source/python/mods/get_mars_data.py | 45 +-- source/python/mods/prepare_flexpart.py | 2 + source/python/mods/tools.py | 98 +++-- 11 files changed, 544 insertions(+), 324 deletions(-) create mode 100644 run/control/CONTROL.req.test mode change 100644 => 100755 run/run.sh diff --git a/run/control/CONTROL.req.test b/run/control/CONTROL.req.test new file mode 100644 index 0000000..8bde8b8 --- /dev/null +++ b/run/control/CONTROL.req.test @@ -0,0 +1,37 @@ +DAY1 20100809 +DAY2 +DTIME 3 +TYPE AN FC FC FC FC FC AN FC FC FC FC FC AN FC FC FC FC FC AN FC FC FC FC FC +TIME 00 00 00 00 00 00 06 00 00 00 00 00 12 12 12 12 12 12 18 12 12 12 12 12 +STEP 00 01 02 03 04 05 00 07 08 09 10 11 00 01 02 03 04 05 00 07 08 09 10 11 +CLASS EI +STREAM OPER +NUMBER OFF +EXPVER 1 +GRID 1000 +LEFT -50000 +LOWER -90000 +UPPER 90000 +RIGHT 50000 +LEVEL 60 +LEVELIST 1/to/60 +RESOL 63 +GAUSS 1 +ACCURACY 16 +OMEGA 0 +OMEGADIFF 0 +ETA 0 +ETADIFF 0 +DPDETA 1 +SMOOTH 0 +FORMAT GRIB1 +ADDPAR 186/187/188/235/139/39 +PREFIX EN +ECSTORAGE 1 +ECTRANS 0 +ECFSDIR ectmp:/${USER}/econdemand/ +MAILFAIL ${USER} +MAILOPS ${USER} +GRIB2FLEXPART 0 +EOF + diff --git a/run/control/CONTROL.temp b/run/control/CONTROL.temp index b972567..0bc7a23 100644 --- a/run/control/CONTROL.temp +++ b/run/control/CONTROL.temp @@ -14,7 +14,7 @@ LOWER 30000 UPPER 75000 RIGHT 45000 LEVEL 60 -LEVELIST 55/to/60 +LEVELIST 40/to/60 RESOL 63 GAUSS 1 ACCURACY 16 diff --git a/run/jobscripts/job.ksh b/run/jobscripts/job.ksh index b659d5a..7a7b1e4 100644 --- a/run/jobscripts/job.ksh +++ b/run/jobscripts/job.ksh @@ -32,7 +32,7 @@ case $HOST in module unload emos module load grib_api/1.14.5 module load emos/437-r64 - export PATH=${PATH}:${HOME}/flex_extract_v7.1/python + export PATH=${PATH}:${HOME}/flex_extract_v7.1/source/python ;; *cca*) module switch PrgEnv-cray PrgEnv-intel @@ -40,7 +40,7 @@ case $HOST in module load emos module load python export SCRATCH=$TMPDIR - export PATH=${PATH}:${HOME}/flex_extract_v7.1/python + export PATH=${PATH}:${HOME}/flex_extract_v7.1/source/python ;; esac @@ -48,14 +48,14 @@ cd $SCRATCH mkdir -p python$$ cd python$$ -export CONTROL=$PWD/CONTROL +export CONTROL=CONTROL cat >$CONTROL<<EOF -accuracy 24 +accuracy 16 addpar 186 187 188 235 139 39 area basetime None -controlfile CONTROL.test +controlfile CONTROL.temp cwc 0 date_chunk 3 debug 1 @@ -67,7 +67,7 @@ ecgid at ecstorage 0 ectrans 1 ecuid km4a -end_date 20160606 +end_date 20120908 eta 0 etadiff 0 etapar 77 @@ -77,12 +77,12 @@ gateway srvx8.img.univie.ac.at gauss 1 grib2flexpart 0 grid 5000 -inputdir ../work +inputdir ../../run/workspace/test install_target None job_template job.temp -left -10000 +left -15000 level 60 -levelist 59/to/60 +levelist 40/to/60 logicals gauss omega omegadiff eta etadiff dpdeta cwc wrf grib2flexpart ecstorage ectrans debug request lower 30000 mailfail ${USER} @@ -93,19 +93,20 @@ maxstep 11 number OFF omega 0 omegadiff 0 -outputdir ../work -prefix EItest_ -queue ecgate +outputdir ../../run/workspace/test +ppid 41511 +prefix EI +queue local request 1 resol 63 -right 10000 +right 45000 smooth 0 -start_date 20160606 +start_date 20120908 step 00 01 02 03 04 05 00 07 08 09 10 11 00 01 02 03 04 05 00 07 08 09 10 11 stream OPER time 00 00 00 00 00 00 06 00 00 00 00 00 12 12 12 12 12 12 18 12 12 12 12 12 type AN FC FC FC FC FC AN FC FC FC FC FC AN FC FC FC FC FC AN FC FC FC FC FC -upper 40000 +upper 75000 wrf 0 EOF diff --git a/run/run.sh b/run/run.sh old mode 100644 new mode 100755 index afabaa8..f192210 --- a/run/run.sh +++ b/run/run.sh @@ -1,4 +1,100 @@ #!/bin/bash - -pyscript = ../python/submit.py +# +# @Author: Anne Philipp +# +# @Date: October, 4 2018 +# +# @Description: +# + + +# ----------------------------------------------------------------- +# AVAILABLE COMMANDLINE ARGUMENTS TO SET +# +# THE USER HAS TO SPECIFY THESE PARAMETER +# + +QUEUE=None +START_DATE='20120908' +END_DATE=None +DATE_CHUNK=None +BASETIME=None +STEP=None +LEVELIST=None +AREA=None +INPUTDIR='../../run/workspace/test' +OUTPUTDIR=None +FLEXPART_ROOT_SCRIPTS=None +PP_ID=None +JOB_TEMPLATE='job.temp' +CONTROLFILE='CONTROL.temp' +DEBUG=1 +REQUEST=1 + +# ----------------------------------------------------------------- +# +# AFTER THIS LINE THE USER DOES NOT HAVE TO CHANGE ANYTHING !!! +# +# ----------------------------------------------------------------- + +# PATH TO SUBMISSION SCRIPT +pyscript=../source/python/submit.py + +# INITIALIZE EMPTY PARAMETERLIST +parameterlist="" + +# CHECK FOR MORE PARAMETER +if [ -n "$START_DATE" ]; then + parameterlist+=" --start_date=$START_DATE" +fi +if [ -n "$END_DATE" ]; then + parameterlist+=" --end_date=$END_DATE" +fi +if [ -n "$DATE_CHUNK" ]; then + parameterlist+=" --date_chunk=$DATE_CHUNK" +fi +if [ -n "$BASETIME" ]; then + parameterlist+=" --basetime=$BASETIME" +fi +if [ -n "$STEP" ]; then + parameterlist+=" --step=$STEP" +fi +if [ -n "$LEVELIST" ]; then + parameterlist+=" --levelist=$LEVELIST" +fi +if [ -n "$AREA" ]; then + parameterlist+=" --area=$AREA" +fi +if [ -n "$INPUTDIR" ]; then + parameterlist+=" --inputdir=$INPUTDIR" +fi +if [ -n "$OUTPUTDIR" ]; then + parameterlist+=" --outputdir=$OUTPUTDIR" +fi +if [ -n "$FLEXPART_ROOT_SCRIPTS" ]; then + parameterlist+=" --flexpart_root_scripts=$FLEXPART_ROOT_SCRIPTS" +fi +if [ -n "$PP_ID" ]; then + parameterlist+=" --ppid=$PP_ID" +fi +if [ -n "$JOB_TEMPLATE" ]; then + parameterlist+=" --job_template=$JOB_TEMPLATE" +fi +if [ -n "$QUEUE" ]; then + parameterlist+=" --queue=$QUEUE" +fi +if [ -n "$CONTROLFILE" ]; then + parameterlist+=" --controlfile=$CONTROLFILE" +fi +if [ -n "$DEBUG" ]; then + parameterlist+=" --debug=$DEBUG" +fi +if [ -n "$REQUEST" ]; then + parameterlist+=" --request=$REQUEST" +fi + +# ----------------------------------------------------------------- +# CALL INSTALLATION SCRIPT WITH DETERMINED COMMANDLINE ARGUMENTS + +$pyscript $parameterlist diff --git a/source/python/classes/ControlFile.py b/source/python/classes/ControlFile.py index 21c12e2..0b8a62b 100644 --- a/source/python/classes/ControlFile.py +++ b/source/python/classes/ControlFile.py @@ -56,7 +56,10 @@ import re import sys import inspect +# software specific classes and modules from flex_extract +sys.path.append('../') import _config +from mods.tools import my_error # ------------------------------------------------------------------------------ # CLASS @@ -143,7 +146,7 @@ class ControlFile(object): self.ecstorage = 0 self.ectrans = 0 self.inputdir = _config.PATH_INPUT_DIR - self.outputdir = self.inputdir + self.outputdir = None self.ecmwfdatadir = _config.PATH_FLEXEXTRACT_DIR self.exedir = _config.PATH_FORTRAN_SRC self.flexpart_root_scripts = None @@ -176,9 +179,6 @@ class ControlFile(object): @Return: <nothing> ''' - from mods.tools import my_error - - # read whole CONTROL file try: with open(os.path.join(_config.PATH_CONTROLFILES, @@ -354,6 +354,13 @@ class ControlFile(object): if self.end_date is None: self.end_date = self.start_date + # basetime has only two possible values + if self.basetime: + if int(self.basetime) != 0 and int(self.basetime) != 12: + print('Basetime has an invalid value!') + print('Basetime = ' + str(self.basetime)) + sys.exit(1) + # assure consistency of levelist and level if self.levelist is None and self.level is None: print('Warning: neither levelist nor level \ @@ -406,6 +413,9 @@ class ControlFile(object): if not self.flexpart_root_scripts: self.flexpart_root_scripts = self.ecmwfdatadir + if not self.outputdir: + self.outputdir = self.inputdir + if not isinstance(self.mailfail, list): if ',' in self.mailfail: self.mailfail = self.mailfail.split(',') diff --git a/source/python/classes/EcFlexpart.py b/source/python/classes/EcFlexpart.py index 790cb70..3bec913 100644 --- a/source/python/classes/EcFlexpart.py +++ b/source/python/classes/EcFlexpart.py @@ -73,10 +73,11 @@ # ------------------------------------------------------------------------------ # MODULES # ------------------------------------------------------------------------------ -import subprocess -import shutil import os +import sys import glob +import shutil +import subprocess from datetime import datetime, timedelta import numpy as np from gribapi import grib_set, grib_index_select, grib_new_from_index, grib_get,\ @@ -84,11 +85,12 @@ from gribapi import grib_set, grib_index_select, grib_new_from_index, grib_get,\ grib_index_release, grib_index_get # software specific classes and modules from flex_extract +sys.path.append('../') import _config from GribTools import GribTools from mods.tools import init128, to_param_id, silent_remove, product, my_error from MarsRetrieval import MarsRetrieval -import mods.disaggregation +import mods.disaggregation as disaggregation # ------------------------------------------------------------------------------ # CLASS @@ -286,8 +288,6 @@ class EcFlexpart(object): self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \ 'SFC', '1', self.grid] - # if needed, add additional WRF specific parameters here - return @@ -381,6 +381,63 @@ class EcFlexpart(object): return + def _mk_index_values(self, inputdir, inputfiles, keys): + ''' + @Description: + Creates an index file for a set of grib parameter keys. + The values from the index keys are returned in a list. + + @Input: + keys: dictionary + List of parameter names which serves as index. + + inputfiles: instance of UioFiles + Contains a list of files. + + @Return: + iid: grib_index + This is a grib specific index structure to access + messages in a file. + + index_vals: list + Contains the values from the keys used for a distinct selection + of grib messages in processing the grib files. + Content looks like e.g.: + index_vals[0]: ('20171106', '20171107', '20171108') ; date + index_vals[1]: ('0', '1200', '1800', '600') ; time + index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange + ''' + iid = None + index_keys = keys + + indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX) + silent_remove(indexfile) + grib = GribTools(inputfiles.files) + # creates new index file + iid = grib.index(index_keys=index_keys, index_file=indexfile) + + # read the values of index keys + index_vals = [] + for key in index_keys: + #index_vals.append(grib_index_get(iid, key)) + #print(index_vals[-1]) + key_vals = grib_index_get(iid, key) + print(key_vals) + # have to sort the steps for disaggregation, + # therefore convert to int first + if key == 'step': + key_vals = [int(k) for k in key_vals] + key_vals.sort() + key_vals = [str(k) for k in key_vals] + index_vals.append(key_vals) + # index_vals looks for example like: + # index_vals[0]: ('20171106', '20171107', '20171108') ; date + # index_vals[1]: ('0', '1200') ; time + # index_vals[2]: (3', '6', '9', '12') ; stepRange + + return iid, index_vals + + def retrieve(self, server, dates, request, inputdir='.'): ''' @Description: @@ -426,7 +483,7 @@ class EcFlexpart(object): t12h = timedelta(hours=12) t24h = timedelta(hours=24) - # dictionary which contains all parameter for the mars request + # dictionary which contains all parameter for the mars request, # entries with a "None" will change in different requests and will # therefore be set in each request seperately retr_param_dict = {'marsclass':self.marsclass, @@ -673,112 +730,95 @@ class EcFlexpart(object): table128 = init128(_config.PATH_GRIBTABLE) pars = to_param_id(self.params['OG_acc_SL'][0], table128) - index_keys = ["date", "time", "step"] - indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX) - silent_remove(indexfile) - grib = GribTools(inputfiles.files) - # creates new index file - iid = grib.index(index_keys=index_keys, index_file=indexfile) - # read values of index keys - index_vals = [] - for key in index_keys: - key_vals = grib_index_get(iid, key) - print(key_vals) - # have to sort the steps for disaggregation, - # therefore convert to int first - if key == 'step': - key_vals = [int(k) for k in key_vals] - key_vals.sort() - key_vals = [str(k) for k in key_vals] - index_vals.append(key_vals) - # index_vals looks for example like: - # index_vals[0]: ('20171106', '20171107', '20171108') ; date - # index_vals[1]: ('0', '1200') ; time - # index_vals[2]: (3', '6', '9', '12') ; stepRange + iid = None + index_vals = None + + # get the values of the keys which are used for distinct access + # of grib messages via product + index_keys = ["date", "time", "step"] + iid, index_vals = self._mk_index_values(c.inputdir, + inputfiles, + index_keys) + # index_vals looks like e.g.: + # index_vals[0]: ('20171106', '20171107', '20171108') ; date + # index_vals[1]: ('0', '1200', '1800', '600') ; time + # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange valsdict = {} svalsdict = {} - stepsdict = {} +# stepsdict = {} for p in pars: valsdict[str(p)] = [] svalsdict[str(p)] = [] - stepsdict[str(p)] = [] +# stepsdict[str(p)] = [] print('maxstep: ', c.maxstep) + # "product" genereates each possible combination between the + # values of the index keys for prod in product(*index_vals): # e.g. prod = ('20170505', '0', '12') # ( date ,time, step) - # per date e.g. time = 0, 1200 - # per time e.g. step = 3, 6, 9, 12 - print('current prod: ', prod) + + print('current product: ', prod) + for i in range(len(index_keys)): grib_index_select(iid, index_keys[i], prod[i]) # get first id from current product gid = grib_new_from_index(iid) - # if there is data for this product combination - # prepare some date and time parameter before reading the data - if gid is not None: - cdate = grib_get(gid, 'date') - time = grib_get(gid, 'time') - step = grib_get(gid, 'step') - # date+time+step-2*dtime - # (since interpolated value valid for step-2*dtime) - sdate = datetime(year=cdate/10000, - month=(cdate % 10000)/100, - day=(cdate % 100), - hour=time/100) - fdate = sdate + timedelta(hours=step-2*int(c.dtime)) - sdates = sdate + timedelta(hours=step) - elimit = None - else: - break + # if there is no data for this specific time combination / product + # skip the rest of the for loop and start with next timestep/product + if not gid: + continue + + # create correct timestamp from the three time informations + cdate = str(grib_get(gid, 'date')) + ctime = '{:0>2}'.format(grib_get(gid, 'time')/100) + cstep = '{:0>3}'.format(grib_get(gid, 'step')) + t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H') + t_dt = t_date + timedelta(hours=int(cstep)) + t_m1dt = t_date + timedelta(hours=int(cstep)-int(c.dtime)) + t_m2dt = t_date + timedelta(hours=int(cstep)-2*int(c.dtime)) + t_enddate = None if c.maxstep > 12: fnout = os.path.join(c.inputdir, 'flux' + - sdate.strftime('%Y%m%d') + - '.{:0>2}'.format(time/100) + + t_date.strftime('%Y%m%d.%H') + '.{:0>3}'.format(step-2*int(c.dtime))) gnout = os.path.join(c.inputdir, 'flux' + - sdate.strftime('%Y%m%d') + - '.{:0>2}'.format(time/100) + + t_date.strftime('%Y%m%d.%H') + '.{:0>3}'.format(step-int(c.dtime))) hnout = os.path.join(c.inputdir, 'flux' + - sdate.strftime('%Y%m%d') + - '.{:0>2}'.format(time/100) + + t_date.strftime('%Y%m%d.%H') + '.{:0>3}'.format(step)) else: fnout = os.path.join(c.inputdir, 'flux' + - fdate.strftime('%Y%m%d%H')) + t_m2dt.strftime('%Y%m%d%H')) gnout = os.path.join(c.inputdir, 'flux' + - (fdate + timedelta(hours=int(c.dtime))). - strftime('%Y%m%d%H')) + t_m1dt.strftime('%Y%m%d%H')) hnout = os.path.join(c.inputdir, 'flux' + - sdates.strftime('%Y%m%d%H')) + t_dt.strftime('%Y%m%d%H')) print("outputfile = " + fnout) - f_handle = open(fnout, 'w') - g_handle = open(gnout, 'w') - h_handle = open(hnout, 'w') # read message for message and store relevant data fields # data keywords are stored in pars while 1: - if gid is None: + if not gid: break cparamId = str(grib_get(gid, 'paramId')) step = grib_get(gid, 'step') - atime = grib_get(gid, 'time') + time = grib_get(gid, 'time') ni = grib_get(gid, 'Ni') nj = grib_get(gid, 'Nj') if cparamId in valsdict.keys(): values = grib_get_values(gid) vdp = valsdict[cparamId] svdp = svalsdict[cparamId] - sd = stepsdict[cparamId] + # sd = stepsdict[cparamId] if cparamId == '142' or cparamId == '143': fak = 1. / 1000. @@ -792,11 +832,9 @@ class EcFlexpart(object): else: # deaccumulate values svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime)) - print(cparamId, atime, step, len(values), + print(cparamId, time, step, len(values), values[0], np.std(values)) - # save the 1/3-hourly or specific values - # svdp.append(values[:]) - sd.append(step) + # len(svdp) correspond to the time if len(svdp) >= 3: if len(svdp) > 3: @@ -806,7 +844,7 @@ class EcFlexpart(object): values = disaggregation.dapoly(svdp) if not (step == c.maxstep and c.maxstep > 12 \ - or sdates == elimit): + or t_dt == t_enddate): vdp.pop(0) svdp.pop(0) else: @@ -816,41 +854,45 @@ class EcFlexpart(object): values = svdp[0] grib_set_values(gid, values) + if c.maxstep > 12: grib_set(gid, 'step', max(0, step-2*int(c.dtime))) else: grib_set(gid, 'step', 0) - grib_set(gid, 'time', fdate.hour*100) - grib_set(gid, 'date', fdate.year*10000 + - fdate.month*100+fdate.day) - grib_write(gid, f_handle) + grib_set(gid, 'time', t_m2dt.hour*100) + grib_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d'))) + + with open(fnout, 'w') as f_handle: + grib_write(gid, f_handle) if c.basetime: - elimit = datetime.strptime(c.end_date + - c.basetime, '%Y%m%d%H') + t_enddate = datetime.strptime(c.end_date + + c.basetime, + '%Y%m%d%H') else: - elimit = sdate + timedelta(2*int(c.dtime)) + t_enddate = t_date + timedelta(2*int(c.dtime)) # squeeze out information of last two steps contained # in svdp # if step+int(c.dtime) == c.maxstep and c.maxstep>12 - # or sdates+timedelta(hours = int(c.dtime)) - # >= elimit: + # or t_dt+timedelta(hours = int(c.dtime)) + # >= t_enddate: # Note that svdp[0] has not been popped in this case if step == c.maxstep and c.maxstep > 12 or \ - sdates == elimit: + t_dt == t_enddate: values = svdp[3] grib_set_values(gid, values) grib_set(gid, 'step', 0) - truedatetime = fdate + timedelta(hours= + truedatetime = t_m2dt + timedelta(hours= 2*int(c.dtime)) grib_set(gid, 'time', truedatetime.hour * 100) grib_set(gid, 'date', truedatetime.year * 10000 + truedatetime.month * 100 + truedatetime.day) - grib_write(gid, h_handle) + with open(hnout, 'w') as h_handle: + grib_write(gid, h_handle) #values = (svdp[1]+svdp[2])/2. if cparamId == '142' or cparamId == '143': @@ -859,21 +901,18 @@ class EcFlexpart(object): values = disaggregation.dapoly(list(reversed(svdp))) grib_set(gid, 'step', 0) - truedatetime = fdate + timedelta(hours=int(c.dtime)) + truedatetime = t_m2dt + timedelta(hours=int(c.dtime)) grib_set(gid, 'time', truedatetime.hour * 100) grib_set(gid, 'date', truedatetime.year * 10000 + truedatetime.month * 100 + truedatetime.day) grib_set_values(gid, values) - grib_write(gid, g_handle) + with open(gnout, 'w') as g_handle: + grib_write(gid, g_handle) - grib_release(gid) - - gid = grib_new_from_index(iid) + grib_release(gid) - f_handle.close() - g_handle.close() - h_handle.close() + gid = grib_new_from_index(iid) grib_index_release(iid) @@ -912,144 +951,128 @@ class EcFlexpart(object): <nothing> ''' - table128 = init128(_config.PATH_GRIBTABLE) - wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\ - stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4', - table128) - - index_keys = ["date", "time", "step"] - indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX) - silent_remove(indexfile) - grib = GribTools(inputfiles.files) - # creates new index file - iid = grib.index(index_keys=index_keys, index_file=indexfile) - - # read values of index keys - index_vals = [] - for key in index_keys: - index_vals.append(grib_index_get(iid, key)) - print(index_vals[-1]) - # index_vals looks for example like: - # index_vals[0]: ('20171106', '20171107', '20171108') ; date - # index_vals[1]: ('0', '1200', '1800', '600') ; time - # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange - + if c.wrf: + table128 = init128(_config.PATH_GRIBTABLE) + wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\ + stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4', + table128) + + # these numbers are indices for the temporary files "fort.xx" + # which are used to seperate the grib fields to, + # for the Fortran program input + # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields + # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc fdict = {'10':None, '11':None, '12':None, '13':None, '16':None, - '17':None, '19':None, '21':None, '22':None, '20':None} + '17':None, '18':None, '19':None, '21':None, '22':None} + iid = None + index_vals = None + + # get the values of the keys which are used for distinct access + # of grib messages via product + index_keys = ["date", "time", "step"] + iid, index_vals = self._mk_index_values(c.inputdir, + inputfiles, + index_keys) + # index_vals looks like e.g.: + # index_vals[0]: ('20171106', '20171107', '20171108') ; date + # index_vals[1]: ('0', '1200', '1800', '600') ; time + # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange + + # "product" genereates each possible combination between the + # values of the index keys for prod in product(*index_vals): # e.g. prod = ('20170505', '0', '12') # ( date ,time, step) - # per date e.g. time = 0, 1200 - # per time e.g. step = 3, 6, 9, 12 - # flag for Fortran program and file merging - convertFlag = False - print('current prod: ', prod) - # e.g. prod = ('20170505', '0', '12') - # ( date ,time, step) - # per date e.g. time = 0, 600, 1200, 1800 - # per time e.g. step = 0, 3, 6, 9, 12 + + print('current product: ', prod) + for i in range(len(index_keys)): grib_index_select(iid, index_keys[i], prod[i]) # get first id from current product gid = grib_new_from_index(iid) - # if there is data for this product combination - # prepare some date and time parameter before reading the data - if gid is not None: - # Combine all temporary data files into final grib file if - # gid is at least one time not None. Therefore set convertFlag - # to save information. The Fortran program is also - # only executed if convertFlag is True - convertFlag = True - # remove old fort.* files and open new ones - # they are just valid for a single product - for k, f in fdict.iteritems(): - fortfile = os.path.join(c.inputdir, 'fort.' + k) - silent_remove(fortfile) - fdict[k] = open(fortfile, 'w') - - cdate = str(grib_get(gid, 'date')) - time = grib_get(gid, 'time') - step = grib_get(gid, 'step') - # create correct timestamp from the three time informations - # date, time, step - timestamp = datetime.strptime(cdate + '{:0>2}'.format(time/100), - '%Y%m%d%H') - timestamp += timedelta(hours=int(step)) - cdateH = datetime.strftime(timestamp, '%Y%m%d%H') - - if c.basetime: - slimit = datetime.strptime(c.start_date + '00', '%Y%m%d%H') - bt = '23' - if c.basetime == '00': - bt = '00' - slimit = datetime.strptime(c.end_date + bt, '%Y%m%d%H')\ - - timedelta(hours=12-int(c.dtime)) - if c.basetime == '12': - bt = '12' - slimit = datetime.strptime(c.end_date + bt, '%Y%m%d%H')\ - - timedelta(hours=12-int(c.dtime)) - - elimit = datetime.strptime(c.end_date + bt, '%Y%m%d%H') - - if timestamp < slimit or timestamp > elimit: - continue - else: - pass - - try: - if c.wrf: - if 'olddate' not in locals() or cdate != olddate: - fwrf = open(os.path.join(c.outputdir, - 'WRF' + cdate + '.{:0>2}'.format(time) + - '.000.grb2'), 'w') - olddate = cdate[:] - except AttributeError: - pass - - # helper variable to remember which fields were already used. + # if there is no data for this specific time combination / product + # skip the rest of the for loop and start with next timestep/product + if not gid: + continue + + # remove old fort.* files and open new ones + # they are just valid for a single product + for k, f in fdict.iteritems(): + fortfile = os.path.join(c.inputdir, 'fort.' + k) + silent_remove(fortfile) + fdict[k] = open(fortfile, 'w') + + # create correct timestamp from the three time informations + cdate = str(grib_get(gid, 'date')) + ctime = '{:0>2}'.format(grib_get(gid, 'time')/100) + cstep = '{:0>3}'.format(grib_get(gid, 'step')) + timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H') + timestamp += timedelta(hours=int(cstep)) + cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H') + + # if the timestamp is out of basetime start/end date period, + # skip this specific product + if c.basetime: + start_time = datetime.strptime(c.end_date + c.basetime, + '%Y%m%d%H') - time_delta + end_time = datetime.strptime(c.end_date + c.basetime, + '%Y%m%d%H') + if timestamp < start_time or timestamp > end_time: + continue + + if c.wrf: + if 'olddate' not in locals() or cdate != olddate: + fwrf = open(os.path.join(c.outputdir, + 'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w') + olddate = cdate[:] + + # savedfields remembers which fields were already used. savedfields = [] + # sum of cloud liquid and ice water content + scwc = None while 1: - if gid is None: + if not gid: break paramId = grib_get(gid, 'paramId') gridtype = grib_get(gid, 'gridType') levtype = grib_get(gid, 'typeOfLevel') - if paramId == 133 and gridtype == 'reduced_gg': - # Specific humidity (Q.grb) is used as a template only - # so we need the first we "meet" - with open(os.path.join(c.inputdir, 'fort.18'), 'w') as fout: - grib_write(gid, fout) - elif paramId == 131 or paramId == 132: - grib_write(gid, fdict['10']) - elif paramId == 130: + if paramId == 77: # ETADOT + grib_write(gid, fdict['21']) + elif paramId == 130: # T grib_write(gid, fdict['11']) - elif paramId == 133 and gridtype != 'reduced_gg': + elif paramId == 131 or paramId == 132: # U, V wind component + grib_write(gid, fdict['10']) + elif paramId == 133 and gridtype != 'reduced_gg': # Q grib_write(gid, fdict['17']) - elif paramId == 152: + elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian + grib_write(gid, fdict['18']) + elif paramId == 135: # W + grib_write(gid, fdict['19']) + elif paramId == 152: # LNSP grib_write(gid, fdict['12']) - elif paramId == 155 and gridtype == 'sh': + elif paramId == 155 and gridtype == 'sh': # D grib_write(gid, fdict['13']) - elif paramId in [129, 138, 155] and levtype == 'hybrid' \ - and c.wrf: - pass - elif paramId == 246 or paramId == 247: - # cloud liquid water and ice - if paramId == 246: - clwc = grib_get_values(gid) + elif paramId == 246 or paramId == 247: # CLWC, CIWC + # sum cloud liquid water and ice + if not scwc: + scwc = grib_get_values(gid) else: - clwc += grib_get_values(gid) - grib_set_values(gid, clwc) + scwc += grib_get_values(gid) + grib_set_values(gid, scwc) grib_set(gid, 'paramId', 201031) grib_write(gid, fdict['22']) - elif paramId == 135: - grib_write(gid, fdict['19']) - elif paramId == 77: - grib_write(gid, fdict['21']) + elif c.wrf and paramId in [129, 138, 155] and \ + levtype == 'hybrid': # Z, VO, D + # do not do anything right now + # these are specific parameter for WRF + pass else: if paramId not in savedfields: + # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR + # and all ADDPAR parameter grib_write(gid, fdict['16']) savedfields.append(paramId) else: @@ -1073,56 +1096,51 @@ class EcFlexpart(object): for f in fdict.values(): f.close() - # call for Fortran program if flag is True - if convertFlag: - pwd = os.getcwd() - os.chdir(c.inputdir) - if os.stat('fort.21').st_size == 0 and c.eta: - print('Parameter 77 (etadot) is missing, most likely it is \ - not available for this type or date/time\n') - print('Check parameters CLASS, TYPE, STREAM, START_DATE\n') - my_error(c.mailfail, 'fort.21 is empty while parameter eta \ - is set to 1 in CONTROL file') - - # create the corresponding output file fort.15 - # (generated by Fortran program) + fort.16 (paramId 167 and 168) - p = subprocess.check_call([os.path.join( - c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True) - os.chdir(pwd) - - # create final output filename, e.g. EN13040500 (ENYYMMDDHH) - fnout = os.path.join(c.inputdir, c.prefix) - if c.maxstep > 12: - suffix = cdate[2:8] + '.{:0>2}'.format(time/100) + \ - '.{:0>3}'.format(step) - else: - suffix = cdateH[2:10] - fnout += suffix - print("outputfile = " + fnout) - self.outputfilelist.append(fnout) # needed for final processing - - # create outputfile and copy all data from intermediate files - # to the outputfile (final GRIB files) - orolsm = os.path.basename(glob.glob( - c.inputdir + '/OG_OROLSM__SL.*.' + c.ppid + '*')[0]) - fluxfile = 'flux' + cdate[0:2] + suffix - if not c.cwc: - flist = ['fort.15', fluxfile, 'fort.16', orolsm] - else: - flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm] - - with open(fnout, 'wb') as fout: - for f in flist: - shutil.copyfileobj(open(os.path.join(c.inputdir, f), - 'rb'), fout) - - if c.omega: - with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout: - shutil.copyfileobj( - open(os.path.join(c.inputdir, 'fort.25'), - 'rb'), fout) + # call for Fortran program to convert e.g. reduced_gg grids to + # regular_ll and calculate detadot/dp + pwd = os.getcwd() + os.chdir(c.inputdir) + if os.stat('fort.21').st_size == 0 and c.eta: + print('Parameter 77 (etadot) is missing, most likely it is \ + not available for this type or date/time\n') + print('Check parameters CLASS, TYPE, STREAM, START_DATE\n') + my_error(c.mailfail, 'fort.21 is empty while parameter eta \ + is set to 1 in CONTROL file') + + # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q) + p = subprocess.check_call([os.path.join( + c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True) + os.chdir(pwd) + + # create name of final output file, e.g. EN13040500 (ENYYMMDDHH) + if c.maxstep > 12: + suffix = cdate[2:8] + '.' + ctime + '.' + cstep + else: + suffix = cdate_hour[2:10] + fnout = os.path.join(c.inputdir, c.prefix + suffix) + print("outputfile = " + fnout) + # collect for final processing + self.outputfilelist.append(os.path.basename(fnout)) + + # create outputfile and copy all data from intermediate files + # to the outputfile (final GRIB input files for FLEXPART) + orolsm = os.path.basename(glob.glob(c.inputdir + + '/OG_OROLSM__SL.*.' + c.ppid + '*')[0]) + fluxfile = 'flux' + cdate[0:2] + suffix + if not c.cwc: + flist = ['fort.15', fluxfile, 'fort.16', orolsm] else: - pass + flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm] + + with open(fnout, 'wb') as fout: + for f in flist: + shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'), + fout) + + if c.omega: + with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout: + shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'), + 'rb'), fout) if c.wrf: fwrf.close() @@ -1167,7 +1185,7 @@ class EcFlexpart(object): print('ectrans: {}\n gateway: {}\n destination: {}\n ' .format(c.ectrans, c.gateway, c.destination)) - print('Output filelist: \n') + print('Output filelist: ') print(self.outputfilelist) if c.format.lower() == 'grib2': @@ -1190,7 +1208,9 @@ class EcFlexpart(object): if c.outputdir != c.inputdir: for ofile in self.outputfilelist: - p = subprocess.check_call(['mv', ofile, c.outputdir]) + p = subprocess.check_call(['mv', + os.path.join(c.inputdir, ofile), + c.outputdir]) return diff --git a/source/python/classes/MarsRetrieval.py b/source/python/classes/MarsRetrieval.py index d01cf50..b507518 100644 --- a/source/python/classes/MarsRetrieval.py +++ b/source/python/classes/MarsRetrieval.py @@ -61,9 +61,12 @@ # ------------------------------------------------------------------------------ # MODULES # ------------------------------------------------------------------------------ -import subprocess import os +import sys +import subprocess +# software specific classes and modules from flex_extract +sys.path.append('../') import _config # ------------------------------------------------------------------------------ # CLASS diff --git a/source/python/classes/UioFiles.py b/source/python/classes/UioFiles.py index e9eaf86..e891608 100644 --- a/source/python/classes/UioFiles.py +++ b/source/python/classes/UioFiles.py @@ -47,9 +47,11 @@ # MODULES # ------------------------------------------------------------------------------ import os +import sys import fnmatch # software specific module from flex_extract +sys.path.append('../') #import profiling from mods.tools import silent_remove, get_list_as_string diff --git a/source/python/mods/get_mars_data.py b/source/python/mods/get_mars_data.py index a5456a1..17ac358 100755 --- a/source/python/mods/get_mars_data.py +++ b/source/python/mods/get_mars_data.py @@ -47,19 +47,21 @@ # ------------------------------------------------------------------------------ import os import sys -import datetime import inspect -try: - ecapi = True - import ecmwfapi -except ImportError: - ecapi = False +from datetime import datetime, timedelta # software specific classes and modules from flex_extract +sys.path.append('../') import _config from tools import my_error, normal_exit, get_cmdline_arguments, read_ecenv from classes.EcFlexpart import EcFlexpart from classes.UioFiles import UioFiles + +try: + ecapi = True + import ecmwfapi +except ImportError: + ecapi = False # ------------------------------------------------------------------------------ # FUNCTION # ------------------------------------------------------------------------------ @@ -146,30 +148,23 @@ def get_mars_data(c): # also sobald es Tagesüberschneidungen gibt # allerdings ist das relevant und ersichtlich an den NICHT FLUSS DATEN - - # set start date of retrieval period - start = datetime.date(year=int(c.start_date[:4]), - month=int(c.start_date[4:6]), - day=int(c.start_date[6:])) - startm1 = start - datetime.timedelta(days=1) - - # set end date of retrieval period - end = datetime.date(year=int(c.end_date[:4]), - month=int(c.end_date[4:6]), - day=int(c.end_date[6:])) - - # set time period for one single retrieval - datechunk = datetime.timedelta(days=int(c.date_chunk)) + start = datetime.strptime(c.start_date, '%Y%m%d') + end = datetime.strptime(c.end_date, '%Y%m%d') + # time period for one single retrieval + datechunk = timedelta(days=int(c.date_chunk)) if c.basetime == '00': - start = startm1 + start = start - timedelta(days=1) + + if c.maxstep <= 24: + startm1 = start - timedelta(days=1) if c.basetime == '00' or c.basetime == '12': - # endp1 = end + datetime.timedelta(days=1) + # endp1 = end + timedelta(days=1) endp1 = end else: - # endp1 = end + datetime.timedelta(days=2) - endp1 = end + datetime.timedelta(days=1) + # endp1 = end + timedelta(days=2) + endp1 = end + timedelta(days=1) # -------------- flux data ------------------------------------------------ if c.request == 0 or c.request == 2: @@ -242,7 +237,7 @@ def do_retrievement(c, server, start, end, delta_t, fluxes=False): # since actual day also counts as one day, # we only need to add datechunk - 1 days to retrieval # for a period - delta_t_m1 = delta_t - datetime.timedelta(days=1) + delta_t_m1 = delta_t - timedelta(days=1) day = start while day <= end: diff --git a/source/python/mods/prepare_flexpart.py b/source/python/mods/prepare_flexpart.py index 9015ec1..ade11a8 100755 --- a/source/python/mods/prepare_flexpart.py +++ b/source/python/mods/prepare_flexpart.py @@ -58,8 +58,10 @@ import sys import socket # software specific classes and modules from flex_extract +sys.path.append('../') import _config from classes.UioFiles import UioFiles +from classes.ControlFile import ControlFile from tools import clean_up, get_cmdline_arguments, read_ecenv from classes.EcFlexpart import EcFlexpart diff --git a/source/python/mods/tools.py b/source/python/mods/tools.py index 17c7402..cda9c15 100644 --- a/source/python/mods/tools.py +++ b/source/python/mods/tools.py @@ -59,6 +59,45 @@ from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter # FUNCTIONS # ------------------------------------------------------------------------------ +def none_or_str(value): + ''' + @Description: + Converts the input string into pythons None-type if the string + contains "None". + + @Input: + value: string + String to be checked for the "None" word. + + @Return: + None or value: + Return depends on the content of the input value. If it was "None", + then the python type None is returned. Otherwise the string itself. + ''' + if value == 'None': + return None + return value + +def none_or_int(value): + ''' + @Description: + Converts the input string into pythons None-type if the string + contains "None". Otherwise it is converted to an integer value. + + @Input: + value: string + String to be checked for the "None" word. + + @Return: + None or int(value): + Return depends on the content of the input value. If it was "None", + then the python type None is returned. Otherwise the string is + converted into an integer value. + ''' + if value == 'None': + return None + return int(value) + def get_cmdline_arguments(): ''' @Description: @@ -78,54 +117,69 @@ def get_cmdline_arguments(): formatter_class=ArgumentDefaultsHelpFormatter) # the most important arguments - parser.add_argument("--start_date", dest="start_date", default=None, + parser.add_argument("--start_date", dest="start_date", + type=none_or_str, default=None, help="start date YYYYMMDD") - parser.add_argument("--end_date", dest="end_date", default=None, + parser.add_argument("--end_date", dest="end_date", + type=none_or_str, default=None, help="end_date YYYYMMDD") - parser.add_argument("--date_chunk", dest="date_chunk", default=None, + parser.add_argument("--date_chunk", dest="date_chunk", + type=none_or_int, default=None, help="# of days to be retrieved at once") + parser.add_argument("--controlfile", dest="controlfile", + type=none_or_str, default='CONTROL.temp', + help="file with CONTROL parameters") + + # parameter for extra output information + parser.add_argument("--debug", dest="debug", + type=none_or_int, default=None, + help="debug mode - leave temporary files intact") + parser.add_argument("--request", dest="request", + type=none_or_int, default=None, + help="list all mars request in file mars_requests.dat \ + and skip submission to mars") # some arguments that override the default in the CONTROL file - parser.add_argument("--basetime", dest="basetime", default=None, - help="base such as 00/12 (for half day retrievals)") - parser.add_argument("--step", dest="step", default=None, + parser.add_argument("--basetime", dest="basetime", + type=none_or_int, default=None, + help="base such as 00 or 12 (for half day retrievals)") + parser.add_argument("--step", dest="step", + type=none_or_str, default=None, help="steps such as 00/to/48") - parser.add_argument("--levelist", dest="levelist", default=None, + parser.add_argument("--levelist", dest="levelist", + type=none_or_str, default=None, help="Vertical levels to be retrieved, e.g. 30/to/60") - parser.add_argument("--area", dest="area", default=None, + parser.add_argument("--area", dest="area", + type=none_or_str, default=None, help="area defined as north/west/south/east") # set the working directories - parser.add_argument("--inputdir", dest="inputdir", default=None, + parser.add_argument("--inputdir", dest="inputdir", + type=none_or_str, default=None, help="root directory for storing intermediate files") - parser.add_argument("--outputdir", dest="outputdir", default=None, + parser.add_argument("--outputdir", dest="outputdir", + type=none_or_str, default=None, help="root directory for storing output files") parser.add_argument("--flexpart_root_scripts", dest="flexpart_root_scripts", - default=None, + type=none_or_str, default=None, help="FLEXPART root directory (to find grib2flexpart \ and COMMAND file)\n Normally flex_extract resides in \ the scripts directory of the FLEXPART distribution") # this is only used by prepare_flexpart.py to rerun a postprocessing step - parser.add_argument("--ppid", dest="ppid", default=None, + parser.add_argument("--ppid", dest="ppid", + type=none_or_int, default=None, help="specify parent process id for \ rerun of prepare_flexpart") # arguments for job submission to ECMWF, only needed by submit.py parser.add_argument("--job_template", dest='job_template', - default="job.temp", + type=none_or_str, default="job.temp", help="job template file for submission to ECMWF") - parser.add_argument("--queue", dest="queue", default=None, + parser.add_argument("--queue", dest="queue", + type=none_or_str, default=None, help="queue for submission to ECMWF \ (e.g. ecgate or cca )") - parser.add_argument("--controlfile", dest="controlfile", - default='CONTROL.temp', - help="file with CONTROL parameters") - parser.add_argument("--debug", dest="debug", default=None, - help="debug mode - leave temporary files intact") - parser.add_argument("--request", dest="request", default=None, - help="list all mars request in file mars_requests.dat \ - and skip submission to mars") args = parser.parse_args() -- GitLab