Skip to content
Snippets Groups Projects
Select Git revision
  • d8f455cecf58f177a0c550cdbd1ce18a2cdf2d09
  • consistent_config default protected
2 results

setup.py

Blame
  • obsseq.py 25.70 KiB
    """Read, modify and save DART obs_seq files.
    
    Not usable for creating obs_seq, since it does not know which metadata is necessary for each type
    """
    
    import os, sys, shutil, warnings
    import numpy as np
    import pandas as pd
    
    from config.cfg import exp
    from config.cluster import cluster
    from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove
    
    
    def plot_box(m, lat, lon, label="", **kwargs):
        """"Draw bounding box
    
        Args:
            m (mpl_toolkits.basemap.Basemap)
            lat, lon (np.array)     2-dimensional arrays of longitudes, latitudes
        """
    
        m.drawgreatcircle(
            lon[0, -1], lat[0, -1], lon[0, 0], lat[0, 0], del_s=20, zorder=4, **kwargs
        )
        m.drawgreatcircle(
            lon[0, -1], lat[0, -1], lon[-1, -1], lat[-1, -1], del_s=20, zorder=4, **kwargs
        )
        m.drawgreatcircle(
            lon[-1, 0], lat[-1, 0], lon[-1, -1], lat[-1, -1], del_s=20, zorder=4, **kwargs
        )
        m.drawgreatcircle(
            lon[0, 0],
            lat[0, 0],
            lon[-1, 0],
            lat[-1, 0],
            del_s=20,
            zorder=4,
            label=label,
            **kwargs
        )
    
    
    def degrees_to_rad(degr):
        """Convert to DART convention = radians"""
        if degr < 0:
            degr += 360
        return degr / 360 * 2 * np.pi
    
    
    def rad_to_degrees(rad):
        """Convert to degrees from DART convention (radians)"""
        assert rad >= 0, "no negative radians allowed"
        degr = rad / np.pi * 180
    
        # convert degr (180,360) to (-180,0)
        if degr > 180:
            degr -= 360
        return degr
    
    
    
    
    class ObsRecord(pd.DataFrame):
        """Content of obsseq.ObsSeq instances
        provides additional methods for pd.DataFrame
        created inside an ObsSeq instance
        """
        @property
        def _constructor(self):
            # This ensures that pandas operations return ObsRecord instances
            # e.g. subsetting df with df[3:4] returns ObsRecord
            return ObsRecord
    
        def get_prior_Hx(self):
            """Return prior Hx array (n_obs, n_ens)"""
            return self._get_model_Hx('prior')
    
        def get_posterior_Hx(self):
            """Return posterior Hx array (n_obs, n_ens)"""
            try:
                return self._get_model_Hx('posterior')
            except Exception as e:
                # this is useful if we evaluate a 'posterior state'
                # then the variable is called 'prior' while it really is a posterior
                warnings.warn(str(e)+' returning prior from this file instead!')
                return self._get_model_Hx('prior')
    
        def get_truth_Hx(self):
            return self['truth'].values
    
        def _get_model_Hx(self, what):
            """Return ensemble member info
    
            Args:
                self (pd.DataFrame):      usually self.self
                what (str):             'prior' or 'posterior'
    
            Returns:
                np.array
    
            Works with all observations (self = self.self) 
            or a subset of observations (self = self.self[343:348])
            """
            if what not in  ['prior', 'posterior']:
                raise ValueError
    
            # which columns do we need?
            keys = self.columns
            keys_bool = np.array([what+' ensemble member' in a for a in keys])
    
            # select columns in DataFrame
            Hx = self.iloc[:, keys_bool]
    
            # consistency check: compute mean over ens - compare with value from file
            assert np.allclose(Hx.mean(axis=1), self[what+' ensemble mean'])
            return Hx.values
    
        def get_lon_lat(self):
            lats = np.empty(len(self), np.float32)
            lons = lats.copy()
    
            for i, (i_obs, values) in enumerate(self.loc3d.items()):
                x, y, z, z_coord = values
    
                # convert radian to degrees lon/lat
                lon = rad_to_degrees(x)
                lat = rad_to_degrees(y)
                lons[i] = lon
                lats[i] = lat
    
            return pd.DataFrame(index=self.index, data=dict(lat=lats, lon=lons))
    
        def get_from_cartesian_grid(self, i, j, k):
            """Get the observation using cartesian grid indices (ix, iy, iz)
            """
            # find indices of observations within pandas.DataFrame
            return self.iloc[self.i_obs_grid[i, j, k].ravel()]
    
        def determine_nlayers(self):
            nlayers = 1  # first guess
    
            if len(exp.observations) == 1:
                # obscfg = exp.observations[0]
                # if 'heights' in obscfg:
                #     nlayers = len(obscfg['heights'])
    
                heights = [loc_xyz[2] for loc_xyz in self['loc3d']]
                heights = sorted(heights)
                h0 = heights[0]
                for i, h in enumerate(heights):
                    if h != h0:
                        break
                obs_per_layer = i  # if it fails at 1, there is 1 obs per layer
    
                nlayers = int(len(self)/obs_per_layer)
            else:
                warnings.warn('I can only guess the number of layers from this file.')
            return nlayers
    
        def superob(self, window_km):
            """Select subset, average, overwrite existing obs with average
    
            TODO: allow different obs types (KIND)
            TODO: loc3d overwrite with mean
            Metadata is copied from the first obs in a superob-box
    
            Note:
                This routine discards observations (round off)
                e.g. 31 obs with 5 obs-window => obs #31 is not processed
    
            Args:
                window_km (numeric):        horizontal window edge length
                                            includes obs on edge
                                            25x25 km with 5 km obs density
                                            = average 5 x 5 observations
            """
            def calc_deg_from_km(distance_km, center_lat):
                """Approximately calculate distance in degrees from meters
                Input: distance in km; degree latitude
                Output: distance in degrees of latitude, longitude
                """
                assert distance_km > 0, "window size <= 0, must be > 0"
                dist_deg_lat = distance_km / km_per_degrees
                dist_deg_lon = dist_deg_lat * np.cos(center_lat * np.pi / 180)
                return dist_deg_lat, dist_deg_lon
    
            def calc_km_from_deg(deg_lat, deg_lon, center_lat):
                dist_km_lat = deg_lat * km_per_degrees
                dist_km_lon = deg_lon * km_per_degrees * np.cos(center_lat * np.pi / 180)
                return dist_km_lat, dist_km_lon
    
    
            debug = False
            radius_earth_meters = 6.371 * 1e6
            m_per_degrees = np.pi * radius_earth_meters / 180  # m per degree latitude
            km_per_degrees = m_per_degrees / 1000
    
            # determine obs density (approx)
            # TODO: error prone section
            # from IPython import embed; embed()
            coords = self.get_lon_lat()
            dx_obs_lat_deg = coords.lat.diff().max()
            km_lat, _ = calc_km_from_deg(dx_obs_lat_deg, np.nan, 45)
            obs_spacing_km = int(km_lat)
    
            # how many observations in x/y direction in one superob box
            # in total there are win_obs**2 many observations inside
            win_obs = int(window_km / obs_spacing_km)
            if debug:
                print('window_km=', window_km)
                print('obs spacing=', obs_spacing_km)
                print("window (#obs in x/y)=", win_obs)
    
            # superob in case of multiple layers, only implemented for single obstype
            nlayers = self.determine_nlayers()
            
            # indices of observations (starting from 0)
            i_obs_grid = self.index.values  
    
            # get the observation indices from obs_seq (list)
            # onto a cartesian grid (ix, iy, iz)
            gridpoints_per_layer = len(i_obs_grid)/nlayers
            nx = int(gridpoints_per_layer ** 0.5)
            self.nx = nx
            i_obs_grid = i_obs_grid.reshape(nx, nx, nlayers)
            self.i_obs_grid = i_obs_grid
    
            # loop through columns/rows
            # avoid loop in (lat,lon) space as coordinates are non-cartesian
            # i.e. first column of observations has different longitudes!
    
            out = self.drop(self.index)  # this df will be filled
            boxes = []
    
            for i in range(0, nx+1 - win_obs, win_obs):
                for j in range(0, nx+1 - win_obs, win_obs):
                    for k in range(0, nlayers):
                        if debug: print(i,j,k)
    
                        # find indices of observations within superob window
                        # i_obs_box = i_obs_grid[i : i + win_obs, j : j + win_obs, k].ravel()
    
                        if debug:
                            print("index x from", i, 'to', i + win_obs)
                            print("index y from", j, 'to', j + win_obs)
                            print("obs indices box=", i_obs_grid[i : i + win_obs, j : j + win_obs, k])
    
                        # find observations within superob window
                        obs_box = self.get_from_cartesian_grid(slice(i, i + win_obs),
                                                               slice(j, j + win_obs),
                                                               k)
    
                        
                        # save boundary of box to list, for plotting later
                        eps = dx_obs_lat_deg/2  # for plotting
                        eps2 = eps*0.8  # for plotting
                        lat1, lon1 = self.get_from_cartesian_grid(i, j, k).get_lon_lat().values[0]
                        lat2, lon2 = self.get_from_cartesian_grid(i+win_obs-1, j, k).get_lon_lat().values[0]
                        lat3, lon3 = self.get_from_cartesian_grid(i, j+win_obs-1, k).get_lon_lat().values[0]
                        lat4, lon4 = self.get_from_cartesian_grid(i+win_obs-1, j+win_obs-1, k).get_lon_lat().values[0]
    
                        boxes.append(([lat1-eps2, lat2+eps2, lat3-eps2, lat4+eps2],
                                    [lon1-eps, lon2-eps, lon3+eps, lon4+eps]))
    
                        # average the subset
                        # metadata are assumed to be equal
                        obs_mean = obs_box.iloc[0]
    
                        # average spread and other values
                        for key in obs_box:
                            if key in ['loc3d', 'kind', 'metadata', 'time']:
                                pass
                            elif 'spread' in key:
                                # stdev of mean of values = sqrt(mean of variances)
                                obs_mean.at[key] = np.sqrt((obs_box[key]**2).mean())
                            elif key == 'variance':
                                # variance of mean = sum(variances)/n^2
                                obs_mean.at[key] = obs_box[key].sum()/obs_box[key].size**2
                            else:
                                obs_mean.at[key] = obs_box[key].mean()
    
                        # define location of superobservation
                        if win_obs % 2 == 0:
                            # even number of observations in x-direction
                            # there is no center obs
                            raise NotImplementedError()
                        else:
                            # odd number of observations in x-direction
                            # -> there is an observation in the middle
                            # take the location of that obs
                            # int(win_obs/2) is the index of the center element when indices start at 0
                            i_obs_center = i_obs_grid[i + int(win_obs/2), j + int(win_obs/2), k]
                            obs_mean.at['loc3d'] = self.iloc[i_obs_center]['loc3d']
    
                        # check if all obs share the same vertical position
                        heights_in_box = np.array([a[2] for a in obs_box['loc3d']])
                        assert np.allclose(heights_in_box, obs_mean['loc3d'][2])
    
                        if debug:
                            print("pre_avg:", obs_box.head())
                            print("avg:", obs_mean)
    
                        out = out.append(obs_mean)
    
            n_pre_superob = len(self)
            n_post_superob = len(out)
            out.attrs['boxes'] = boxes
    
            # quick after check - does the output obs number match with the expected number?
            n_windows_x = int((n_pre_superob/nlayers)**.5/win_obs)  # assume square of obs
            n_target_post = n_windows_x**2 * nlayers  # number of windows
            print('superob from', n_pre_superob, 'obs to', n_post_superob, 'obs')
            if n_post_superob != n_target_post:
                raise RuntimeError('expected', n_target_post, 'superobservations, but created', 
                                    n_post_superob)
    
            out.attrs['df_pre_superob'] = self  # original data
            self = out  # overwrite dataframe
            return self  # output itself (allows to write it to a new name)
    
    
    class ObsSeq(object):
        """Read, manipulate, save obs_seq.out / final files
        """
    
        def __init__(self, filepath):
            self.ascii = open(filepath, "r").readlines()
    
            self.get_preamble_content()
            self.read_preamble()
    
            self.df = ObsRecord(self.to_pandas())
    
        def __str__(self):
            return self.df.__str__()
    
        def get_preamble_content(self):
            """Split the obs_seq.out file into two parts
            1) First lines of obs_seq.out file until the first observation message
            2) Observation contents
            """
            for i, line in enumerate(self.ascii):
                if "OBS " in line:
                    break
            if i == len(self.ascii)-1:
                raise RuntimeError('did not find `OBS ` in file!')
    
            self.preamble = self.ascii[:i]
            self.content = self.ascii[i:]
    
        def read_preamble(self):
            """Defines 
            self.obstypes (tuple(kind_nr, kind_descriptor)) for each obs type
            """
            # how many obstypes
            for i, line in enumerate(self.preamble):
                if "obs_type_definitions" in line or "obs_kind_definitions" in line:
                    break
    
            # check if we found definitions before end of file
            if i == len(self.preamble)-1:  # end of file
                raise RuntimeError('did not find `obs_type_definitions` or `obs_kind_definitions` in file')
    
            line_n_obstypes = i + 1
            n_obstypes = int(self.preamble[line_n_obstypes])  # read integer from file
    
            # read obs type kind (number and description)
            obstypes = []
            for k in range(1, n_obstypes + 1):
                kind_nr, kind_type = self.preamble[line_n_obstypes + k].split()
                kind_nr = int(kind_nr)
                obstypes.append((kind_nr, kind_type))
            self.obstypes = obstypes
    
            # read num_copies (2 for obs_seq.out, 44 for obs_seq.final with 40 ens members)
            num_copies = False
            for line in self.preamble:
                if 'num_copies:' in line:
                    _, num_copies, _, num_qc = line.split()
                    break
            if not num_copies:
                raise RuntimeError('did not find `num_copies:` in '+str(self.preamble))
            num_copies = int(num_copies)
            num_qc = int(num_qc)
    
            # read num_obs
            num_obs = False
            for i, line in enumerate(self.preamble):
                if 'num_obs:' in line:
                    _, num_obs, _, max_num_obs = line.split()
                    break
            if not num_obs:
                raise RuntimeError('did not find `num_obs:` in '+str(self.preamble))
            assert num_obs == max_num_obs, NotImplementedError()
            self.num_obs = int(num_obs)
    
            # read keys for values (e.g. 'observations', 'truth', 'prior ensemble mean',)
            keys = []
            line_start_keys = i+1
            for j in range(line_start_keys, line_start_keys+num_copies+num_qc):
                line = self.preamble[j]
                keys.append(line.strip())
    
            self.keys_for_values = keys
    
        def obs_to_dict(self):
            """Convert an obs_seq.out file to a dictionary"""
            obs_begin_str = "OBS  "
    
            def check_obs_begin(line):
                if not obs_begin_str in line:
                    raise RuntimeError("This is not the first line: "+str(line))
    
            def content_to_list(content):
                """Split obs_seq.out content (lines of str) into list of observation-sections
    
                Args:
                    content (list of str) : contains lines of file
                
                Returns 
                    list of list of str
                """
                obs_list = []
                i = 0
                try:
                    check_obs_begin(content[0])
                except:
                    print(content)
                    raise
                obs_begin = 0
    
                for i, line in enumerate(content):
                    if i == 0:
                        continue
    
                    if obs_begin_str in line:  # then this line is beginning of obs
                        obs_end = i - 1  # previous line
                        obs_list.append(content[obs_begin : obs_end + 1])
                        obs_begin = i  # next obs starts here
    
                    if i == len(content) - 1:  # last line
                        obs_end = i
                        obs_list.append(content[obs_begin : obs_end + 1])
    
                if not len(obs_list) > 1:
                    warnings.warn('len(obs_list)='+str(len(obs_list)))
    
                # consistency check to ensure that all observations have been detected
                if len(obs_list) != self.num_obs:
                    raise RuntimeError('num_obs read in does not match preamble num_obs '
                                       +str(len(obs_list))+' != '+str(self.num_obs))
                return obs_list
    
            def one_obs_to_dict(obs_list_entry):
                """"""
                out = dict()
                lines = obs_list_entry
    
                try:
                    check_obs_begin(lines[0])
                except:
                    print(lines)
                    raise
    
                for i, line in enumerate(lines):
                    if "loc3d" in line:  # find location
                        line_loc = i + 1
    
                    if "kind" in line:  # find obs kind
                        line_kind = i + 1
    
                for k, key in enumerate(self.keys_for_values):
                    out[key] = float(lines[1+k].strip())
    
                x, y, z, z_coord = lines[line_loc].split()
                out["loc3d"] = float(x), float(y), float(z), int(z_coord)
                out["kind"] = int(lines[line_kind].strip())
                out["metadata"] = lines[line_kind + 1 : -2]
                out["time"] = tuple(lines[-2].split())
                out["variance"] = float(lines[-1].strip())
                return out
    
            def obs_list_to_dict(obs_list):
                # wraps `one_obs_to_dict`
                obs_list_dict = []  # list of dict
                for entry in obs_list:
    
                    # convert list of lines to dictionary
                    # with (kind, loc3d, values, ...) as keys
                    obs_dict = one_obs_to_dict(entry)
    
                    obs_list_dict.append(obs_dict)  # append dict to list
                return obs_list_dict
    
            # content = [line1, ...]
            # obs_list = [obs1, obs2, obs3, ...]
            # where obs1 = [line1, line2, ...]; all lines pertaining to one obs
            obs_list = content_to_list(self.content)
    
            # each obs is one dictionary
            list_of_obsdict = obs_list_to_dict(obs_list)
            return list_of_obsdict
    
        def to_pandas(self):
            """Create pd.DataFrame with rows=observations
            """
            obs_dict_list = self.obs_to_dict()
    
            # convert to pandas.DataFrame
            # each observation is one line
            # columns: all observation contents
    
            # set keys from first obs (kind, loc3d, values)
            keys = obs_dict_list[0].keys()
            data = {key: [] for key in keys}
    
            # fill the data lists for each column of the DataFrame
            for obs in obs_dict_list:
                for key in keys:
                    data[key].append(obs[key])
    
            return pd.DataFrame(index=range(len(obs_dict_list)), data=data)
    
        def to_dart(self, f):
            """Write to obs_seq.out file in DART format
    
            Args:
                f (str):    path of file to write
            """
    
            def write_file(msg, output_path="./"):
                try:
                    os.remove(output_path)
                except OSError:
                    pass
    
                with open(output_path, "w") as f:
                    f.write(msg)
                    print(output_path, "saved.")
    
            def write_preamble(n_obs):
    
                num_obstypes = str(len(self.obstypes))
                txt = " obs_sequence \n obs_kind_definitions \n     " + num_obstypes
    
                for (nr, obstype) in self.obstypes:
                    txt += "\n         " + str(nr) + " " + obstype
                nobs = str(n_obs)
                txt += "\n num_copies:            2  num_qc:            1"
                txt += "\n num_obs:           " + nobs
                txt += "   max_num_obs:            " + nobs
                txt += "\n observations \n truth \n Quality Control \n"
                txt += " first:            1  last:            " + nobs
                return txt
    
            def write_obs(i, obs, next_i_obs=None, prev_i_obs=None):
                """Write the observation section of a obs_seq.out file
    
                Args:
                    i   (int):      index of observation
                    obs (dict):     observation data
    
                    next_i_obs (int):   index of next observation
                    prev_i_obs (int):   index of previous observation
                                        (in case it is the last)
    
                Returns
                    str
                """
    
                if next_i_obs:
                    line_link = "        -1           " + str(next_i_obs) + "          -1"
                else:  # last observation in file
                    line_link = "          " + str(prev_i_obs) + "           -1          -1"
    
                lon_rad = str(obs["loc3d"][0])
                lat_rad = str(obs["loc3d"][1])
    
                out = (
                    " \n".join(
                        [
                            "\nOBS         " + str(i),
                            str(obs["observations"]),
                            str(obs["truth"]),
                            str(obs["Quality Control"]),
                            line_link,
                            "obdef",
                            "loc3d", "    ".join(
                                [
                                    lon_rad, lat_rad,
                                    str(obs["loc3d"][2]), 
                                    str(obs["loc3d"][3]),
                                ]
                            ),
                            "kind", "         " + str(int(obs["kind"])),
                            "".join(obs["metadata"]),
                        ]
                    )
                    + " \n " + obs["time"][0] + "     " + obs["time"][1]
                    + " \n " + str(obs["variance"])
                )
                return out
    
            n_obs = len(self.df)
            outstr = write_preamble(n_obs)
    
            # loop through observations, concatenate obs sections
            # DART format is linked list, needs index of next observation
            # k     ...         0, ..., len(df)-1
            # i_obs_this ...    starts at 1
            for k, (_, obs) in enumerate(self.df.iterrows()):
                i_obs_this = k + 1
    
                if k < len(self.df) - 1:
                    i_obs_next = k + 2
                    outstr += write_obs(i_obs_this, obs, next_i_obs=i_obs_next)
                else:  # last obs in file
                    i_obs_prev = k
                    outstr += write_obs(i_obs_this, obs, prev_i_obs=i_obs_prev)
    
            write_file(outstr, output_path=f)
    
    
        def plot(self, f_out="./map_obs_superobs.png"):
            print('plotting obs...')
            import matplotlib as mpl
    
            mpl.use("agg")
            import matplotlib.pyplot as plt
            import xarray as xr
    
            georef = xr.open_dataset("/gpfs/data/fs71386/lkugler/run_DART/geo_em.d01.nc")
            lon = georef.XLONG_M.values.squeeze()
            lat = georef.XLAT_M.values.squeeze()
    
            from mpl_toolkits.basemap import Basemap
    
            fig, ax = plt.subplots(figsize=(12, 12))
    
            # m = Basemap(projection='geos', lon_0=lon[100,100], resolution='i',
            #             llcrnrlon=lon[0,0]-1,llcrnrlat=lat[0,0]-3,
            #             urcrnrlon=lon[-1,-1]+1,urcrnrlat=lat[-1,-1]+3)
            m = Basemap(
                projection="lcc",
                resolution="h",
                lon_0=lon[100, 100],
                lat_0=lat[25, 25],
                lat_1=lat[100, 100],
                lat_2=lat[175, 175],
                llcrnrlon=lon[0, 0] - 2,
                llcrnrlat=lat[0, 0] - 2,
                urcrnrlon=lon[-1, -1] + 2,
                urcrnrlat=lat[-1, -1] + 2,
            )
            # m.fillcontinents(color='0.1', lake_color='0.2')
    
            m.drawlsmask(land_color="0.1", ocean_color="0.1")
            m.drawcoastlines(color="white")
            m.drawcountries(color="white")
    
            plot_box(m, lat, lon, label="domain", color="green", lw=1) #4)
    
            # OBSERVATIONS
            original_df = self.df.attrs['df_pre_superob']
            coords = original_df.get_lon_lat()
            lats = coords.lat.values
            longs = coords.lon.values
            coords = zip(lats, longs)
    
            label="observed pixel"
            for lati, long in coords:
                m.plot(long, lati, ".",
                    markersize=0.4, #4,
                    latlon=True,
                    color="grey",
                    label=label,
                    zorder=4,
                )
                label=''
    
            # after superob
            coords = self.df.get_lon_lat()
            lats = coords.lat.values
            longs = coords.lon.values
            coords = zip(lats, longs)
    
            label = 'superobservation'
            for lati, long in coords:
                m.plot(long, lati, ".",
                    markersize=0.5, #5,
                    latlon=True,
                    color="red",
                    label=label,
                    zorder=4,
                )
                label = ''
    
    
            #from IPython import embed; embed()
            if self.df.attrs['boxes']:
                label = 'superob'
                for lats, lons in self.df.attrs['boxes']:
                    lats, lons = np.meshgrid(lats, lons)
    
                    plot_box(m, lats, lons, label=label, color="white", lw=0.1) #1)
                    label = ''
    
            plt.legend()
            plt.tight_layout()
            plt.savefig(f_out, dpi=600)
            print(f_out, "saved.")
    
    
    if __name__ == "__main__":
        # for testing purposes
    
        # f = cluster.scriptsdir + "/../tests/obs_seq.orig.out"
        f = "/home/fs71386/lkugler/data/sim_archive/exp_v1.21_P3_wbub7_VIS_obs2-10_loc20/obs_seq_out/2008-07-30_12:30_obs_seq.out-orig"
        obs = ObsSeq(f)
    
        # select a subset (lat-lon)
        obs.df = obs.df.superob(window_km=10)
        # print(type(obs.df))
    
        obs.plot(f_out="./map_obs_superobs.png")
    
        # write to obs_seq.out in DART format
        # obs.to_dart(f=cluster.dartrundir + "/obs_seq.out")