Skip to content
Snippets Groups Projects
Commit a073386c authored by lkugler's avatar lkugler
Browse files

superob feature

parent 861ac9b1
No related branches found
No related tags found
No related merge requests found
...@@ -7,7 +7,7 @@ class ExperimentConfiguration(object): ...@@ -7,7 +7,7 @@ class ExperimentConfiguration(object):
pass pass
exp = ExperimentConfiguration() exp = ExperimentConfiguration()
exp.expname = "exp_v1.19_wb-random_WV_superob2-12" exp.expname = "exp_v1.19_wb-random_WV_superob4-12"
exp.model_dx = 2000 exp.model_dx = 2000
exp.n_ens = 40 exp.n_ens = 40
exp.n_nodes = 10 exp.n_nodes = 10
...@@ -25,7 +25,7 @@ exp.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/ ...@@ -25,7 +25,7 @@ exp.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/
exp.cov_loc_vert_km_horiz_km = (1, 30) exp.cov_loc_vert_km_horiz_km = (1, 30)
exp.superob_km = 12 exp.superob_km = 12
n_obs = 22801 #22801: 2km, 121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level n_obs = 5776 #5776: 4km, 121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level
vis = dict(plotname='VIS 0.6µm', plotunits='[1]', vis = dict(plotname='VIS 0.6µm', plotunits='[1]',
kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs, kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs,
......
...@@ -55,9 +55,7 @@ def read_prior_obs(f_obs_prior): ...@@ -55,9 +55,7 @@ def read_prior_obs(f_obs_prior):
for j in range(5, 5 + exp.n_ens): for j in range(5, 5 + exp.n_ens):
prior_ens.append(float(obsseq[i + j].strip())) prior_ens.append(float(obsseq[i + j].strip()))
OBSs.append( OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens)))
dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))
)
return OBSs return OBSs
...@@ -86,7 +84,7 @@ def replace_errors_obsseqout(f, new_errors): ...@@ -86,7 +84,7 @@ def replace_errors_obsseqout(f, new_errors):
obsseq = open(f, "r").readlines() obsseq = open(f, "r").readlines()
# find number of lines between two ' OBS ' lines: # find number of lines between two ' OBS ' lines:
first_obs = None first_obs = second_obs = None
for i, line in enumerate(obsseq): for i, line in enumerate(obsseq):
if " OBS " in line: if " OBS " in line:
if first_obs is None: if first_obs is None:
...@@ -94,10 +92,10 @@ def replace_errors_obsseqout(f, new_errors): ...@@ -94,10 +92,10 @@ def replace_errors_obsseqout(f, new_errors):
else: else:
second_obs = i second_obs = i
break break
if not second_obs:
raise RuntimeError('just one OBS in this file?! '+str(f))
lines_between = second_obs - first_obs lines_between = second_obs - first_obs
lines_obserr_after_obsnr = ( lines_obserr_after_obsnr = lines_between - 1 # obserr line is one before ' OBS ' line
lines_between - 1
) # obserr line is one before ' OBS ' line
# replace values in list obsseq # replace values in list obsseq
i_obs = 0 i_obs = 0
...@@ -135,9 +133,7 @@ def set_DART_nml(just_prior_values=False): ...@@ -135,9 +133,7 @@ def set_DART_nml(just_prior_values=False):
return vert_norm_rad return vert_norm_rad
list_obstypes = [obscfg["kind"] for obscfg in exp.observations] list_obstypes = [obscfg["kind"] for obscfg in exp.observations]
list_cov_loc_radius_km = [ list_cov_loc_radius_km = [obscfg["cov_loc_radius_km"] for obscfg in exp.observations]
obscfg["cov_loc_radius_km"] for obscfg in exp.observations
]
list_cov_loc_radian = [str(to_radian_horizontal(a)) for a in list_cov_loc_radius_km] list_cov_loc_radian = [str(to_radian_horizontal(a)) for a in list_cov_loc_radius_km]
if just_prior_values: if just_prior_values:
...@@ -172,39 +168,35 @@ def set_DART_nml(just_prior_values=False): ...@@ -172,39 +168,35 @@ def set_DART_nml(just_prior_values=False):
append_file(cluster.dartrundir + "/input.nml", rttov_nml) append_file(cluster.dartrundir + "/input.nml", rttov_nml)
def obs_operator_ensemble(): def get_Hx_truth_prior():
# assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01 # assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01
print("running obs operator on ensemble forecast") print("running obs operator on ensemble forecast")
os.chdir(cluster.dartrundir) os.chdir(cluster.dartrundir)
list_ensemble_truths = []
t = time_module.time() t = time_module.time()
for iens in range(1, exp.n_ens + 1): # set input.nml to calculate prior Hx
print("observation operator for ens #" + str(iens)) set_DART_nml(just_prior_values=True)
# ens members are already linked to advance_temp<i>/wrfout_d01
copy(
cluster.dartrundir + "/advance_temp" + str(iens) + "/wrfout_d01",
cluster.dartrundir + "/wrfout_d01",
)
# DART may need a wrfinput file as well, which serves as a template for dimension sizes
symlink(
cluster.dartrundir + "/wrfout_d01", cluster.dartrundir + "/wrfinput_d01"
)
# run perfect_model obs (forward operator) # run filter to calculate prior Hx
os.system("mpirun -np 12 ./perfect_model_obs > /dev/null") os.system("mpirun -np 12 ./filter &> log.filter.preassim")
# truth values in obs_seq.out are H(x) values # read prior Hx values from obs_seq.final
true, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") osf = obsseq.ObsSeq(cluster.dartrundir + '/obs_seq.final')
list_ensemble_truths.append(true)
if hasattr(exp, "superob_km"):
print("superobbing to", exp.superob_km, "km")
t = time_module.time()
osf.superob(window_km=exp.superob_km)
print("superob took", int(time_module.time() - t), "seconds")
prior_Hx = osf.get_prior_Hx_matrix().T
truth_Hx = osf.get_truth_Hx()
print("obs operator ensemble took", int(time_module.time() - t), "seconds") print("obs operator ensemble took", int(time_module.time() - t), "seconds")
n_obs = len(list_ensemble_truths[0]) return truth_Hx, prior_Hx
np_array = np.full((exp.n_ens, n_obs), np.nan)
for i in range(exp.n_ens):
np_array[i, :] = list_ensemble_truths[i]
return np_array
def obs_operator_nature(time): def obs_operator_nature(time):
...@@ -236,9 +228,7 @@ def prepare_nature_dart(time): ...@@ -236,9 +228,7 @@ def prepare_nature_dart(time):
) )
def prepare_prior_ensemble( def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp):
assim_time, prior_init_time, prior_valid_time, prior_path_exp
):
"""Prepares DART files for running filter """Prepares DART files for running filter
i.e. i.e.
- links first guess state to DART first guess filenames - links first guess state to DART first guess filenames
...@@ -304,7 +294,7 @@ def prepare_prior_ensemble( ...@@ -304,7 +294,7 @@ def prepare_prior_ensemble(
def calc_obserr_WV73(Hx_nature, Hx_prior): def calc_obserr_WV73(Hx_nature, Hx_prior):
debug = False
n_obs = len(Hx_nature) n_obs = len(Hx_nature)
OEs = np.ones(n_obs) OEs = np.ones(n_obs)
for iobs in range(n_obs): for iobs in range(n_obs):
...@@ -315,6 +305,7 @@ def calc_obserr_WV73(Hx_nature, Hx_prior): ...@@ -315,6 +305,7 @@ def calc_obserr_WV73(Hx_nature, Hx_prior):
mean_CI = np.mean(CIs) mean_CI = np.mean(CIs)
oe_nature = oe_73(mean_CI) oe_nature = oe_73(mean_CI)
if debug:
print("oe_nature:", oe_nature, ", bt_y:", bt_y, ", mean_CI:", mean_CI) print("oe_nature:", oe_nature, ", bt_y:", bt_y, ", mean_CI:", mean_CI)
OEs[iobs] = oe_nature OEs[iobs] = oe_nature
return OEs return OEs
...@@ -432,21 +423,13 @@ def archive_filteroutput(time): ...@@ -432,21 +423,13 @@ def archive_filteroutput(time):
def archive_osq_out(time): def archive_osq_out(time):
dir_obsseq = cluster.archivedir + "/obs_seq_out/" dir_obsseq = cluster.archivedir + "/obs_seq_out/"
os.makedirs(dir_obsseq, exist_ok=True) os.makedirs(dir_obsseq, exist_ok=True)
copy( copy(cluster.dartrundir + "/obs_seq.out",
cluster.dartrundir + "/obs_seq.out", dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out"))
dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out"), try:
) copy(cluster.dartrundir + "/obs_seq.out-orig",
dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-orig"))
except Exception as e:
def superob(): warnings.warn(str(e))
if hasattr(exp, "superob_km"):
t = time_module.time()
obs = obsseq.ObsSeqOut(cluster.dartrundir + "/obs_seq.out")
print("superobbing to", exp.superob_km, "km")
obs.superob(window_km=exp.superob_km)
obs.to_dart(f=cluster.dartrundir + "/obs_seq.out")
print("superob took", int(time_module.time() - t), "seconds")
if __name__ == "__main__": if __name__ == "__main__":
"""Assimilate observations (different obs types) """Assimilate observations (different obs types)
...@@ -486,12 +469,12 @@ if __name__ == "__main__": ...@@ -486,12 +469,12 @@ if __name__ == "__main__":
archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/") archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/")
os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in
os.chdir(cluster.dartrundir) os.chdir(cluster.dartrundir)
os.system(
cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py" # link DART binaries to run_DART
) # link DART binaries to run_DART os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")
os.system(
"rm -f input.nml obs_seq.in obs_seq.out obs_seq.final" # remove any existing observation files
) # remove any existing observation files os.system("rm -f input.nml obs_seq.in obs_seq.out obs_seq.out-orig obs_seq.final")
set_DART_nml() set_DART_nml()
print("prepare nature") print("prepare nature")
...@@ -508,46 +491,35 @@ if __name__ == "__main__": ...@@ -508,46 +491,35 @@ if __name__ == "__main__":
# and collect the obs-error for assimilation in a single vector/list # and collect the obs-error for assimilation in a single vector/list
for i, obscfg in enumerate(exp.observations): for i, obscfg in enumerate(exp.observations):
print('for', obscfg)
n_obs = obscfg["n_obs"] n_obs = obscfg["n_obs"]
n_obs_z = len( levels = obscfg.get("heights", [1,])
obscfg.get( n_obs_z = len(levels)
"heights",
[
1,
],
)
)
n_obs_3d = n_obs * n_obs_z n_obs_3d = n_obs * n_obs_z
do_error_parametrization = (obscfg["error_assimilate"] == False) and ( do_error_parametrization = (obscfg["error_assimilate"] == False) and (
obscfg.get("sat_channel") == 6 obscfg.get("sat_channel") == 6)
)
if do_error_parametrization: if do_error_parametrization:
# get observations for sat 6 # get observations for sat 6
osq.create_obsseqin_alltypes( osq.create_obsseqin_alltypes(time, [obscfg, ], 0)
time,
[ # create observations at full resolution
obscfg,
],
np.zeros(n_obs_3d),
)
run_perfect_model_obs() run_perfect_model_obs()
superob()
# depends on obs_seq.out produced before by run_perfect_model_obs() # run obs operator (through filter program)
Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") # to create truth and prior ensemble Hx
# superob the high-res observations
# return the averaged observations / prior Hx
Hx_truth, Hx_prior = get_Hx_truth_prior()
Hx_prior = ( # compute the obs error for assimilation on the averaged grid
obs_operator_ensemble() # since the assimilation is done on the averaged grid
) # files are already linked to DART directory err_assim = calc_obserr_WV73(Hx_truth, Hx_prior)
err_assim = calc_obserr_WV73(Hx_nat, Hx_prior)
else: else:
err_assim = np.zeros(n_obs_3d) + obscfg["error_assimilate"] err_assim = np.zeros(n_obs_3d) + obscfg["error_assimilate"]
error_assimilate.extend( error_assimilate.extend(err_assim)
err_assim
) # the obs-error we assume for assimilating observations
################################################ ################################################
print(" 2) generate observations ") print(" 2) generate observations ")
...@@ -558,17 +530,25 @@ if __name__ == "__main__": ...@@ -558,17 +530,25 @@ if __name__ == "__main__":
error_generate.extend(np.zeros(n_obs_3d) + obscfg["error_generate"]) error_generate.extend(np.zeros(n_obs_3d) + obscfg["error_generate"])
osq.create_obsseqin_alltypes(time, exp.observations, error_generate) osq.create_obsseqin_alltypes(time, exp.observations, error_generate)
set_DART_nml() set_DART_nml()
run_perfect_model_obs() # actually create observations that are used to assimilate run_perfect_model_obs() # actually create observations that are used to assimilate
superob()
archive_osq_out(time) oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
if hasattr(exp, "superob_km"):
print("superobbing to", exp.superob_km, "km")
oso.superob(window_km=exp.superob_km)
copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig")
oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
################################################ ################################################
print(" 3) assimilate with observation-errors for assimilation") print(" 3) assimilate with observation-errors for assimilation")
replace_errors_obsseqout(cluster.dartrundir + "/obs_seq.out", error_assimilate) oso.df['variance'] = error_assimilate # replace error variance
oso.to_dart(cluster.dartrundir + "/obs_seq.out")
archive_osq_out(time)
t = time_module.time() t = time_module.time()
assimilate() assimilate()
print("filter took", time_module.time() - t, "seconds") print("filter took", time_module.time() - t, "seconds")
......
...@@ -6,12 +6,6 @@ from config.cfg import exp, cluster ...@@ -6,12 +6,6 @@ from config.cfg import exp, cluster
from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove
import matplotlib as mpl
mpl.use("agg")
import matplotlib.pyplot as plt
def plot_box(m, lat, lon, label="", **kwargs): def plot_box(m, lat, lon, label="", **kwargs):
m.drawgreatcircle( m.drawgreatcircle(
...@@ -53,18 +47,15 @@ def rad_to_degrees(rad): ...@@ -53,18 +47,15 @@ def rad_to_degrees(rad):
return degr return degr
class ObsSeqOut(object): class ObsSeq(object):
""" """Read, manipulate, save obs_seq.out / final files
Attributes:
obstypes (list) : contains (kind_nr, kind_name) tuples
""" """
def __init__(self, filepath): def __init__(self, filepath):
print(filepath)
self.ascii = open(filepath, "r").readlines() self.ascii = open(filepath, "r").readlines()
self.preamble, self.content = self.get_preamble_content() self.get_preamble_content()
self.obstypes = self.get_obstypes() self.read_preamble()
self.dict = self.obs_to_dict() self.dict = self.obs_to_dict()
self.df = self.to_pandas() self.df = self.to_pandas()
...@@ -80,151 +71,98 @@ class ObsSeqOut(object): ...@@ -80,151 +71,98 @@ class ObsSeqOut(object):
for i, line in enumerate(self.ascii): for i, line in enumerate(self.ascii):
if "OBS " in line: if "OBS " in line:
break break
if i == len(self.ascii)-1:
raise RuntimeError('did not find `OBS ` in file!')
return self.ascii[:i], self.ascii[i:] self.preamble = self.ascii[:i]
self.content = self.ascii[i:]
def get_obstypes(self): def read_preamble(self):
"""Return a list of tuples (kind_nr, kind_descriptor) for each obs type""" """Defines
self.obstypes (tuple(kind_nr, kind_descriptor)) for each obs type
"""
# how many obstypes # how many obstypes
for i, line in enumerate(self.ascii): for i, line in enumerate(self.preamble):
if "obs_type_definitions" in line: if "obs_type_definitions" in line or "obs_kind_definitions" in line:
break break
# check if we found definitions before end of file
if i == len(self.preamble)-1: # end of file
raise RuntimeError('did not find `obs_type_definitions` or `obs_kind_definitions` in file')
line_n_obstypes = i + 1 line_n_obstypes = i + 1
n_obstypes = int(self.ascii[line_n_obstypes]) n_obstypes = int(self.preamble[line_n_obstypes]) # read integer from file
# read obs type kind (number and description) # read obs type kind (number and description)
obstypes = [] obstypes = []
for k in range(1, n_obstypes + 1): for k in range(1, n_obstypes + 1):
kind_nr, kind_type = self.ascii[line_n_obstypes + k].split() kind_nr, kind_type = self.preamble[line_n_obstypes + k].split()
kind_nr = int(kind_nr) kind_nr = int(kind_nr)
obstypes.append((kind_nr, kind_type)) obstypes.append((kind_nr, kind_type))
return obstypes self.obstypes = obstypes
def to_dart(self, f):
"""Write to obs_seq.out file in DART format
Args: # read num_copies (2 for obs_seq.out, 44 for obs_seq.final with 40 ens members)
f (str): path of file to write num_copies = False
""" for line in self.preamble:
if 'num_copies:' in line:
_, num_copies, _, num_qc = line.split()
break
if not num_copies:
raise RuntimeError('did not find `num_copies:` in '+str(self.preamble))
num_copies = int(num_copies)
num_qc = int(num_qc)
# read num_obs
num_obs = False
for i, line in enumerate(self.preamble):
if 'num_obs:' in line:
_, num_obs, _, max_num_obs = line.split()
break
if not num_obs:
raise RuntimeError('did not find `num_obs:` in '+str(self.preamble))
assert num_obs == max_num_obs, NotImplementedError()
self.num_obs = int(num_obs)
def write_file(msg, output_path="./"): # read keys for values (e.g. 'observations', 'truth', 'prior ensemble mean',)
try: keys = []
os.remove(output_path) line_start_keys = i+1
except OSError: for j in range(line_start_keys, line_start_keys+num_copies+num_qc):
pass line = self.preamble[j]
keys.append(line.strip())
with open(output_path, "w") as f: self.keys_for_values = keys
f.write(msg)
print(output_path, "saved.")
def write_preamble(n_obs): def obs_to_dict(self):
"""Convert an obs_seq.out file to a dictionary"""
obs_begin_str = "OBS "
num_obstypes = str(len(self.obstypes)) def check_obs_begin(line):
txt = " obs_sequence \n obs_kind_definitions \n " + num_obstypes if not obs_begin_str in line:
raise RuntimeError("This is not the first line: "+str(line))
for (nr, obstype) in self.obstypes: def content_to_list(content):
txt += "\n " + str(nr) + " " + obstype """Split obs_seq.out content (lines of str) into list of observation-sections
nobs = str(n_obs)
txt += "\n num_copies: 2 num_qc: 1"
txt += "\n num_obs: " + nobs
txt += " max_num_obs: " + nobs
txt += "\n observations \n truth \n Quality Control \n"
txt += " first: 1 last: " + nobs
return txt
def write_obs(i, obs, next_i_obs=None, prev_i_obs=None):
"""Write the observation section of a obs_seq.out file
Args: Args:
i (int): index of observation content (list of str) : contains lines of file
obs (dict): observation data
next_i_obs (int): index of next observation
prev_i_obs (int): index of previous observation
(in case it is the last)
Returns Returns
str list of list of str
""" """
if next_i_obs:
line_link = " -1 " + str(next_i_obs) + " -1"
else: # last observation in file
line_link = " " + str(prev_i_obs) + " -1 -1"
lon_rad = str(obs["loc3d"][0])
lat_rad = str(obs["loc3d"][1])
out = (
" \n".join(
[
"\nOBS " + str(i),
str(obs["obs"]),
str(obs["truth"]),
str(obs["qc"]),
line_link,
"obdef",
"loc3d",
" ".join(
[
lon_rad,
lat_rad,
str(obs["loc3d"][2]),
str(obs["loc3d"][3]),
]
),
"kind",
" " + str(obs["kind"]),
"".join(obs["metadata"]),
]
)
+ str(i)
+ "\n "
+ obs["time"][0]
+ " "
+ obs["time"][1]
+ "\n"
+ str(obs["variance"])
)
return out
n_obs = len(self.df)
outstr = write_preamble(n_obs)
# loop through observations, concatenate obs sections
# DART format is linked list, needs index of next observation
# k ... 0, ..., len(df)-1
# i_obs_this ... starts at 1
for k, (_, obs) in enumerate(self.df.iterrows()):
i_obs_this = k + 1
if k < len(self.df) - 1:
i_obs_next = k + 2
outstr += write_obs(i_obs_this, obs, next_i_obs=i_obs_next)
else: # last obs in file
i_obs_prev = k
outstr += write_obs(i_obs_this, obs, prev_i_obs=i_obs_prev)
write_file(outstr, output_path=f)
def obs_to_dict(self):
"""Convert an obs_seq.out file to a dictionary"""
def check_obs_begin(line):
if not " OBS " in line:
raise RuntimeError("wrong line in observation")
def content_to_list(content):
"""Split obs_seq.out content into list of observation-sections"""
obs_list = [] obs_list = []
i = 0 i = 0
try:
check_obs_begin(content[0]) check_obs_begin(content[0])
except:
print(content)
raise
obs_begin = 0 obs_begin = 0
for i, line in enumerate(content): for i, line in enumerate(content):
if i == 0: if i == 0:
continue continue
if " OBS " in line: # then this line is beginning of obs if obs_begin_str in line: # then this line is beginning of obs
obs_end = i - 1 # previous line obs_end = i - 1 # previous line
obs_list.append(content[obs_begin : obs_end + 1]) obs_list.append(content[obs_begin : obs_end + 1])
obs_begin = i # next obs starts here obs_begin = i # next obs starts here
...@@ -241,7 +179,11 @@ class ObsSeqOut(object): ...@@ -241,7 +179,11 @@ class ObsSeqOut(object):
out = dict() out = dict()
lines = obs_list_entry lines = obs_list_entry
try:
check_obs_begin(lines[0]) check_obs_begin(lines[0])
except:
print(lines)
raise
for i, line in enumerate(lines): for i, line in enumerate(lines):
if "loc3d" in line: # find location if "loc3d" in line: # find location
...@@ -250,9 +192,11 @@ class ObsSeqOut(object): ...@@ -250,9 +192,11 @@ class ObsSeqOut(object):
if "kind" in line: # find obs kind if "kind" in line: # find obs kind
line_kind = i + 1 line_kind = i + 1
out["obs"] = float(lines[1].strip()) for k, key in enumerate(self.keys_for_values):
out["truth"] = float(lines[2].strip()) out[key] = float(lines[1+k].strip())
out["qc"] = float(lines[3].strip()) #out["obs"] = float(lines[1].strip())
#out["truth"] = float(lines[2].strip())
#out["qc"] = float(lines[3].strip())
x, y, z, z_coord = lines[line_loc].split() x, y, z, z_coord = lines[line_loc].split()
out["loc3d"] = float(x), float(y), float(z), int(z_coord) out["loc3d"] = float(x), float(y), float(z), int(z_coord)
out["kind"] = int(lines[line_kind].strip()) out["kind"] = int(lines[line_kind].strip())
...@@ -274,31 +218,31 @@ class ObsSeqOut(object): ...@@ -274,31 +218,31 @@ class ObsSeqOut(object):
return obs_list_dict return obs_list_dict
# content = [line1, ...] # content = [line1, ...]
# transform to [obs1, obs2, obs3, ...] # obs_list = [obs1, obs2, obs3, ...]
# obs1 = [obsline1, obsline2, ...] # where obs1 = [line1, line2, ...]; all lines pertaining to one obs
obs_list = content_to_list(self.content) obs_list = content_to_list(self.content)
return obs_list_to_dict(obs_list)
def to_pandas(self): # each obs is one dictionary
"""Create xr.Dataset containing observations list_of_obsdict = obs_list_to_dict(obs_list)
Variables = observation types return list_of_obsdict
"""
obs_dict_list = self.obs_to_dict()
# convert to pandas.DataFrame def get_prior_Hx_matrix(self):
# each observation is one line """Return prior Hx array (n_obs, n_ens)"""
# columns: all observation contents
# set keys from first obs (kind, loc3d, values) # which columns do we need?
keys = obs_dict_list[0].keys() keys = self.df.columns
data = {key: [] for key in keys} keys_bool = np.array(['prior ensemble member' in a for a in keys])
# fill the data lists for each column of the DataFrame # select columns in DataFrame
for obs in obs_dict_list: prior_Hx = self.df.iloc[:, keys_bool]
for key in keys:
data[key].append(obs[key])
return pd.DataFrame(index=range(len(obs_dict_list)), data=data) # consistency check: compute mean over ens - compare with value from file
assert np.allclose(prior_Hx.mean(axis=1), self.df['prior ensemble mean'])
return prior_Hx.values
def get_truth_Hx(self):
return self.df['truth'].values
def get_lon_lat(self): def get_lon_lat(self):
lats = np.empty(len(self.df), np.float32) lats = np.empty(len(self.df), np.float32)
...@@ -370,6 +314,8 @@ class ObsSeqOut(object): ...@@ -370,6 +314,8 @@ class ObsSeqOut(object):
# how many observations in x/y direction? # how many observations in x/y direction?
win_obs = int(window_km / obs_spacing_km) win_obs = int(window_km / obs_spacing_km)
if debug: if debug:
print('window_km=', window_km)
print('obs spacing=', obs_spacing_km)
print("window (#obs in x/y)=", win_obs) print("window (#obs in x/y)=", win_obs)
out = self.df.drop(self.df.index) # this df will be filled out = self.df.drop(self.df.index) # this df will be filled
...@@ -381,18 +327,24 @@ class ObsSeqOut(object): ...@@ -381,18 +327,24 @@ class ObsSeqOut(object):
i_obs_box = i_obs_grid[i : i + win_obs, j : j + win_obs].ravel() i_obs_box = i_obs_grid[i : i + win_obs, j : j + win_obs].ravel()
if debug: if debug:
print("box=", i, i + win_obs, j, j + win_obs) print("index x from", i, 'to', i + win_obs)
print("i_obs box=", i_obs_grid[i : i + win_obs, j : j + win_obs]) print("index y from", j, 'to', j + win_obs)
print("obs indices box=", i_obs_grid[i : i + win_obs, j : j + win_obs])
# average the subset # average the subset
# metadata are assumed to be equal # metadata are assumed to be equal
obs_box = self.df.iloc[i_obs_box] obs_box = self.df.iloc[i_obs_box]
obs_mean = obs_box.iloc[0] obs_mean = obs_box.iloc[0]
obs_mean.at["obs"] = obs_box["obs"].mean() # despite its name, 'observations' is a single value
obs_mean.at["truth"] = obs_box["truth"].mean() for key in obs_box:
obs_mean.at["qc"] = obs_box["qc"].mean() if key in ['loc3d', 'kind', 'metadata', 'time']:
obs_mean.at["variance"] = obs_box["variance"].mean() pass
elif 'spread' in key: # mean of std deviations
obs_mean.at[key] = np.sqrt((obs_box[key]**2).mean())
else:
obs_mean.at[key] = obs_box[key].mean()
if debug: if debug:
print("pre_avg:", obs_box.head()) print("pre_avg:", obs_box.head())
print("avg:", obs_mean) print("avg:", obs_mean)
...@@ -400,8 +352,145 @@ class ObsSeqOut(object): ...@@ -400,8 +352,145 @@ class ObsSeqOut(object):
out = out.append(obs_mean) out = out.append(obs_mean)
self.df = out # overwrite input self.df = out # overwrite input
n_pre_superob = self.num_obs
n_post_superob = len(self.df)
# assume square of obs
n_windows_x = int(n_pre_superob**.5/win_obs) # in x-direction
n_target_post = n_windows_x**2 # number of windows
print('superob from', n_pre_superob, 'obs to', n_post_superob, 'obs')
assert n_post_superob == n_target_post, RuntimeError()
def to_pandas(self):
"""Create xr.Dataset containing observations
Variables = observation types
"""
obs_dict_list = self.obs_to_dict()
# convert to pandas.DataFrame
# each observation is one line
# columns: all observation contents
# set keys from first obs (kind, loc3d, values)
keys = obs_dict_list[0].keys()
data = {key: [] for key in keys}
# fill the data lists for each column of the DataFrame
for obs in obs_dict_list:
for key in keys:
data[key].append(obs[key])
return pd.DataFrame(index=range(len(obs_dict_list)), data=data)
def to_dart(self, f):
"""Write to obs_seq.out file in DART format
Args:
f (str): path of file to write
"""
def write_file(msg, output_path="./"):
try:
os.remove(output_path)
except OSError:
pass
with open(output_path, "w") as f:
f.write(msg)
print(output_path, "saved.")
def write_preamble(n_obs):
num_obstypes = str(len(self.obstypes))
txt = " obs_sequence \n obs_kind_definitions \n " + num_obstypes
for (nr, obstype) in self.obstypes:
txt += "\n " + str(nr) + " " + obstype
nobs = str(n_obs)
txt += "\n num_copies: 2 num_qc: 1"
txt += "\n num_obs: " + nobs
txt += " max_num_obs: " + nobs
txt += "\n observations \n truth \n Quality Control \n"
txt += " first: 1 last: " + nobs
return txt
def write_obs(i, obs, next_i_obs=None, prev_i_obs=None):
"""Write the observation section of a obs_seq.out file
Args:
i (int): index of observation
obs (dict): observation data
next_i_obs (int): index of next observation
prev_i_obs (int): index of previous observation
(in case it is the last)
Returns
str
"""
if next_i_obs:
line_link = " -1 " + str(next_i_obs) + " -1"
else: # last observation in file
line_link = " " + str(prev_i_obs) + " -1 -1"
lon_rad = str(obs["loc3d"][0])
lat_rad = str(obs["loc3d"][1])
out = (
" \n".join(
[
"\nOBS " + str(i),
str(obs["observations"]),
str(obs["truth"]),
str(obs["Quality Control"]),
line_link,
"obdef",
"loc3d", " ".join(
[
lon_rad, lat_rad,
str(obs["loc3d"][2]),
str(obs["loc3d"][3]),
]
),
"kind", " " + str(obs["kind"]),
"".join(obs["metadata"]),
]
)
+ str(i)
+ "\n "
+ obs["time"][0]
+ " "
+ obs["time"][1]
+ "\n"
+ str(obs["variance"])
)
return out
n_obs = len(self.df)
outstr = write_preamble(n_obs)
# loop through observations, concatenate obs sections
# DART format is linked list, needs index of next observation
# k ... 0, ..., len(df)-1
# i_obs_this ... starts at 1
for k, (_, obs) in enumerate(self.df.iterrows()):
i_obs_this = k + 1
if k < len(self.df) - 1:
i_obs_next = k + 2
outstr += write_obs(i_obs_this, obs, next_i_obs=i_obs_next)
else: # last obs in file
i_obs_prev = k
outstr += write_obs(i_obs_this, obs, prev_i_obs=i_obs_prev)
write_file(outstr, output_path=f)
def plot(self, box=None): def plot(self, box=None):
import matplotlib as mpl
mpl.use("agg")
import matplotlib.pyplot as plt
import xarray as xr import xarray as xr
georef = xr.open_dataset("/gpfs/data/fs71386/lkugler/run_DART/geo_em.d01.nc") georef = xr.open_dataset("/gpfs/data/fs71386/lkugler/run_DART/geo_em.d01.nc")
......
import sys
sys.path.append('../scripts')
import filecmp
from config.cfg import exp, cluster
import obsseq
def test_oso():
input = cluster.scriptsdir + "/../tests/obs_seq.orig.out"
output = cluster.scriptsdir + "/../tests/obs_seq.test.out"
output_true = cluster.scriptsdir + "/../tests/obs_seq.superob.out"
obs = obsseq.ObsSeq(input)
# select a subset
obs.superob(window_km=50)
# write to obs_seq.out in DART format
obs.to_dart(f=output)
assert filecmp.cmp(output, output_true)
def test_osf():
input = cluster.scriptsdir + "/../tests/obs_seq.final"
obs = obsseq.ObsSeq(input)
prior_Hx = obs.get_prior_Hx_matrix()
# TODO: compare with given truth
if __name__ == '__main__':
test_osf()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment