From a073386c6f578c345dcb9b8121b2496c43b832ca Mon Sep 17 00:00:00 2001 From: lkugler <lukas.kugler@gmail.com> Date: Tue, 14 Dec 2021 02:17:37 +0100 Subject: [PATCH] superob feature --- config/cfg.py | 4 +- scripts/assim_synth_obs.py | 160 +++++++-------- scripts/obsseq.py | 407 ++++++++++++++++++++++--------------- tests/test_obsseq.py | 34 ++++ 4 files changed, 354 insertions(+), 251 deletions(-) create mode 100644 tests/test_obsseq.py diff --git a/config/cfg.py b/config/cfg.py index 762aa68..b101eee 100755 --- a/config/cfg.py +++ b/config/cfg.py @@ -7,7 +7,7 @@ class ExperimentConfiguration(object): pass exp = ExperimentConfiguration() -exp.expname = "exp_v1.19_wb-random_WV_superob2-12" +exp.expname = "exp_v1.19_wb-random_WV_superob4-12" exp.model_dx = 2000 exp.n_ens = 40 exp.n_nodes = 10 @@ -25,7 +25,7 @@ exp.input_profile = '/home/fs71386/lkugler/wrf_profiles/data/wrf/ens/2021-05-04/ exp.cov_loc_vert_km_horiz_km = (1, 30) exp.superob_km = 12 -n_obs = 22801 #22801: 2km, 121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level +n_obs = 5776 #5776: 4km, 121: 30km, 256:16x16 (20km); 961: 10km resoltn # radar: n_obs for each observation height level vis = dict(plotname='VIS 0.6µm', plotunits='[1]', kind='MSG_4_SEVIRI_BDRF', sat_channel=1, n_obs=n_obs, diff --git a/scripts/assim_synth_obs.py b/scripts/assim_synth_obs.py index 3e70acb..35dcbb7 100755 --- a/scripts/assim_synth_obs.py +++ b/scripts/assim_synth_obs.py @@ -55,9 +55,7 @@ def read_prior_obs(f_obs_prior): for j in range(5, 5 + exp.n_ens): prior_ens.append(float(obsseq[i + j].strip())) - OBSs.append( - dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens)) - ) + OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens))) return OBSs @@ -86,7 +84,7 @@ def replace_errors_obsseqout(f, new_errors): obsseq = open(f, "r").readlines() # find number of lines between two ' OBS ' lines: - first_obs = None + first_obs = second_obs = None for i, line in enumerate(obsseq): if " OBS " in line: if first_obs is None: @@ -94,10 +92,10 @@ def replace_errors_obsseqout(f, new_errors): else: second_obs = i break + if not second_obs: + raise RuntimeError('just one OBS in this file?! '+str(f)) lines_between = second_obs - first_obs - lines_obserr_after_obsnr = ( - lines_between - 1 - ) # obserr line is one before ' OBS ' line + lines_obserr_after_obsnr = lines_between - 1 # obserr line is one before ' OBS ' line # replace values in list obsseq i_obs = 0 @@ -135,9 +133,7 @@ def set_DART_nml(just_prior_values=False): return vert_norm_rad list_obstypes = [obscfg["kind"] for obscfg in exp.observations] - list_cov_loc_radius_km = [ - obscfg["cov_loc_radius_km"] for obscfg in exp.observations - ] + list_cov_loc_radius_km = [obscfg["cov_loc_radius_km"] for obscfg in exp.observations] list_cov_loc_radian = [str(to_radian_horizontal(a)) for a in list_cov_loc_radius_km] if just_prior_values: @@ -172,39 +168,35 @@ def set_DART_nml(just_prior_values=False): append_file(cluster.dartrundir + "/input.nml", rttov_nml) -def obs_operator_ensemble(): +def get_Hx_truth_prior(): + # assumes that prior ensemble is already linked to advance_temp<i>/wrfout_d01 print("running obs operator on ensemble forecast") os.chdir(cluster.dartrundir) - list_ensemble_truths = [] t = time_module.time() - for iens in range(1, exp.n_ens + 1): - print("observation operator for ens #" + str(iens)) - # ens members are already linked to advance_temp<i>/wrfout_d01 - copy( - cluster.dartrundir + "/advance_temp" + str(iens) + "/wrfout_d01", - cluster.dartrundir + "/wrfout_d01", - ) - # DART may need a wrfinput file as well, which serves as a template for dimension sizes - symlink( - cluster.dartrundir + "/wrfout_d01", cluster.dartrundir + "/wrfinput_d01" - ) + # set input.nml to calculate prior Hx + set_DART_nml(just_prior_values=True) - # run perfect_model obs (forward operator) - os.system("mpirun -np 12 ./perfect_model_obs > /dev/null") + # run filter to calculate prior Hx + os.system("mpirun -np 12 ./filter &> log.filter.preassim") - # truth values in obs_seq.out are H(x) values - true, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") - list_ensemble_truths.append(true) + # read prior Hx values from obs_seq.final + osf = obsseq.ObsSeq(cluster.dartrundir + '/obs_seq.final') + + if hasattr(exp, "superob_km"): + print("superobbing to", exp.superob_km, "km") + + t = time_module.time() + osf.superob(window_km=exp.superob_km) + print("superob took", int(time_module.time() - t), "seconds") + + prior_Hx = osf.get_prior_Hx_matrix().T + truth_Hx = osf.get_truth_Hx() print("obs operator ensemble took", int(time_module.time() - t), "seconds") - n_obs = len(list_ensemble_truths[0]) - np_array = np.full((exp.n_ens, n_obs), np.nan) - for i in range(exp.n_ens): - np_array[i, :] = list_ensemble_truths[i] - return np_array + return truth_Hx, prior_Hx def obs_operator_nature(time): @@ -236,9 +228,7 @@ def prepare_nature_dart(time): ) -def prepare_prior_ensemble( - assim_time, prior_init_time, prior_valid_time, prior_path_exp -): +def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_path_exp): """Prepares DART files for running filter i.e. - links first guess state to DART first guess filenames @@ -304,7 +294,7 @@ def prepare_prior_ensemble( def calc_obserr_WV73(Hx_nature, Hx_prior): - + debug = False n_obs = len(Hx_nature) OEs = np.ones(n_obs) for iobs in range(n_obs): @@ -315,7 +305,8 @@ def calc_obserr_WV73(Hx_nature, Hx_prior): mean_CI = np.mean(CIs) oe_nature = oe_73(mean_CI) - print("oe_nature:", oe_nature, ", bt_y:", bt_y, ", mean_CI:", mean_CI) + if debug: + print("oe_nature:", oe_nature, ", bt_y:", bt_y, ", mean_CI:", mean_CI) OEs[iobs] = oe_nature return OEs @@ -432,21 +423,13 @@ def archive_filteroutput(time): def archive_osq_out(time): dir_obsseq = cluster.archivedir + "/obs_seq_out/" os.makedirs(dir_obsseq, exist_ok=True) - copy( - cluster.dartrundir + "/obs_seq.out", - dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out"), - ) - - -def superob(): - if hasattr(exp, "superob_km"): - t = time_module.time() - obs = obsseq.ObsSeqOut(cluster.dartrundir + "/obs_seq.out") - print("superobbing to", exp.superob_km, "km") - obs.superob(window_km=exp.superob_km) - obs.to_dart(f=cluster.dartrundir + "/obs_seq.out") - print("superob took", int(time_module.time() - t), "seconds") - + copy(cluster.dartrundir + "/obs_seq.out", + dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")) + try: + copy(cluster.dartrundir + "/obs_seq.out-orig", + dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-orig")) + except Exception as e: + warnings.warn(str(e)) if __name__ == "__main__": """Assimilate observations (different obs types) @@ -486,12 +469,12 @@ if __name__ == "__main__": archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/") os.makedirs(cluster.dartrundir, exist_ok=True) # create directory to run DART in os.chdir(cluster.dartrundir) - os.system( - cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py" - ) # link DART binaries to run_DART - os.system( - "rm -f input.nml obs_seq.in obs_seq.out obs_seq.final" - ) # remove any existing observation files + + # link DART binaries to run_DART + os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py") + + # remove any existing observation files + os.system("rm -f input.nml obs_seq.in obs_seq.out obs_seq.out-orig obs_seq.final") set_DART_nml() print("prepare nature") @@ -508,46 +491,35 @@ if __name__ == "__main__": # and collect the obs-error for assimilation in a single vector/list for i, obscfg in enumerate(exp.observations): + print('for', obscfg) n_obs = obscfg["n_obs"] - n_obs_z = len( - obscfg.get( - "heights", - [ - 1, - ], - ) - ) + levels = obscfg.get("heights", [1,]) + n_obs_z = len(levels) n_obs_3d = n_obs * n_obs_z do_error_parametrization = (obscfg["error_assimilate"] == False) and ( - obscfg.get("sat_channel") == 6 - ) + obscfg.get("sat_channel") == 6) if do_error_parametrization: # get observations for sat 6 - osq.create_obsseqin_alltypes( - time, - [ - obscfg, - ], - np.zeros(n_obs_3d), - ) + osq.create_obsseqin_alltypes(time, [obscfg, ], 0) + + # create observations at full resolution run_perfect_model_obs() - superob() - # depends on obs_seq.out produced before by run_perfect_model_obs() - Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out") + # run obs operator (through filter program) + # to create truth and prior ensemble Hx + # superob the high-res observations + # return the averaged observations / prior Hx + Hx_truth, Hx_prior = get_Hx_truth_prior() - Hx_prior = ( - obs_operator_ensemble() - ) # files are already linked to DART directory - err_assim = calc_obserr_WV73(Hx_nat, Hx_prior) + # compute the obs error for assimilation on the averaged grid + # since the assimilation is done on the averaged grid + err_assim = calc_obserr_WV73(Hx_truth, Hx_prior) else: err_assim = np.zeros(n_obs_3d) + obscfg["error_assimilate"] - error_assimilate.extend( - err_assim - ) # the obs-error we assume for assimilating observations + error_assimilate.extend(err_assim) ################################################ print(" 2) generate observations ") @@ -558,17 +530,25 @@ if __name__ == "__main__": error_generate.extend(np.zeros(n_obs_3d) + obscfg["error_generate"]) osq.create_obsseqin_alltypes(time, exp.observations, error_generate) - set_DART_nml() run_perfect_model_obs() # actually create observations that are used to assimilate - superob() - archive_osq_out(time) + + oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out") + + if hasattr(exp, "superob_km"): + print("superobbing to", exp.superob_km, "km") + oso.superob(window_km=exp.superob_km) + copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig") + oso.to_dart(f=cluster.dartrundir + "/obs_seq.out") ################################################ print(" 3) assimilate with observation-errors for assimilation") - replace_errors_obsseqout(cluster.dartrundir + "/obs_seq.out", error_assimilate) + oso.df['variance'] = error_assimilate # replace error variance + oso.to_dart(cluster.dartrundir + "/obs_seq.out") + archive_osq_out(time) + t = time_module.time() assimilate() print("filter took", time_module.time() - t, "seconds") diff --git a/scripts/obsseq.py b/scripts/obsseq.py index 8dae9a3..f639fbc 100644 --- a/scripts/obsseq.py +++ b/scripts/obsseq.py @@ -6,12 +6,6 @@ from config.cfg import exp, cluster from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove -import matplotlib as mpl - -mpl.use("agg") -import matplotlib.pyplot as plt - - def plot_box(m, lat, lon, label="", **kwargs): m.drawgreatcircle( @@ -53,18 +47,15 @@ def rad_to_degrees(rad): return degr -class ObsSeqOut(object): - """ - Attributes: - obstypes (list) : contains (kind_nr, kind_name) tuples +class ObsSeq(object): + """Read, manipulate, save obs_seq.out / final files """ def __init__(self, filepath): - print(filepath) self.ascii = open(filepath, "r").readlines() - self.preamble, self.content = self.get_preamble_content() - self.obstypes = self.get_obstypes() + self.get_preamble_content() + self.read_preamble() self.dict = self.obs_to_dict() self.df = self.to_pandas() @@ -78,153 +69,100 @@ class ObsSeqOut(object): 2) Observation contents """ for i, line in enumerate(self.ascii): - if " OBS " in line: + if "OBS " in line: break + if i == len(self.ascii)-1: + raise RuntimeError('did not find `OBS ` in file!') - return self.ascii[:i], self.ascii[i:] + self.preamble = self.ascii[:i] + self.content = self.ascii[i:] - def get_obstypes(self): - """Return a list of tuples (kind_nr, kind_descriptor) for each obs type""" + def read_preamble(self): + """Defines + self.obstypes (tuple(kind_nr, kind_descriptor)) for each obs type + """ # how many obstypes - for i, line in enumerate(self.ascii): - if "obs_type_definitions" in line: + for i, line in enumerate(self.preamble): + if "obs_type_definitions" in line or "obs_kind_definitions" in line: break + + # check if we found definitions before end of file + if i == len(self.preamble)-1: # end of file + raise RuntimeError('did not find `obs_type_definitions` or `obs_kind_definitions` in file') + line_n_obstypes = i + 1 - n_obstypes = int(self.ascii[line_n_obstypes]) + n_obstypes = int(self.preamble[line_n_obstypes]) # read integer from file # read obs type kind (number and description) obstypes = [] for k in range(1, n_obstypes + 1): - kind_nr, kind_type = self.ascii[line_n_obstypes + k].split() + kind_nr, kind_type = self.preamble[line_n_obstypes + k].split() kind_nr = int(kind_nr) obstypes.append((kind_nr, kind_type)) - return obstypes - - def to_dart(self, f): - """Write to obs_seq.out file in DART format - - Args: - f (str): path of file to write - """ - - def write_file(msg, output_path="./"): - try: - os.remove(output_path) - except OSError: - pass - - with open(output_path, "w") as f: - f.write(msg) - print(output_path, "saved.") - - def write_preamble(n_obs): - - num_obstypes = str(len(self.obstypes)) - txt = " obs_sequence \n obs_kind_definitions \n " + num_obstypes + self.obstypes = obstypes - for (nr, obstype) in self.obstypes: - txt += "\n " + str(nr) + " " + obstype - nobs = str(n_obs) - txt += "\n num_copies: 2 num_qc: 1" - txt += "\n num_obs: " + nobs - txt += " max_num_obs: " + nobs - txt += "\n observations \n truth \n Quality Control \n" - txt += " first: 1 last: " + nobs - return txt - - def write_obs(i, obs, next_i_obs=None, prev_i_obs=None): - """Write the observation section of a obs_seq.out file - Args: - i (int): index of observation - obs (dict): observation data - - next_i_obs (int): index of next observation - prev_i_obs (int): index of previous observation - (in case it is the last) - - Returns - str - """ - - if next_i_obs: - line_link = " -1 " + str(next_i_obs) + " -1" - else: # last observation in file - line_link = " " + str(prev_i_obs) + " -1 -1" - - lon_rad = str(obs["loc3d"][0]) - lat_rad = str(obs["loc3d"][1]) - - out = ( - " \n".join( - [ - "\nOBS " + str(i), - str(obs["obs"]), - str(obs["truth"]), - str(obs["qc"]), - line_link, - "obdef", - "loc3d", - " ".join( - [ - lon_rad, - lat_rad, - str(obs["loc3d"][2]), - str(obs["loc3d"][3]), - ] - ), - "kind", - " " + str(obs["kind"]), - "".join(obs["metadata"]), - ] - ) - + str(i) - + "\n " - + obs["time"][0] - + " " - + obs["time"][1] - + "\n" - + str(obs["variance"]) - ) - return out - - n_obs = len(self.df) - outstr = write_preamble(n_obs) - - # loop through observations, concatenate obs sections - # DART format is linked list, needs index of next observation - # k ... 0, ..., len(df)-1 - # i_obs_this ... starts at 1 - for k, (_, obs) in enumerate(self.df.iterrows()): - i_obs_this = k + 1 + # read num_copies (2 for obs_seq.out, 44 for obs_seq.final with 40 ens members) + num_copies = False + for line in self.preamble: + if 'num_copies:' in line: + _, num_copies, _, num_qc = line.split() + break + if not num_copies: + raise RuntimeError('did not find `num_copies:` in '+str(self.preamble)) + num_copies = int(num_copies) + num_qc = int(num_qc) + + # read num_obs + num_obs = False + for i, line in enumerate(self.preamble): + if 'num_obs:' in line: + _, num_obs, _, max_num_obs = line.split() + break + if not num_obs: + raise RuntimeError('did not find `num_obs:` in '+str(self.preamble)) + assert num_obs == max_num_obs, NotImplementedError() + self.num_obs = int(num_obs) - if k < len(self.df) - 1: - i_obs_next = k + 2 - outstr += write_obs(i_obs_this, obs, next_i_obs=i_obs_next) - else: # last obs in file - i_obs_prev = k - outstr += write_obs(i_obs_this, obs, prev_i_obs=i_obs_prev) + # read keys for values (e.g. 'observations', 'truth', 'prior ensemble mean',) + keys = [] + line_start_keys = i+1 + for j in range(line_start_keys, line_start_keys+num_copies+num_qc): + line = self.preamble[j] + keys.append(line.strip()) - write_file(outstr, output_path=f) + self.keys_for_values = keys def obs_to_dict(self): """Convert an obs_seq.out file to a dictionary""" + obs_begin_str = "OBS " def check_obs_begin(line): - if not " OBS " in line: - raise RuntimeError("wrong line in observation") + if not obs_begin_str in line: + raise RuntimeError("This is not the first line: "+str(line)) def content_to_list(content): - """Split obs_seq.out content into list of observation-sections""" + """Split obs_seq.out content (lines of str) into list of observation-sections + + Args: + content (list of str) : contains lines of file + + Returns + list of list of str + """ obs_list = [] i = 0 - check_obs_begin(content[0]) + try: + check_obs_begin(content[0]) + except: + print(content) + raise obs_begin = 0 for i, line in enumerate(content): if i == 0: continue - if " OBS " in line: # then this line is beginning of obs + if obs_begin_str in line: # then this line is beginning of obs obs_end = i - 1 # previous line obs_list.append(content[obs_begin : obs_end + 1]) obs_begin = i # next obs starts here @@ -241,7 +179,11 @@ class ObsSeqOut(object): out = dict() lines = obs_list_entry - check_obs_begin(lines[0]) + try: + check_obs_begin(lines[0]) + except: + print(lines) + raise for i, line in enumerate(lines): if "loc3d" in line: # find location @@ -250,9 +192,11 @@ class ObsSeqOut(object): if "kind" in line: # find obs kind line_kind = i + 1 - out["obs"] = float(lines[1].strip()) - out["truth"] = float(lines[2].strip()) - out["qc"] = float(lines[3].strip()) + for k, key in enumerate(self.keys_for_values): + out[key] = float(lines[1+k].strip()) + #out["obs"] = float(lines[1].strip()) + #out["truth"] = float(lines[2].strip()) + #out["qc"] = float(lines[3].strip()) x, y, z, z_coord = lines[line_loc].split() out["loc3d"] = float(x), float(y), float(z), int(z_coord) out["kind"] = int(lines[line_kind].strip()) @@ -274,31 +218,31 @@ class ObsSeqOut(object): return obs_list_dict # content = [line1, ...] - # transform to [obs1, obs2, obs3, ...] - # obs1 = [obsline1, obsline2, ...] + # obs_list = [obs1, obs2, obs3, ...] + # where obs1 = [line1, line2, ...]; all lines pertaining to one obs obs_list = content_to_list(self.content) - return obs_list_to_dict(obs_list) - def to_pandas(self): - """Create xr.Dataset containing observations - Variables = observation types - """ - obs_dict_list = self.obs_to_dict() + # each obs is one dictionary + list_of_obsdict = obs_list_to_dict(obs_list) + return list_of_obsdict - # convert to pandas.DataFrame - # each observation is one line - # columns: all observation contents + def get_prior_Hx_matrix(self): + """Return prior Hx array (n_obs, n_ens)""" - # set keys from first obs (kind, loc3d, values) - keys = obs_dict_list[0].keys() - data = {key: [] for key in keys} + # which columns do we need? + keys = self.df.columns + keys_bool = np.array(['prior ensemble member' in a for a in keys]) - # fill the data lists for each column of the DataFrame - for obs in obs_dict_list: - for key in keys: - data[key].append(obs[key]) + # select columns in DataFrame + prior_Hx = self.df.iloc[:, keys_bool] - return pd.DataFrame(index=range(len(obs_dict_list)), data=data) + # consistency check: compute mean over ens - compare with value from file + assert np.allclose(prior_Hx.mean(axis=1), self.df['prior ensemble mean']) + + return prior_Hx.values + + def get_truth_Hx(self): + return self.df['truth'].values def get_lon_lat(self): lats = np.empty(len(self.df), np.float32) @@ -370,6 +314,8 @@ class ObsSeqOut(object): # how many observations in x/y direction? win_obs = int(window_km / obs_spacing_km) if debug: + print('window_km=', window_km) + print('obs spacing=', obs_spacing_km) print("window (#obs in x/y)=", win_obs) out = self.df.drop(self.df.index) # this df will be filled @@ -381,18 +327,24 @@ class ObsSeqOut(object): i_obs_box = i_obs_grid[i : i + win_obs, j : j + win_obs].ravel() if debug: - print("box=", i, i + win_obs, j, j + win_obs) - print("i_obs box=", i_obs_grid[i : i + win_obs, j : j + win_obs]) + print("index x from", i, 'to', i + win_obs) + print("index y from", j, 'to', j + win_obs) + print("obs indices box=", i_obs_grid[i : i + win_obs, j : j + win_obs]) # average the subset # metadata are assumed to be equal obs_box = self.df.iloc[i_obs_box] obs_mean = obs_box.iloc[0] - obs_mean.at["obs"] = obs_box["obs"].mean() - obs_mean.at["truth"] = obs_box["truth"].mean() - obs_mean.at["qc"] = obs_box["qc"].mean() - obs_mean.at["variance"] = obs_box["variance"].mean() + # despite its name, 'observations' is a single value + for key in obs_box: + if key in ['loc3d', 'kind', 'metadata', 'time']: + pass + elif 'spread' in key: # mean of std deviations + obs_mean.at[key] = np.sqrt((obs_box[key]**2).mean()) + else: + obs_mean.at[key] = obs_box[key].mean() + if debug: print("pre_avg:", obs_box.head()) print("avg:", obs_mean) @@ -400,8 +352,145 @@ class ObsSeqOut(object): out = out.append(obs_mean) self.df = out # overwrite input + n_pre_superob = self.num_obs + n_post_superob = len(self.df) + + # assume square of obs + n_windows_x = int(n_pre_superob**.5/win_obs) # in x-direction + n_target_post = n_windows_x**2 # number of windows + print('superob from', n_pre_superob, 'obs to', n_post_superob, 'obs') + assert n_post_superob == n_target_post, RuntimeError() + + def to_pandas(self): + """Create xr.Dataset containing observations + Variables = observation types + """ + obs_dict_list = self.obs_to_dict() + + # convert to pandas.DataFrame + # each observation is one line + # columns: all observation contents + + # set keys from first obs (kind, loc3d, values) + keys = obs_dict_list[0].keys() + data = {key: [] for key in keys} + + # fill the data lists for each column of the DataFrame + for obs in obs_dict_list: + for key in keys: + data[key].append(obs[key]) + + return pd.DataFrame(index=range(len(obs_dict_list)), data=data) + + def to_dart(self, f): + """Write to obs_seq.out file in DART format + + Args: + f (str): path of file to write + """ + + def write_file(msg, output_path="./"): + try: + os.remove(output_path) + except OSError: + pass + + with open(output_path, "w") as f: + f.write(msg) + print(output_path, "saved.") + + def write_preamble(n_obs): + + num_obstypes = str(len(self.obstypes)) + txt = " obs_sequence \n obs_kind_definitions \n " + num_obstypes + + for (nr, obstype) in self.obstypes: + txt += "\n " + str(nr) + " " + obstype + nobs = str(n_obs) + txt += "\n num_copies: 2 num_qc: 1" + txt += "\n num_obs: " + nobs + txt += " max_num_obs: " + nobs + txt += "\n observations \n truth \n Quality Control \n" + txt += " first: 1 last: " + nobs + return txt + + def write_obs(i, obs, next_i_obs=None, prev_i_obs=None): + """Write the observation section of a obs_seq.out file + Args: + i (int): index of observation + obs (dict): observation data + + next_i_obs (int): index of next observation + prev_i_obs (int): index of previous observation + (in case it is the last) + + Returns + str + """ + + if next_i_obs: + line_link = " -1 " + str(next_i_obs) + " -1" + else: # last observation in file + line_link = " " + str(prev_i_obs) + " -1 -1" + + lon_rad = str(obs["loc3d"][0]) + lat_rad = str(obs["loc3d"][1]) + + out = ( + " \n".join( + [ + "\nOBS " + str(i), + str(obs["observations"]), + str(obs["truth"]), + str(obs["Quality Control"]), + line_link, + "obdef", + "loc3d", " ".join( + [ + lon_rad, lat_rad, + str(obs["loc3d"][2]), + str(obs["loc3d"][3]), + ] + ), + "kind", " " + str(obs["kind"]), + "".join(obs["metadata"]), + ] + ) + + str(i) + + "\n " + + obs["time"][0] + + " " + + obs["time"][1] + + "\n" + + str(obs["variance"]) + ) + return out + + n_obs = len(self.df) + outstr = write_preamble(n_obs) + + # loop through observations, concatenate obs sections + # DART format is linked list, needs index of next observation + # k ... 0, ..., len(df)-1 + # i_obs_this ... starts at 1 + for k, (_, obs) in enumerate(self.df.iterrows()): + i_obs_this = k + 1 + + if k < len(self.df) - 1: + i_obs_next = k + 2 + outstr += write_obs(i_obs_this, obs, next_i_obs=i_obs_next) + else: # last obs in file + i_obs_prev = k + outstr += write_obs(i_obs_this, obs, prev_i_obs=i_obs_prev) + + write_file(outstr, output_path=f) + def plot(self, box=None): + import matplotlib as mpl + + mpl.use("agg") + import matplotlib.pyplot as plt import xarray as xr georef = xr.open_dataset("/gpfs/data/fs71386/lkugler/run_DART/geo_em.d01.nc") diff --git a/tests/test_obsseq.py b/tests/test_obsseq.py new file mode 100644 index 0000000..9f56175 --- /dev/null +++ b/tests/test_obsseq.py @@ -0,0 +1,34 @@ +import sys +sys.path.append('../scripts') +import filecmp +from config.cfg import exp, cluster + +import obsseq + + +def test_oso(): + + input = cluster.scriptsdir + "/../tests/obs_seq.orig.out" + output = cluster.scriptsdir + "/../tests/obs_seq.test.out" + output_true = cluster.scriptsdir + "/../tests/obs_seq.superob.out" + + obs = obsseq.ObsSeq(input) + + # select a subset + obs.superob(window_km=50) + + # write to obs_seq.out in DART format + obs.to_dart(f=output) + + assert filecmp.cmp(output, output_true) + + +def test_osf(): + input = cluster.scriptsdir + "/../tests/obs_seq.final" + obs = obsseq.ObsSeq(input) + prior_Hx = obs.get_prior_Hx_matrix() + + # TODO: compare with given truth + +if __name__ == '__main__': + test_osf() -- GitLab