    """Modelling the data output of a LifeScale run.
    
    Copyright (C) 2022  Andreas Hellerschmied <heller182@gmx.at>
    
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
    """
    
    import datetime as dt
    import numpy as np
    import os
    import pandas as pd
    
    
    class LSData:
        """Modelling the data output of a LifeScale run.
    
        Attributes
        ----------
        run_name : str
            Name of the LifeScale run.
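        guid : str
            Globally unique identifier (GUID) of the run, taken from the Settings sheet of the input xlsm file.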
        input_xlsm_filename : str
            Filename and path of the xlsm file written by LifeScale.
        output_dir_path : str
            Output directory filepath.
        start_time_dt : datetime object
            Start time of run.
        end_time_dt : datetime object
            End time of the run.
        settings_dict : dict
            Contains all settings from the Settings sheet of the input xlsm file. If more than one attribute is
            provided for a parameter (dictionary key), the dictionary item is a list. If no attribute is
            provided, the item is `None`.
        df_panel_data : pandas dataframe
            Pandas dataframe that holds the data of the PanelData sheet of the input xlsm file.
        df_interval_analysis : pandas dataframe
            Pandas dataframe that holds the data of the IntervalAnalysis sheet plus additional data of the input xlsm file.
        df_masses : pandas dataframe
            Pandas dataframe that holds the data derived from the AcquisitionIntervals sheet of the input xlsm file.
        """
    
        def __init__(self,
                     run_name='',
                     guid='',
                     input_xlsm_filename='',
                     output_dir_path='',
                     start_time_dt=None,
                     end_time_dt=None,
                     settings_dict=None,
                     df_panel_data=None,
                     df_interval_analysis=None,
                     df_masses=None,
                     ):
            """Default constructor of class LSData."""
    
            # Check input arguments:
    
            # run_name and accession number:
            if isinstance(run_name, str):
                self.run_name = run_name
            else:
                raise TypeError('"run_name" needs to be a string')
            if isinstance(guid, str):
                self.guid = guid
            else:
                raise TypeError('"guid" needs to be a string')
    
            # output_dir_path:
            if isinstance(output_dir_path, str):
                self.output_dir_path = output_dir_path
            else:
                raise TypeError('"data_dir_path" needs to be a string')
    
            # xlsm_filename:
            if isinstance(input_xlsm_filename, str):
                self.input_xlsm_filename = input_xlsm_filename
            else:
                raise TypeError('"xlsm_filename" needs to be a string')
    
            # Start and end time of the run:
            if isinstance(start_time_dt, dt.datetime):
                self.start_time_dt = start_time_dt
            else:
                raise TypeError('"start_time_dt" needs to be a datetime object')
            if isinstance(end_time_dt, dt.datetime):
                self.end_time_dt = end_time_dt
            else:
                raise TypeError('"end_time_dt" needs to be a datetime object')
    
            # Settings dict:
            if isinstance(settings_dict, dict):
                self.settings_dict = settings_dict
            else:
                raise TypeError('"settings_dict" needs to be a dict.')
    
            # Dataframes:
            if isinstance(df_panel_data, pd.DataFrame):
                self.df_panel_data = df_panel_data
            else:
                raise TypeError('"df_panel_data" needs to be a pandas dataframe.')
            if isinstance(df_interval_analysis, pd.DataFrame):
                self.df_interval_analysis = df_interval_analysis
            else:
                raise TypeError('"df_interval_analysis" needs to be a pandas dataframe.')
            if isinstance(df_masses, pd.DataFrame):
                self.df_masses = df_masses
            else:
                raise TypeError('"df_masses" needs to be a pandas dataframe.')
    
    
        @classmethod
        def from_xlsm_file(cls, input_xlsm_filename, verbose=True):
            """Constructor that generates and populates the LSData object from an xlsm LS output file.
    
            Parameters
            ----------
            input_xlsm_filename : str
                Filename and path of the xlsm file written by LifeScale.
            verbose : bool, optional (default = `True`)
                If `True`, status messages are written to the command line.
    
            Returns
            -------
            :py:obj:`.LSData`
                Contains all LS output data loaded from the given xlsm file.
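
            Examples
            --------
            A minimal usage sketch; the file path is hypothetical:

            >>> ls_data = LSData.from_xlsm_file('ls_run_output.xlsm')  # doctest: +SKIP
            >>> print(ls_data)  # doctest: +SKIP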
            """
    
            REQUIRED_XLSM_SHEET_NAMES = [
                'AcquisitionIntervals',
                'IntervalAnalysis',
                'PanelData',
                'Settings',
            ]
    
            # Check input data:
            # input_xlsm_filename:
            if not input_xlsm_filename:
                raise AssertionError('The parameter "input_xlsm_filename" must not be empty!')
            if not isinstance(input_xlsm_filename, str):
                raise TypeError('"input_xlsm_filename" needs to be a string')
            if not os.path.isfile(input_xlsm_filename):
                raise AssertionError(f'XLSM file {input_xlsm_filename} does not exist!')
    
            # Load all needed sheets of the xlsm file to pandas dataframes:
            if verbose:
                print(f'Load data from xlsm file: {input_xlsm_filename}')
    
            # Get sheet names:
            xl_file = pd.ExcelFile(input_xlsm_filename)
            sheet_names = xl_file.sheet_names
    
            # Check, if all required sheets are available:
            if set(REQUIRED_XLSM_SHEET_NAMES) - set(sheet_names):
                missing_sheets = list(set(REQUIRED_XLSM_SHEET_NAMES) - set(sheet_names))
                raise AssertionError(f'The following sheets are missing in the file {input_xlsm_filename}: {missing_sheets}')
    
            # PanelData:
            if verbose:
                print(' - Parse PanelData')
            df_panel_data = xl_file.parse('PanelData')
            df_panel_data = remove_space_from_column_names(df_panel_data)
            df_panel_data['NumberOfIntervals'] = None
            if not (df_panel_data[['Id']].value_counts().count() == len(df_panel_data)):
                raise AssertionError(
                    'The values in the column "Id" in PanelData are not unique!')
    
            # IntervalAnalysis:
            if verbose:
                print(' - Parse IntervalAnalysis')
            df_interval_analysis = xl_file.parse('IntervalAnalysis')
            df_interval_analysis = remove_space_from_column_names(df_interval_analysis)
            df_interval_analysis['Status'] = None  # From AcquisitionIntervals
            df_interval_analysis['DetectedParticles'] = None  # From AcquisitionIntervals
            df_interval_analysis['MeasuredVolume'] = None  # From AcquisitionIntervals
            df_interval_analysis['ResonantFrequency'] = None  # From AcquisitionIntervals
            if not (df_interval_analysis[['Id', 'IntervalNumber']].value_counts().count() == len(df_interval_analysis)):
                raise AssertionError(
                    'The combination of the values in the columns "Id" and "IntervalNumber" in IntervalAnalysis is '
                    'not unique!')
    
            # Settings:
            if verbose:
                print(' - Parse Settings')
            settings_dict = {}
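            # The Settings sheet is assumed to be key-value structured: column 0 holds the parameter name and
            # the remaining columns hold zero, one or more values (no value => None, one value => scalar item,
            # several values => list; see the class docstring).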
            df_tmp = xl_file.parse('Settings', header=None)
            for idx, row in df_tmp.iterrows():
                short_row = row[1:]
                item_not_nan_max_idx = short_row.loc[~short_row.isna()].index.max()
                if pd.isna(item_not_nan_max_idx):  # No items that are not NaN!
                    settings_dict[row[0]] = None
                else:
                    tmp_list = short_row.loc[:item_not_nan_max_idx].to_list()
                    num_items = len(tmp_list)
                    if num_items == 1:
                        settings_dict[row[0]] = tmp_list[0]
                    else:
                        settings_dict[row[0]] = tmp_list
            run_name = settings_dict['Name']
            if settings_dict['Guid'] is None:
                guid = ''
            else:
                guid = str(settings_dict['Guid'])
            start_time_dt = settings_dict['StartTime']
            # 'ElapsedTime' is assumed to be given in minutes; dt.timedelta() takes days as its first positional
            # argument, hence the division by (24 * 60):
            end_time_dt = start_time_dt + dt.timedelta(settings_dict['ElapsedTime'] / (24 * 60))
    
            # # Settings (dataframe):
            # df_settings = xl_file.parse('Settings', header=None).transpose()
            # df_settings.columns = df_settings.iloc[0]
            # df_settings = df_settings[1:]
            # df_settings.reset_index(drop=True, inplace=True)
    
            # Masses (from sheet AcquisitionIntervals):
            if verbose:
                print(' - Parse Masses')
            id_list = []
            well_list = []
            interval_num_list = []
            time_list = []
            masses_list = []
            volume_list = []
            total_num_particles_list = []
            transit_time_list = []
            pressure_drop_list = []
            time_list_length_old = 0
            # masses_list_length_old = 0
            # volume_list_length_old = 0
            # total_num_particles_list_length_old = 0
            # transit_time_list_length_old = 0
            # pressure_drop_list_length_old = 0
    
            current_id = None
            current_well = None
            current_interval_num = None
            current_detected_particles = None
    
            df_tmp = xl_file.parse('AcquisitionIntervals', header=None)
            for idx, row in df_tmp.iterrows():
                if row[0] == 'Id':
                    current_id = row[1]
                    if not (df_interval_analysis['Id'] == current_id).any():
                        raise AssertionError(f'"Id={current_id}" is not available in IntervalAnalysis!')
                    if not (df_panel_data['Id'] == current_id).any():
                        raise AssertionError(f'"Id={current_id}" is not available in PanelData!')
                    continue
                if row[0] == 'Well':
                    current_well = row[1]
                    if not (df_interval_analysis['Well'] == current_well).any():
                        raise AssertionError(f'"Well={current_well}" is not available in IntervalAnalysis!')
                    if not (df_panel_data['Well'] == current_well).any():
                        raise AssertionError(f'"Well={current_well}" is not available in PanelData!')
                    continue
                if row[0] == 'Antibiotic':
                    continue
                if row[0] == 'AntibioticConcentration':
                    continue
                if row[0] == 'NumberOfIntervals':
                    df_panel_data.loc[df_panel_data['Id'] == current_id, 'NumberOfIntervals'] = row[1]
                    continue
                if row[0] == 'IntervalNumber':
                    current_interval_num = row[1]
                    if not (df_interval_analysis['IntervalNumber'] == current_interval_num).any():
                        raise AssertionError(
                            f'"IntervalNumber={current_interval_num}" is not available in IntervalAnalysis!')
                    continue
                if row[0] == 'StartTime':
                    continue
                if row[0] == 'EndTime':
                    continue
                if row[0] == 'DilutionFactor':
                    continue
                if row[0] == 'Status':
                    tmp_filter = (df_interval_analysis['Id'] == current_id) & (
                            df_interval_analysis['IntervalNumber'] == current_interval_num)
                    if len(tmp_filter[tmp_filter]) != 1:
                        raise AssertionError(
                            f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" '
                            f'in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
                    df_interval_analysis.loc[tmp_filter, 'Status'] = row[1]
                    continue
                if row[0] == 'DetectedParticles':
                    tmp_filter = (df_interval_analysis['Id'] == current_id) & (
                            df_interval_analysis['IntervalNumber'] == current_interval_num)
                    if len(tmp_filter[tmp_filter]) != 1:
                        raise AssertionError(
                            f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" '
                            f'in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
                    df_interval_analysis.loc[tmp_filter, 'DetectedParticles'] = row[1]
                    current_detected_particles = row[1]  # For cross-checks
                    continue
                if row[0] == 'MeasuredVolume':
                    tmp_filter = (df_interval_analysis['Id'] == current_id) & (
                            df_interval_analysis['IntervalNumber'] == current_interval_num)
                    if len(tmp_filter[tmp_filter]) != 1:
                        raise AssertionError(
                            f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" '
                            f'in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
                    df_interval_analysis.loc[tmp_filter, 'MeasuredVolume'] = row[1]
                    continue
                if row[0] == 'ResonantFrequency':
                    tmp_filter = (df_interval_analysis['Id'] == current_id) & (
                            df_interval_analysis['IntervalNumber'] == current_interval_num)
                    if len(tmp_filter[tmp_filter]) != 1:
                        raise AssertionError(
                            f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" '
                            f'in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
                    df_interval_analysis.loc[tmp_filter, 'ResonantFrequency'] = row[1]
                    continue
                if row[0] == 'Time':
                    tmp_list = row_to_list(row[1:])
                    if len(tmp_list) != current_detected_particles:
                        raise AssertionError(
                            f'Number of "DetectedParticles={current_detected_particles}" does not match the length '
                            f'of the "Time" series (= {len(tmp_list)})!')
                    time_list += tmp_list
                    continue
                if row[0] == 'Mass':
                    tmp_list = row_to_list(row[1:])
                    masses_list += tmp_list
                    continue
                if row[0] == 'Volume':
                    tmp_list = row_to_list(row[1:])
                    volume_list += tmp_list
                    continue
                if row[0] == 'TotalNumberOfParticlesThroughSensor':
                    tmp_list = row_to_list(row[1:])
                    total_num_particles_list += tmp_list
                    continue
                if row[0] == 'TransitTime':
                    tmp_list = row_to_list(row[1:])
                    transit_time_list += tmp_list
                    continue
                if row[0] == 'PressureDrop':
                    tmp_list = row_to_list(row[1:])
                    pressure_drop_list += tmp_list
    
                    # Finish data collection for the current Interval (sample):
                    # Check if the length of all data series of the current interval number match:
                    if not (len(time_list) == len(masses_list) == len(volume_list) == len(total_num_particles_list) == len(
                            transit_time_list) == len(pressure_drop_list)):
                        raise AssertionError(
                            f'The lengths of the data series in AcquisitionIntervals of "Well={current_well}" and '
                            f'"IntervalNumber={current_interval_num}" do not match!')
                    # Set up lists for well, id and interval number:
                    num_additional_items_in_data_series = len(time_list) - time_list_length_old
                    tmp_list = [current_id] * num_additional_items_in_data_series
                    id_list += tmp_list
                    tmp_list = [current_well] * num_additional_items_in_data_series
                    well_list += tmp_list
                    tmp_list = [current_interval_num] * num_additional_items_in_data_series
                    interval_num_list += tmp_list
                    # Reset counters:
                    time_list_length_old = len(time_list)
                    # masses_list_length_old = len(masses_list)
                    # volume_list_length_old = len(volume_list)
                    # total_num_particles_list_length_old = len(total_num_particles_list)
                    # transit_time_list_length_old = len(transit_time_list)
                    # pressure_drop_list_length_old = len(pressure_drop_list)
                    continue
    
            # Check if the length of all data series lists match:
            if not (len(time_list) == len(masses_list) == len(volume_list) == len(total_num_particles_list) == len(
                    transit_time_list) == len(pressure_drop_list) == len(id_list) == len(well_list) == len(
                    interval_num_list)):
                raise AssertionError(
                    'The lengths of the data series in AcquisitionIntervals do not match!')
    
            # Create dataframe:
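            # df_masses is in long format: one row per detected particle, keyed by Id, Well and IntervalNumber.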
            df_masses_columns = ['Id', 'Well', 'IntervalNumber', 'Time', 'Mass', 'Volume',
                                 'TotalNumberOfParticlesThroughSensor', 'TransitTime', 'PressureDrop']
            df_masses = pd.DataFrame(list(
                zip(id_list, well_list, interval_num_list, time_list, masses_list, volume_list, total_num_particles_list,
                    transit_time_list, pressure_drop_list)),
                columns=df_masses_columns)
            df_masses['Id'] = df_masses['Id'].astype(int)
            df_masses['IntervalNumber'] = df_masses['IntervalNumber'].astype(int)
    
            # Sensor:
            # sensor_dict = {}
            # df_sensor = xl_file.parse('Sensor', header=None).transpose()
            # # - The df needs to have two rows => key and value for the dict!
            # if df_sensor.shape[0] != 2:
            #     raise AssertionError(f'More than one column of parameters in the sheet "Sensor!"')
            # for col_idx in df_sensor.columns:
            #     sensor_dict[df_sensor.loc[0, col_idx]] = df_sensor.loc[1, col_idx]
    
            if verbose:
                print('...finished loading and parsing data!')
    
            return cls(run_name=run_name,
                       guid=guid,
                       input_xlsm_filename=input_xlsm_filename,
                       start_time_dt=start_time_dt,
                       end_time_dt=end_time_dt,
                       settings_dict=settings_dict,
                       df_panel_data=df_panel_data,
                       df_interval_analysis=df_interval_analysis,
                       df_masses=df_masses,
                       )
    
        def export_csv_files(self, output_filepath, verbose=True, sort_by_time=False):
            """Write the parsed data as CSV files to the output directory.

            Parameters
            ----------
            output_filepath : str
                Path of the output directory.
            verbose : bool, optional (default = `True`)
                If `True`, status messages are written to the command line.
            sort_by_time : bool, optional (default = `False`)
                If `True`, the masses data series are sorted by the "Time" column before writing the CSV file.
            """
            if verbose:
                print('Write output')
    
            # Checks:
            if not os.path.exists(output_filepath):
                raise AssertionError(f'The output path does not exist: {output_filepath}')
            self.output_dir_path = output_filepath
    
            if self.guid:
                filename_ending = f'{self.run_name}_{self.guid}.csv'
            else:
                filename_ending = f'{self.run_name}.csv'
    
            # Write PanelData:
            filename = os.path.join(output_filepath, f'Metadata_{filename_ending}')
            if verbose:
                print(f'Write PanelData to: {filename}')
            self.df_panel_data.to_csv(filename, index=False)
    
            # Write Masses:
            filename = os.path.join(output_filepath, f'Masses_{filename_ending}')
            if verbose:
                print(f'Write Masses to: {filename}')
            if sort_by_time:
                if verbose:
                    print(' - Sort Masses by Time.')
                self.df_masses.sort_values(by=['Time']).to_csv(filename, index=False)
            else:
                self.df_masses.to_csv(filename, index=False)
    
            # Write IntervalAnalysis:
            filename = os.path.join(output_filepath, f'SamplesSummary_{filename_ending}')
            if verbose:
                print(f'Write IntervalAnalysis to: {filename}')
            self.df_interval_analysis.to_csv(filename, index=False)
    
            # TODO: Output format (number of digits)
            # TODO: Select columns for output (settable as parameter + default settings for each csv file)
            # TODO: Optionally order data series by time (parameter)?!
    
        def calc_sample_statistics(self, verbose=True):
            """Calculate statistical values for each sample and add it to the self.df_interval_analysis."""
            if verbose:
                print('Calculate sample statistics.')
    
            for idx, row in self.df_interval_analysis.iterrows():
                tmp_filter = (self.df_masses['Id'] == row.Id) & (self.df_masses['IntervalNumber'] == row.IntervalNumber)
                tmp_filter_1 = (self.df_interval_analysis['Id'] == row.Id) & \
                               (self.df_interval_analysis['IntervalNumber'] == row.IntervalNumber)
                self.df_interval_analysis.loc[tmp_filter_1, 'Mass_median'] = self.df_masses.loc[tmp_filter, 'Mass'].median()
                self.df_interval_analysis.loc[tmp_filter_1, 'Mass_std'] = self.df_masses.loc[tmp_filter, 'Mass'].std()
                self.df_interval_analysis.loc[tmp_filter_1, 'Mass_q25'] = \
                    self.df_masses.loc[tmp_filter, 'Mass'].quantile(0.25)
                self.df_interval_analysis.loc[tmp_filter_1, 'Mass_q75'] = \
                    self.df_masses.loc[tmp_filter, 'Mass'].quantile(0.75)
                self.df_interval_analysis.loc[tmp_filter_1, 'Mass_iqr'] = \
                    self.df_interval_analysis.loc[tmp_filter_1, 'Mass_q75'] - \
                    self.df_interval_analysis.loc[tmp_filter_1, 'Mass_q25']
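
            # Note: the row-wise loop above could also be written as a vectorized groupby aggregation.
            # A sketch for illustration only (not used here; assumes pandas >= 0.25 named aggregation):
            #
            #   stats = self.df_masses.groupby(['Id', 'IntervalNumber'])['Mass'].agg(
            #       Mass_median='median',
            #       Mass_std='std',
            #       Mass_q25=lambda s: s.quantile(0.25),
            #       Mass_q75=lambda s: s.quantile(0.75),
            #   )
            #   stats['Mass_iqr'] = stats['Mass_q75'] - stats['Mass_q25']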
    
        @property
        def get_number_of_observations(self):
            """Return the number of observations (items in the data series)."""
            if self.df_masses is not None:
                return len(self.df_masses)
            else:
                return 0
    
        @property
        def get_number_of_wells(self):
            """Return the number wells."""
            if self.df_panel_data is not None:
                return len(self.df_panel_data)
            else:
                return 0
    
        @property
        def get_number_of_intervals(self):
            """Return the number intervals."""
            if self.df_interval_analysis is not None:
                return len(self.df_interval_analysis)
            else:
                return 0
    
        def __str__(self):
            if self.run_name:
                return f'Run "{self.run_name}" with {self.get_number_of_observations} observations in ' \
                       f'{self.get_number_of_intervals} intervals and {self.get_number_of_wells} wells.'
            else:
                return 'No data available yet.'
    
    
    def remove_space_from_column_names(df):
        """Strip leading and trailing whitespace from the column names of the input dataframe."""
        df.columns = [col_name.strip() for col_name in df.columns]
        return df
    
    
    def row_to_list(row) -> list:
        """Convert a dataframe row to a list and remove all trailing NaN values."""
        item_not_nan_max_idx = row.loc[~row.isna()].index.max()
        if pd.isna(item_not_nan_max_idx):  # No items that are not NaN!
            out_list = []
        else:
            out_list = row.loc[:item_not_nan_max_idx].to_list()
        return out_list
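
    # Example (illustrative): row_to_list(pd.Series([1.0, 2.0, np.nan, np.nan])) returns [1.0, 2.0];
    # trailing NaNs are dropped, interior NaNs are kept.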
    
    
    if __name__ == '__main__':
        # Main routine, primarily for debugging and testing:
        xlsm_filename = '../../data/Example_several_wells/Vibrio_starvation_24.11.22_221125_163425.xlsm'
        output_directory = '../../output/'

        ls_data = LSData.from_xlsm_file(input_xlsm_filename=xlsm_filename)
        ls_data.export_csv_files(output_directory)