Skip to content
Snippets Groups Projects
Commit 45b99e6b authored by Anne Philipp's avatar Anne Philipp
Browse files

added possibility to extract ensemble members with ELDA stream (and ENFO)

parent ae2756e3
No related branches found
No related tags found
No related merge requests found
......@@ -66,7 +66,8 @@ import numpy as np
from eccodes import (codes_index_select, codes_new_from_index, codes_get,
codes_get_values, codes_set_values, codes_set,
codes_write, codes_release, codes_new_from_index,
codes_index_release, codes_index_get)
codes_index_release, codes_index_get, codes_get_array,
codes_set_array, codes_grib_new_from_file)
# software specific classes and modules from flex_extract
sys.path.append('../')
......@@ -76,6 +77,7 @@ from mods.tools import (init128, to_param_id, silent_remove, product,
my_error, make_dir, get_informations, get_dimensions,
execute_subprocess)
from MarsRetrieval import MarsRetrieval
from UioFiles import UioFiles
import mods.disaggregation as disaggregation
# ------------------------------------------------------------------------------
......@@ -662,10 +664,8 @@ class EcFlexpart(object):
'param':None}
for ftype in self.types:
# fk contains field types such as
# ftype contains field types such as
# [AN, FC, PF, CV]
# fv contains all of the items of the belonging key
# [times, steps]
for pk, pv in self.params.iteritems():
# pk contains one of these keys of params
# [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
......@@ -706,6 +706,23 @@ class EcFlexpart(object):
if pk == 'GG__SL' and pv[0] == 'Q':
retr_param_dict['area'] = ""
retr_param_dict['gaussian'] = 'reduced'
if ftype.upper() == 'FC' and \
'acc' not in retr_param_dict['target']:
if (int(retr_param_dict['time'][0]) +
int(retr_param_dict['step'][0])) > 23:
dates = retr_param_dict['date'].split('/')
sdate = datetime.strptime(dates[0], '%Y%m%d%H')
sdate = sdate - timedelta(days=1)
retr_param_dict['date'] = '/'.join(
[sdate.strftime("%Y%m%d")] +
retr_param_dict['date'][1:])
print('CHANGED FC start date to ' +
sdate.strftime("%Y%m%d") +
' to accomodate TIME=' +
retr_param_dict['time'][0] +
', STEP=' +
retr_param_dict['time'][0])
# ------ on demand path --------------------------------------------------
if self.basetime is None:
......@@ -759,21 +776,14 @@ class EcFlexpart(object):
'acc' not in pk ) :
times = retr_param_dict['time'].split('/')
steps = retr_param_dict['step'].split('/')
print 'times', times, int(times[0]), times[1:]
print 'steps', steps, int(steps[0])
while int(times[0]) + int(steps[0]) <= 12:
print 'HELLO'
times = times[1:]
print 'in while 1 ', times
if len(times) > 1:
retr_param_dict['time'] = '/'.join(times)
else:
retr_param_dict['time'] = times[0]
print 'in while 2 ', times
print retr_param_dict['time']
if (pk != 'OG_OROLSM__SL' and
int(retr_param_dict['step'].split('/')[0]) == 0 and
int(timesave.split('/')[0]) == 0):
......@@ -915,6 +925,10 @@ class EcFlexpart(object):
# get the values of the keys which are used for distinct access
# of grib messages via product
if '/' in self.number:
# more than one ensemble member is selected
index_keys = ["number", "date", "time", "step"]
else:
index_keys = ["date", "time", "step"]
iid, index_vals = self._mk_index_values(c.inputdir,
inputfiles,
......@@ -990,23 +1004,35 @@ class EcFlexpart(object):
else:
t_enddate = t_date + timedelta(2*int(c.dtime))
# if necessary, add ensemble member number to filename suffix
# otherwise, add empty string
if 'number' in index_keys:
index_number = index_keys.index('number')
if len(index_vals[index_number]) > 1:
numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
else:
numbersuffix = ''
if c.purefc:
fnout = os.path.join(c.inputdir, 'flux' +
t_date.strftime('%Y%m%d.%H') +
'.{:0>3}'.format(step-2*int(c.dtime)))
'.{:0>3}'.format(step-2*int(c.dtime)) +
numbersuffix)
gnout = os.path.join(c.inputdir, 'flux' +
t_date.strftime('%Y%m%d.%H') +
'.{:0>3}'.format(step-int(c.dtime)))
'.{:0>3}'.format(step-int(c.dtime)) +
numbersuffix)
hnout = os.path.join(c.inputdir, 'flux' +
t_date.strftime('%Y%m%d.%H') +
'.{:0>3}'.format(step))
'.{:0>3}'.format(step) +
numbersuffix)
else:
fnout = os.path.join(c.inputdir, 'flux' +
t_m2dt.strftime('%Y%m%d%H'))
t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
gnout = os.path.join(c.inputdir, 'flux' +
t_m1dt.strftime('%Y%m%d%H'))
t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
hnout = os.path.join(c.inputdir, 'flux' +
t_dt.strftime('%Y%m%d%H'))
t_dt.strftime('%Y%m%d%H') + numbersuffix)
print("outputfile = " + fnout)
f_handle = open(fnout, 'w')
......@@ -1369,6 +1395,10 @@ class EcFlexpart(object):
# get the values of the keys which are used for distinct access
# of grib messages via product
if '/' in self.number:
# more than one ensemble member is selected
index_keys = ["number", "date", "time", "step"]
else:
index_keys = ["date", "time", "step"]
iid, index_vals = self._mk_index_values(c.inputdir,
inputfiles,
......@@ -1523,6 +1553,13 @@ class EcFlexpart(object):
suffix = cdate[2:8] + '.' + ctime + '.' + cstep
else:
suffix = cdate_hour[2:10]
# if necessary, add ensemble member number to filename suffix
if 'number' in index_keys:
index_number = index_keys.index('number')
if len(index_vals[index_number]) > 1:
suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
fnout = os.path.join(c.inputdir, c.prefix + suffix)
print("outputfile = " + fnout)
# collect for final processing
......@@ -1556,6 +1593,67 @@ class EcFlexpart(object):
return
def calc_extra_elda(self, path, prefix):
''' Calculates extra ensemble members for ELDA - Stream.
Parameters
----------
path : str
Path to the output files.
prefix : str
The prefix of the output filenames as defined in Control file.
Return
------
'''
# max number
maxnum = int(self.number.split('/')[-1])
# get a list of all prepared output files with control forecast (CF)
CF_filelist = UioFiles(path, prefix + '*.N000')
for cffile in CF_filelist.files:
with open(cffile, 'rb') as f:
cfvalues=[]
while True:
fid = codes_grib_new_from_file(f)
if fid is None:
break
cfvalues.append(codes_get_array(fid, 'values'))
codes_release(fid)
filename = cffile.split('N000')[0]
for i in range(1, maxnum+1): # max number nehmen
# read an ensemble member
g = open(filename + 'N{:0>3}'.format(i), 'rb')
# create file for newly calculated ensemble member
h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
# number of message in grib file
j = 0
while True:
gid = codes_grib_new_from_file(g)
if gid is None:
break
values = codes_get_array(gid, 'values')
codes_set_array(gid, 'values',
values-2*(values-cfvalues[j]))
codes_set(gid, 'number', i+maxnum)
codes_write(gid, h)
codes_release(gid)
j += 1
g.close()
h.close()
print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
self.outputfilelist.append(
os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
return
def process_output(self, c):
'''Postprocessing of FLEXPART input files.
......
......@@ -208,6 +208,15 @@ def mk_server(c):
return server
def check_dates_for_nonflux_fc_times(types, times):
'''
'''
for ty, ti in zip(types,times):
if ty.upper() == 'FC' and int(ti) == 18:
return True
return False
def mk_dates(c, fluxes):
'''Prepares start and end date depending on flux or non flux data.
......@@ -258,6 +267,11 @@ def mk_dates(c, fluxes):
start = start - timedelta(days=1)
end = end + timedelta(days=1)
# if we have non-flux forecast data starting at 18 UTC
# we need to start retrieving data one day in advance
if not fluxes and check_dates_for_nonflux_fc_times(c.type, c.time):
start = start - timedelta(days=1)
return start, end, chunk
def remove_old(pattern, inputdir):
......
......@@ -176,6 +176,8 @@ def prepare_flexpart(ppid, c):
# copy/transfer/interpolate them or make them GRIB2
flexpart = EcFlexpart(c, fluxes=False)
flexpart.create(inputfiles, c)
if c.stream.lower() == 'elda':
flexpart.calc_extra_elda(os.path.join(c.inputdir, c.prefix))
flexpart.process_output(c)
# make use of a possible conversion to a
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment