From d4696e0eebad213794352c454f011ba6dc050ddc Mon Sep 17 00:00:00 2001 From: Anne Philipp <anne.philipp@univie.ac.at> Date: Fri, 18 Jan 2019 23:50:19 +0100 Subject: [PATCH] bugfix retrievement with basetime parameter --- run/jobscripts/joboper.ksh | 145 ++++++++++++++++++++++ source/python/classes/ControlFile.py | 18 ++- source/python/classes/EcFlexpart.py | 159 ++++++++++++------------- source/python/mods/checks.py | 48 ++++---- source/python/mods/get_mars_data.py | 7 +- source/python/mods/prepare_flexpart.py | 2 +- source/python/mods/tools.py | 6 +- source/python/submit.py | 4 +- 8 files changed, 273 insertions(+), 116 deletions(-) create mode 100644 run/jobscripts/joboper.ksh diff --git a/run/jobscripts/joboper.ksh b/run/jobscripts/joboper.ksh new file mode 100644 index 0000000..4136f98 --- /dev/null +++ b/run/jobscripts/joboper.ksh @@ -0,0 +1,145 @@ +#!/bin/ksh + +# ON ECGB: +# start with ecaccess-job-submit -queueName ecgb NAME_OF_THIS_FILE on gateway server +# start with sbatch NAME_OF_THIS_FILE directly on machine + +#SBATCH --workdir=/scratch/ms/at/km4a +#SBATCH --qos=normal +#SBATCH --job-name=flex_ecmwf +#SBATCH --output=flex_ecmwf.%j.out +#SBATCH --error=flex_ecmwf.%j.out +#SBATCH --mail-type=FAIL +#SBATCH --time=12:00:00 + +## CRAY specific batch requests +##PBS -N flex_ecmwf +##PBS -q np +##PBS -S /usr/bin/ksh +## -o /scratch/ms/at/km4a/flex_ecmwf.${PBS_JOBID}.out +## job output is in .ecaccess_DO_NOT_REMOVE +##PBS -j oe +##PBS -V +##PBS -l EC_threads_per_task=24 +##PBS -l EC_memory_per_task=32000MB + +set -x +export VERSION=7.1 +case ${HOST} in + *ecg*) + module load python + module unload grib_api + module unload eccodes + module load eccodes + module unload emos + module load emos/455-r64 + export PATH=${PATH}:${HOME}/flex_extract_v7.1/source/python + ;; + *cca*) + module switch PrgEnv-cray PrgEnv-intel + module load eccodes + module load emos + module load python + export SCRATCH=${TMPDIR} + export PATH=${PATH}:${HOME}/flex_extract_v7.1/source/python + ;; +esac + +cd ${SCRATCH} +mkdir -p python$$ +cd python$$ + +export CONTROL=CONTROL + +cat >${CONTROL}<<EOF +accmaxstep 12 +acctime 00/12 +acctype FC +accuracy 24 +addpar /186/187/188/235/139/39 +area 90.0/-179.0/-90.0/180.0 +base_time ${MSJ_BASETIME} +basetime 0 +controlfile CONTROL_OPS +cwc 1 +dataset None +date_chunk 3 +debug 1 +destination annep@genericSftp +dpdeta 1 +dtime 3 +ecapi None +ecfsdir ectmp:/${USER}/ecops +ecgid at +ecstorage 0 +ectrans 1 +ecuid km4a +end_date ${MSJ_YEAR}${MSJ_MONTH}${MSJ_DAY} +eta 1 +etadiff 0 +etapar 77 +expver 1 +format GRIB2 +gateway srvx8.img.univie.ac.at +gauss 0 +gaussian +grib2flexpart 0 +grid 1.0/1.0 +inputdir /raid60/nas/tmc/Anne/Interpolation/flexextract/flex_extract_v7.1/run/workspace +install_target None +job_chunk 3 +job_template job.temp +left -179000 +level 137 +levelist 130/to/137 +logicals gauss omega omegadiff eta etadiff dpdeta cwc wrf grib2flexpart ecstorage ectrans debug request public purefc rrint +lower -90000 +mailfail ${USER} +mailops ${USER} +makefile Makefile.gfortran +marsclass OD +maxstep 11 +number OFF +omega 0 +omegadiff 0 +outputdir /raid60/nas/tmc/Anne/Interpolation/flexextract/flex_extract_v7.1/run/workspace +prefix EN +public 0 +purefc 0 +queue ecgate +request 2 +resol 159 +right 180000 +rrint 0 +smooth 0 +start_date ${MSJ_YEAR}${MSJ_MONTH}${MSJ_DAY} +step 00 01 02 03 04 05 06 07 08 09 10 11 00 01 02 03 04 05 06 07 08 09 10 11 +stream OPER +time 00 00 00 00 00 00 00 00 00 00 00 00 12 12 12 12 12 12 12 12 12 12 12 12 +type AN FC FC FC FC FC FC FC FC FC FC FC AN FC FC FC FC FC FC FC FC FC FC FC +upper 90000 +wrf 0 + +EOF + + +submit.py --controlfile=${CONTROL} --inputdir=./work --outputdir=./work 1> prot 2>&1 + +if [ $? -eq 0 ] ; then + l=0 + for muser in `grep -i MAILOPS ${CONTROL}`; do + if [ ${l} -gt 0 ] ; then + mail -s flex.${HOST}.$$ ${muser} <prot + fi + l=$((${l}+1)) + done +else + l=0 + for muser in `grep -i MAILFAIL ${CONTROL}`; do + if [ ${l} -gt 0 ] ; then + mail -s "ERROR! flex.${HOST}.$$" ${muser} <prot + fi + l=$((${l}+1)) + done +fi + diff --git a/source/python/classes/ControlFile.py b/source/python/classes/ControlFile.py index c3a55fa..b625299 100644 --- a/source/python/classes/ControlFile.py +++ b/source/python/classes/ControlFile.py @@ -92,7 +92,7 @@ class ControlFile(object): dtime :str The time step in hours. Default value is None. - basetime : str + basetime : int The time for a half day retrieval. The 12 hours upfront are to be retrieved. Default value is None. @@ -302,6 +302,11 @@ class ControlFile(object): to delete all temporary files except the final output files (0). Default value is 0. + oper : int + Switch to prepare the operational job script. Start date, end date and + basetime will be prepared with environment variables. + Default value is 0. + request : int Switch to select between just retrieving the data (0), writing the mars parameter values to a csv file (1) or doing both (2). @@ -408,6 +413,7 @@ class ControlFile(object): self.ecgid = None self.install_target = None self.debug = 0 + self.oper = 0 self.request = 0 self.public = 0 self.ecapi = None @@ -416,8 +422,8 @@ class ControlFile(object): self.logicals = ['gauss', 'omega', 'omegadiff', 'eta', 'etadiff', 'dpdeta', 'cwc', 'wrf', 'grib2flexpart', 'ecstorage', - 'ectrans', 'debug', 'request', 'public', 'purefc', - 'rrint'] + 'ectrans', 'debug', 'oper', 'request', 'public', + 'purefc', 'rrint'] self._read_controlfile() @@ -598,7 +604,7 @@ class ControlFile(object): self.start_date, self.end_date = check_dates(self.start_date, self.end_date) - check_basetime(self.basetime) + self.basetime = check_basetime(self.basetime) self.levelist, self.level = check_levels(self.levelist, self.level) @@ -623,9 +629,9 @@ class ControlFile(object): self.acctype = check_acctype(self.acctype, self.type) - self.acctime = check_acctime(self.acctime, self.acctype, self.purefc) + self.acctime = check_acctime(self.acctime, self.marsclass, self.purefc) - self.accmaxstep = check_accmaxstep(self.accmaxstep, self.acctype, + self.accmaxstep = check_accmaxstep(self.accmaxstep, self.marsclass, self.purefc, self.maxstep) self.purefc = check_purefc(self.type) diff --git a/source/python/classes/EcFlexpart.py b/source/python/classes/EcFlexpart.py index 70f4fd8..5d0e32b 100644 --- a/source/python/classes/EcFlexpart.py +++ b/source/python/classes/EcFlexpart.py @@ -105,7 +105,7 @@ class EcFlexpart(object): dataset which has to be used to characterize the type of data. - basetime : str + basetime : int The time for a half day retrieval. The 12 hours upfront are to be retrieved. @@ -275,10 +275,10 @@ class EcFlexpart(object): ''' i = 0 for ty, st, ti in zip(ftype, fstep, ftime): - btlist = range(24) - if self.basetime == '12': + btlist = range(len(ftime)) + if self.basetime == 12: btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] - if self.basetime == '00': + if self.basetime == 0: btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0] # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or @@ -286,7 +286,7 @@ class EcFlexpart(object): # (int(c.step[i]) % int(c.dtime) == 0)) ) and \ # (int(c.time[i]) in btlist or c.purefc): - if (int(ti) in btlist) or self.purefc: + if (i in btlist) or self.purefc: if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)): @@ -304,6 +304,7 @@ class EcFlexpart(object): self.types[ty]['steps'] += '/' self.types[ty]['steps'] += st i += 1 + return def _create_field_types_fluxes(self): @@ -707,7 +708,7 @@ class EcFlexpart(object): retr_param_dict['gaussian'] = 'reduced' # ------ on demand path -------------------------------------------------- - if not self.basetime: + if self.basetime is None: # ******* start retrievement self._start_retrievement(request, retr_param_dict) # ------ operational path ------------------------------------------------ @@ -717,20 +718,12 @@ class EcFlexpart(object): # be accessible with user's credentials enddate = retr_param_dict['date'].split('/')[-1] - elimit = datetime.strptime(enddate + self.basetime, + elimit = datetime.strptime(enddate + str(self.basetime), '%Y%m%d%H') - if self.basetime == '12': + if self.basetime == 12: # -------------- flux data ---------------------------- if 'acc' in pk: - - # Strategy: - # if maxtime-elimit >= 24h reduce date by 1, - # if 12h <= maxtime-elimit<12h reduce time for last date - # if maxtime-elimit<12h reduce step for last time - # A split of the MARS job into 2 is likely necessary. - - startdate = retr_param_dict['date'].split('/')[0] enddate = datetime.strftime(elimit - t24h,'%Y%m%d') retr_param_dict['date'] = '/'.join([startdate, @@ -755,27 +748,31 @@ class EcFlexpart(object): # ******* start retrievement self._start_retrievement(request, retr_param_dict) - else: # basetime = 0 + elif self.basetime == 0: retr_param_dict['date'] = \ datetime.strftime(elimit - t24h, '%Y%m%d') timesave = ''.join(retr_param_dict['time']) - if '/' in retr_param_dict['time']: + if ('/' in retr_param_dict['time'] and + pk != 'OG_OROLSM__SL' and + 'acc' not in pk ) : times = retr_param_dict['time'].split('/') steps = retr_param_dict['step'].split('/') - while (pk != 'OG_OROLSM__SL' and - 'acc' not in pk and - (int(times[0]) + int(steps[0])) <= 12): + print 'times', times, int(times[0]), times[1:] + print 'steps', steps, int(steps[0]) + while int(times[0]) + int(steps[0]) <= 12: + print 'HELLO' times = times[1:] + print 'in while 1 ', times - if len(times) > 1: - retr_param_dict['time'] = '/'.join(times) - else: - retr_param_dict['time'] = times[0] + if len(times) > 1: + retr_param_dict['time'] = '/'.join(times) + else: + retr_param_dict['time'] = times[0] - # ******* start retrievement - self._start_retrievement(request, retr_param_dict) + print 'in while 2 ', times + print retr_param_dict['time'] if (pk != 'OG_OROLSM__SL' and int(retr_param_dict['step'].split('/')[0]) == 0 and @@ -789,8 +786,11 @@ class EcFlexpart(object): self._mk_targetname(ftype, pk, retr_param_dict['date']) - # ******* start retrievement - self._start_retrievement(request, retr_param_dict) + # ******* start retrievement + self._start_retrievement(request, retr_param_dict) + else: + raise ValueError('ERROR: Basetime has an invalid value ' + '-> {}'.format(str(basetime))) if request == 0 or request == 2: print('MARS retrieve done ... ') @@ -984,7 +984,11 @@ class EcFlexpart(object): t_dt = t_date + timedelta(hours=step) t_m1dt = t_date + timedelta(hours=step-int(c.dtime)) t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime)) - t_enddate = None + if c.basetime is not None: + t_enddate = datetime.strptime(c.end_date + str(c.basetime), + '%Y%m%d%H') + else: + t_enddate = t_date + timedelta(2*int(c.dtime)) if c.purefc: fnout = os.path.join(c.inputdir, 'flux' + @@ -1106,53 +1110,47 @@ class EcFlexpart(object): codes_write(gid, f_handle) - if c.basetime: - t_enddate = datetime.strptime(c.end_date + c.basetime, - '%Y%m%d%H') - else: - t_enddate = t_date + timedelta(2*int(c.dtime)) - - # squeeze out information of last two steps - # contained in deac_vals[parId] - # Note that deac_vals[parId][0] has not been popped - # in this case - - if step == c.maxstep and c.purefc or \ - t_dt == t_enddate: - # last step - if c.purefc: - values = deac_vals[parId][3] - codes_set_values(gid, values) - codes_set(gid, 'stepRange', step) - #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime)) - codes_write(gid, h_handle) - else: - values = deac_vals[parId][3] - codes_set_values(gid, values) - codes_set(gid, 'stepRange', 0) - truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime)) - codes_set(gid, 'time', truedatetime.hour * 100) - codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d'))) - codes_write(gid, h_handle) - - if parId == 142 or parId == 143: - values = disaggregation.darain(list(reversed(deac_vals[parId]))) - else: - values = disaggregation.dapoly(list(reversed(deac_vals[parId]))) - - # step before last step - if c.purefc: - codes_set(gid, 'stepRange', step-int(c.dtime)) - #truedatetime = t_m2dt + timedelta(hours=int(c.dtime)) - codes_set_values(gid, values) - codes_write(gid, g_handle) - else: - codes_set(gid, 'stepRange', 0) - truedatetime = t_m2dt + timedelta(hours=int(c.dtime)) - codes_set(gid, 'time', truedatetime.hour * 100) - codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d'))) - codes_set_values(gid, values) - codes_write(gid, g_handle) + # squeeze out information of last two steps + # contained in deac_vals[parId] + # Note that deac_vals[parId][0] has not been popped + # in this case + + if step == c.maxstep and c.purefc or \ + t_dt == t_enddate: + # last step + if c.purefc: + values = deac_vals[parId][3] + codes_set_values(gid, values) + codes_set(gid, 'stepRange', step) + #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime)) + codes_write(gid, h_handle) + else: + values = deac_vals[parId][3] + codes_set_values(gid, values) + codes_set(gid, 'stepRange', 0) + truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime)) + codes_set(gid, 'time', truedatetime.hour * 100) + codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d'))) + codes_write(gid, h_handle) + + if parId == 142 or parId == 143: + values = disaggregation.darain(list(reversed(deac_vals[parId]))) + else: + values = disaggregation.dapoly(list(reversed(deac_vals[parId]))) + + # step before last step + if c.purefc: + codes_set(gid, 'stepRange', step-int(c.dtime)) + #truedatetime = t_m2dt + timedelta(hours=int(c.dtime)) + codes_set_values(gid, values) + codes_write(gid, g_handle) + else: + codes_set(gid, 'stepRange', 0) + truedatetime = t_m2dt + timedelta(hours=int(c.dtime)) + codes_set(gid, 'time', truedatetime.hour * 100) + codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d'))) + codes_set_values(gid, values) + codes_write(gid, g_handle) codes_release(gid) @@ -1416,10 +1414,11 @@ class EcFlexpart(object): # if the timestamp is out of basetime start/end date period, # skip this specific product - if c.basetime: - start_time = datetime.strptime(c.end_date + c.basetime, + if c.basetime is not None: + time_delta = timedelta(hours=12-int(c.dtime)) + start_time = datetime.strptime(c.end_date + str(c.basetime), '%Y%m%d%H') - time_delta - end_time = datetime.strptime(c.end_date + c.basetime, + end_time = datetime.strptime(c.end_date + str(c.basetime), '%Y%m%d%H') if timestamp < start_time or timestamp > end_time: continue diff --git a/source/python/mods/checks.py b/source/python/mods/checks.py index e57259a..3aa50f3 100644 --- a/source/python/mods/checks.py +++ b/source/python/mods/checks.py @@ -586,19 +586,22 @@ def check_basetime(basetime): Parameters ---------- - basetime : str + basetime : int or str or None The time for a half day retrieval. The 12 hours upfront are to be retrieved. Return ------ - + basetime : int or None + The time for a half day retrieval. The 12 hours upfront are to be + retrieved. ''' - if basetime: - if int(basetime) != 0 and int(basetime) != 12: + if basetime is not None: + basetime = int(basetime) + if basetime != 0 and basetime != 12: raise ValueError('ERROR: Basetime has an invalid value ' '-> {}'.format(str(basetime))) - return + return basetime def check_request(request, marsfile): '''Check if there is an old mars request file and remove it. @@ -666,10 +669,10 @@ def check_acctype(acctype, ftype): print('... Control parameter ACCTYPE was not defined.') try: if len(ftype) == 1 and ftype[0] != 'AN': - print('Use same field type as for the non-flux fields.') + print('... Use same field type as for the non-flux fields.') acctype = ftype[0] elif len(ftype) > 1 and ftype[1] != 'AN': - print('Use old setting by using TYPE[1] for flux forecast!') + print('... Use old setting by using TYPE[1] for flux forecast!') acctype = ftype[1] except: raise ValueError('ERROR: Accumulation field type could not be set!') @@ -680,10 +683,10 @@ def check_acctype(acctype, ftype): return acctype -def check_acctime(acctime, acctype, purefc): +def check_acctime(acctime, marsclass, purefc): '''Guarantees that the accumulation forecast times were set. - If it is not set, it is tried to set the value fore some of the + If it is not set, it tries to set the value for some of the most commonly used data sets. Otherwise it raises an error. Parameters @@ -691,8 +694,8 @@ def check_acctime(acctime, acctype, purefc): acctime : str The starting time from the accumulated forecasts. - acctype : str - The field type for the accumulated forecast fields. + marsclass : str + ECMWF data classification identifier. purefc : int Switch for definition of pure forecast mode or not. @@ -702,24 +705,25 @@ def check_acctime(acctime, acctype, purefc): acctime : str The starting time from the accumulated forecasts. ''' + if not acctime: print('... Control parameter ACCTIME was not defined.') print('... Value will be set depending on field type:\n ' - '\t\t EA=06/18\n\t\t EI/OD=00/12\n\t\t EP=18') - if acctype.upper() == 'EA': # Era 5 + '\t\t EA=06/18\n\t\t EI/OD=00/12\n\t\t EP=18') + if marsclass.upper() == 'EA': # Era 5 acctime = '06/18' - elif acctype.upper() == 'EI': # Era-Interim + elif marsclass.upper() == 'EI': # Era-Interim acctime = '00/12' - elif acctype.upper() == 'EP': # CERA + elif marsclass.upper() == 'EP': # CERA acctime = '18' - elif acctype.upper() == 'OD' and not purefc: # On-demand operational + elif marsclass.upper() == 'OD' and not purefc: # On-demand acctime = '00/12' else: raise ValueError('ERROR: Accumulation forecast time can not ' 'automatically be derived!') return acctime -def check_accmaxstep(accmaxstep, acctype, purefc, maxstep): +def check_accmaxstep(accmaxstep, marsclass, purefc, maxstep): '''Guarantees that the accumulation forecast step were set. Parameters @@ -727,8 +731,8 @@ def check_accmaxstep(accmaxstep, acctype, purefc, maxstep): accmaxstep : str The maximum forecast step for the accumulated forecast fields. - acctype : str - The field type for the accumulated forecast fields. + marsclass : str + ECMWF data classification identifier. purefc : int Switch for definition of pure forecast mode or not. @@ -745,11 +749,11 @@ def check_accmaxstep(accmaxstep, acctype, purefc, maxstep): if not accmaxstep: print('... Control parameter ACCMAXSTEP was not defined.') print('... Value will be set depending on field type/time: ' - '\t\t EA/EI/OD=12\n\t\t EP=24') - if acctype.upper() in ['EA', 'EI', 'OD'] and not purefc: + '\n\t\t EA/EI/OD=12\n\t\t EP=24') + if marsclass.upper() in ['EA', 'EI', 'OD'] and not purefc: # Era 5, Era-Interim, On-demand operational accmaxstep = '12' - elif acctype.upper() == 'EP': # CERA + elif marsclass.upper() == 'EP': # CERA accmaxstep = '18' elif purefc and accmaxstep != maxstep: accmaxstep = maxstep diff --git a/source/python/mods/get_mars_data.py b/source/python/mods/get_mars_data.py index b577abc..4601787 100755 --- a/source/python/mods/get_mars_data.py +++ b/source/python/mods/get_mars_data.py @@ -251,11 +251,10 @@ def mk_dates(c, fluxes): end = datetime.strptime(c.end_date, '%Y%m%d') chunk = timedelta(days=int(c.date_chunk)) - if c.basetime: - if c.basetime == '00': - start = start - timedelta(days=1) + if c.basetime == 0: + start = start - timedelta(days=1) - if not c.purefc and fluxes: + if not c.purefc and fluxes and not c.basetime == 0: start = start - timedelta(days=1) end = end + timedelta(days=1) diff --git a/source/python/mods/prepare_flexpart.py b/source/python/mods/prepare_flexpart.py index 45e3256..26a4aeb 100755 --- a/source/python/mods/prepare_flexpart.py +++ b/source/python/mods/prepare_flexpart.py @@ -151,7 +151,7 @@ def prepare_flexpart(ppid, c): # assign starting date minus 1 day # since we need the 12 hours upfront # (the day before from 12 UTC to current day 00 UTC) - if c.basetime == '00': + if c.basetime == 0: start = start - datetime.timedelta(days=1) print('Prepare ' + start.strftime("%Y%m%d") + diff --git a/source/python/mods/tools.py b/source/python/mods/tools.py index 1a755f8..4bb6e86 100644 --- a/source/python/mods/tools.py +++ b/source/python/mods/tools.py @@ -146,7 +146,7 @@ def get_cmdline_args(): help="file with CONTROL parameters") parser.add_argument("--basetime", dest="basetime", type=none_or_int, default=None, - help="base such as 00 or 12 (for half day retrievals)") + help="base such as 0 or 12 (for half day retrievals)") parser.add_argument("--step", dest="step", type=none_or_str, default=None, help="steps such as 00/to/48") @@ -161,6 +161,10 @@ def get_cmdline_args(): parser.add_argument("--debug", dest="debug", type=none_or_int, default=None, help="debug mode - leave temporary files intact") + parser.add_argument("--oper", dest="oper", + type=none_or_int, default=None, + help="operational mode - prepares dates with \ + environment variables") parser.add_argument("--request", dest="request", type=none_or_int, default=None, help="list all mars requests in file mars_requests.dat") diff --git a/source/python/submit.py b/source/python/submit.py index 9427b94..544f8a9 100755 --- a/source/python/submit.py +++ b/source/python/submit.py @@ -145,7 +145,7 @@ def submit(jtemplate, c, queue): ''' - if not c.basetime: + if not c.oper: # --------- create on demand job script ------------------------------------ if c.purefc: print('---- Pure forecast mode! ----') @@ -195,7 +195,7 @@ def submit(jtemplate, c, queue): c.start_date = '${MSJ_YEAR}${MSJ_MONTH}${MSJ_DAY}' c.end_date = '${MSJ_YEAR}${MSJ_MONTH}${MSJ_DAY}' - c.base_time = '${MSJ_BASETIME}' + c.basetime = '${MSJ_BASETIME}' if c.maxstep > 24: c.time = '${MSJ_BASETIME} {MSJ_BASETIME}' -- GitLab