Commit 63c2ecaa authored by Andreas Hellerschmied

Merge branch 'feature_converter' into 'develop'

Draft: Feature converter

See merge request hellerdev/lifescale_gui!1
parents 3e2394d8 6f14de62
@@ -30,6 +30,8 @@ venv/
ENV/
env/
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
...
@@ -9,10 +9,22 @@
# Questions:
- Which license?
- Is it OK with Joseph Elsherbini to use/change his code?
- Is a "raw data viewer" needed to show the content of the binary raw data files?
- Individual files, or from one experiment?
- Load to pandas DF and show in table in GUI?
- Is editing required?
- Are any search options needed?
- Is format conversion needed, e.g. to csv files?
- Should there be the possibility to store the current GUI settings (data and processing parameters, etc.)?
- Should there be the possibility to load default parameters and/or previously saved parameters (e.g. from json file)?
- Is the json file required in the current version, e.g. for documentation (processing parameters, etc.) or further analysis?
- Are there any default values for the processing parameters? Should they be stored anywhere (e.g. json file)?
- Standard values are defined in the program code (e.g. in a parameters.py file). These parameters are loaded on the initial start.
- Save/Load parameters from GUI to json file => define the "standard parameter set" that way (see the sketch below).
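A minimal sketch of such a json round trip, assuming the parameters are kept in a plain dict (the parameter names and file name below are made up for illustration):

```python
import json

def save_parameters(params: dict, filename: str) -> None:
    """Save the current GUI parameters to a json file."""
    with open(filename, 'w') as f:
        json.dump(params, f, indent=4)

def load_parameters(filename: str) -> dict:
    """Load default or previously saved parameters from a json file."""
    with open(filename) as f:
        return json.load(f)

# Hypothetical round trip:
save_parameters({'min_mass': 0.0, 'sort_by_time': False}, 'default_parameters.json')
params = load_parameters('default_parameters.json')
```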
## Processing:
- Is the input data (raw data) always organized in the same way, i.e.: Raw data folder that contains data files with the
...
"""LifeSclae GUI is a utility program for handling data output.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
:Authors:
Andreas Hellerschmied (heller182@gmx.at)
"""
__version__ = '0.0.1'
__author__ = 'Andreas Hellerschmied'
__git_repo__ = 'tba'
__email__ = 'heller182@gmx.at'
__copyright__ = '(c) 2022 Andreas Hellerschmied'
\ No newline at end of file
"""Modelling the data output of a LifeScale run.
Copyright (C) 2022 Andreas Hellerschmied <heller182@gmx.at>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import datetime as dt
import numpy as np
import os
import pandas as pd
class LSData:
"""Modelling the data output of a LifeScale run.
Attributes
----------
run_name : str
Name of the LifeScale run.
input_xlsm_filename : str
Filename and path of the xlsm file written by LifeScale.
output_dir_path : str
Output directory filepath.
start_time_dt : datetime object
Start time of run.
end_time_dt : datetime object
End time of the run.
settings_dict : dict
Contains all settings from the Settings sheet of the input xlsm file. If more than one attribute is provided
for a parameter (dictionary key), the dictionary item is a list. If no attribute is provided, the item is `None`.
df_panel_data : pandas dataframe
Pandas dataframe that holds the data of the PanelData sheet of the input xlsm file.
df_interval_analysis : pandas dataframe
Pandas dataframe that holds the data of the IntervalAnalysis sheet plus additional data of the input xlsm file.
df_masses : pandas dataframe
Pandas dataframe that holds the data derived from the AcquisitionIntervals sheet of the input xlsm file.
"""
def __init__(self,
run_name='',
accession_number='',
input_xlsm_filename='',
output_dir_path='',
start_time_dt=None,
end_time_dt=None,
settings_dict=None,
df_panel_data=None,
df_interval_analysis=None,
df_masses=None,
):
"""Default constructor of class LSData."""
# Check input arguments:
# run_name and accession number:
if isinstance(run_name, str):
self.run_name = run_name
else:
raise TypeError('"run_name" needs to be a string')
if isinstance(accession_number, str):
self.accession_number = accession_number
else:
raise TypeError('"accession_number" needs to be a string')
# output_dir_path:
if isinstance(output_dir_path, str):
self.output_dir_path = output_dir_path
else:
raise TypeError('"data_dir_path" needs to be a string')
# xlsm_filename:
if isinstance(input_xlsm_filename, str):
self.input_xlsm_filename = input_xlsm_filename
else:
raise TypeError('"xlsm_filename" needs to be a string')
# Start and end time of the run:
if isinstance(start_time_dt, dt.datetime):
self.start_time_dt = start_time_dt
else:
raise TypeError('"start_time_dt" needs to be a datetime object')
if isinstance(end_time_dt, dt.datetime):
self.end_time_dt = end_time_dt
else:
raise TypeError('"end_time_dt" needs to be a datetime object')
# Settings dict:
if isinstance(settings_dict, dict):
self.settings_dict = settings_dict
else:
raise TypeError('"settings_dict" needs to be a dict.')
# Dataframes:
if isinstance(df_panel_data, pd.DataFrame):
self.df_panel_data = df_panel_data
else:
raise TypeError('"df_panel_data" needs to be a pandas dataframe.')
if isinstance(df_interval_analysis, pd.DataFrame):
self.df_interval_analysis = df_interval_analysis
else:
raise TypeError('"df_interval_analysis" needs to be a pandas dataframe.')
if isinstance(df_masses, pd.DataFrame):
self.df_masses = df_masses
else:
raise TypeError('"df_masses" needs to be a pandas dataframe.')
# Initialize additional attributes:
pass
@classmethod
def from_xlsm_file(cls, input_xlsm_filename, verbose=True):
"""Constructor that generates and populates the LSData object from an xlsm LS output file.
Parameters
----------
input_xlsm_filename : str
Filename and path of the xlsm file written by LifeScale.
verbose : bool, optional (default = `True`)
If `True`, status messages are written to the command line.
Returns
-------
:py:obj:`.LSData`
Contains all LS output data loaded from the given xlsm file.
"""
REQUIRED_XLSM_SHEET_NAMES = [
'AcquisitionIntervals',
'IntervalAnalysis',
'PanelData',
'Settings',
]
# Check input data:
# input_xlsm_filename:
if not input_xlsm_filename:
raise AssertionError(f'The parameter "input_xlsm_filename" must not be empty!')
if not isinstance(input_xlsm_filename, str):
raise TypeError('"input_xlsm_filename" needs to be a string')
if not os.path.isfile(input_xlsm_filename):
raise AssertionError(f'XLSM file {input_xlsm_filename} does not exist!')
# Load all needed sheets of the xlsm file to pandas dataframes:
if verbose:
print(f'Load data from xlsm file: {input_xlsm_filename}')
# Get sheet names:
xl_file = pd.ExcelFile(input_xlsm_filename)
sheet_names = xl_file.sheet_names
# Check, if all required sheets are available:
if set(REQUIRED_XLSM_SHEET_NAMES) - set(sheet_names):
missing_sheets = list(set(REQUIRED_XLSM_SHEET_NAMES) - set(sheet_names))
raise AssertionError(f'The following sheets are missing in the file {input_xlsm_filename}: {missing_sheets}')
# PanelData:
if verbose:
print(' - Parse PanelData')
df_panel_data = xl_file.parse('PanelData')
df_panel_data = remove_space_from_column_names(df_panel_data)
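# Placeholder column; filled later while parsing the AcquisitionIntervals sheet: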
df_panel_data['NumberOfIntervals'] = None
if not (df_panel_data[['Id']].value_counts().count() == len(df_panel_data)):
raise AssertionError('The values in the column "Id" in PanelData are not unique!')
# IntervalAnalysis:
if verbose:
print(f' - Parse IntervalAnalysis')
df_interval_analysis = xl_file.parse('IntervalAnalysis')
df_interval_analysis = remove_space_from_column_names(df_interval_analysis)
df_interval_analysis['Status'] = None # From AcquisitionIntervals
df_interval_analysis['DetectedParticles'] = None # From AcquisitionIntervals
df_interval_analysis['MeasuredVolume'] = None # From AcquisitionIntervals
df_interval_analysis['ResonantFrequency'] = None # From AcquisitionIntervals
if not (df_interval_analysis[['Id', 'IntervalNumber']].value_counts().count() == len(df_interval_analysis)):
raise AssertionError('The combination of the values in the columns "Id" and "IntervalNumber" in IntervalAnalysis is not unique!')
# Settings:
if verbose:
print(f' - Parse Settings')
settings_dict = {}
df_tmp = xl_file.parse('Settings', header=None)
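# Each row of the Settings sheet holds one parameter: column 0 contains the parameter name and the
# remaining columns hold zero or more values (trailing NaN cells are trimmed below):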
for idx, row in df_tmp.iterrows():
short_row = row[1:]
item_not_nan_max_idx = short_row.loc[~short_row.isna()].index.max()
if pd.isna(item_not_nan_max_idx):  # No items that are not NaN!
settings_dict[row[0]] = None
else:
tmp_list = short_row[:item_not_nan_max_idx].to_list()
num_items = len(tmp_list)
if num_items == 1:
settings_dict[row[0]] = tmp_list[0]
else:
settings_dict[row[0]] = tmp_list
run_name = settings_dict['Name']
if settings_dict['AccessionNumber'] is None:
accession_number = ''
else:
accession_number = str(settings_dict['AccessionNumber'])
start_time_dt = settings_dict['StartTime']
# ElapsedTime is apparently given in minutes; dividing by (24 * 60) converts it to days for dt.timedelta():
end_time_dt = start_time_dt + dt.timedelta(settings_dict['ElapsedTime'] / (24 * 60))
# # Settings (dataframe):
# df_settings = xl_file.parse('Settings', header=None).transpose()
# df_settings.columns = df_settings.iloc[0]
# df_settings = df_settings[1:]
# df_settings.reset_index(drop=True, inplace=True)
# Masses (from sheet AcquisitionIntervals):
if verbose:
print(f' - Parse Masses')
id_list = []
well_list = []
interval_num_list = []
time_list = []
masses_list = []
volume_list = []
total_num_particles_list = []
transit_time_list = []
pressure_drop_list = []
time_list_length_old = 0
masses_list_length_old = 0
volume_list_length_old = 0
total_num_particles_list_length_old = 0
transit_time_list_length_old = 0
pressure_drop_list_length_old = 0
current_id = None
current_well = None
current_interval_num = None
current_detected_particles = None
df_tmp = xl_file.parse('AcquisitionIntervals', header=None)
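# The AcquisitionIntervals sheet is organized as key/value rows: column 0 holds the field name
# (e.g. 'Id', 'Well', 'IntervalNumber', 'Time', 'Mass') and the remaining columns hold the value(s).
# The rows are walked sequentially and the data series are collected in the lists set up above: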
for ids, row in df_tmp.iterrows():
if row[0] == 'Id':
current_id = row[1]
if ~(df_interval_analysis['Id'] == current_id).any():
raise AssertionError(f'"Id={current_id}" is not available in IntervalAnalysis!')
if ~(df_panel_data['Id'] == current_id).any():
raise AssertionError(f'"Id={current_id}" is not available in PanelData!')
continue
if row[0] == 'Well':
current_well = row[1]
if ~(df_interval_analysis['Well'] == current_well).any():
raise AssertionError(f'"Well={current_well}" is not available in IntervalAnalysis!')
if ~(df_panel_data['Well'] == current_well).any():
raise AssertionError(f'"Well={current_well}" is not available in PanelData!')
continue
if row[0] == 'Antibiotic':
continue
if row[0] == 'AntibioticConcentration':
continue
if row[0] == 'NumberOfIntervals':
df_panel_data.loc[df_panel_data['Id'] == current_id, 'NumberOfIntervals'] = row[1]
continue
if row[0] == 'IntervalNumber':
current_interval_num = row[1]
if ~(df_interval_analysis['IntervalNumber'] == current_interval_num).any():
raise AssertionError(
f'"IntervalNumber={current_interval_num}" is not available in IntervalAnalysis!')
continue
if row[0] == 'StartTime':
continue
if row[0] == 'EndTime':
continue
if row[0] == 'DilutionFactor':
continue
if row[0] == 'Status':
tmp_filter = (df_interval_analysis['Id'] == current_id) & (df_interval_analysis['IntervalNumber'] == current_interval_num)
if len(tmp_filter[tmp_filter]) != 1:
raise AssertionError(f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
df_interval_analysis.loc[tmp_filter, 'Status'] = row[1]
continue
if row[0] == 'DetectedParticles':
tmp_filter = (df_interval_analysis['Id'] == current_id) & (
df_interval_analysis['IntervalNumber'] == current_interval_num)
if len(tmp_filter[tmp_filter]) != 1:
raise AssertionError(f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
df_interval_analysis.loc[tmp_filter, 'DetectedParticles'] = row[1]
current_detected_particles = row[1] # For cross-checks
continue
if row[0] == 'MeasuredVolume':
tmp_filter = (df_interval_analysis['Id'] == current_id) & (
df_interval_analysis['IntervalNumber'] == current_interval_num)
if len(tmp_filter[tmp_filter]) != 1:
raise AssertionError(f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
df_interval_analysis.loc[tmp_filter, 'MeasuredVolume'] = row[1]
continue
if row[0] == 'ResonantFrequency':
tmp_filter = (df_interval_analysis['Id'] == current_id) & (
df_interval_analysis['IntervalNumber'] == current_interval_num)
if len(tmp_filter[tmp_filter]) != 1:
raise AssertionError(f'Invalid number of matches of "Id={current_id}" and "IntervalNumber={current_interval_num}" in IntervalAnalysis: {len(tmp_filter[tmp_filter])}')
df_interval_analysis.loc[tmp_filter, 'ResonantFrequency'] = row[1]
continue
if row[0] == 'Time':
tmp_list = row_to_list(row[1:])
if len(tmp_list) != current_detected_particles:
raise AssertionError(f'Number of "DetectedParticles={current_detected_particles}" does not match the length of the "Time" series (= {len(tmp_list)})!')
time_list += tmp_list
continue
if row[0] == 'Mass':
tmp_list = row_to_list(row[1:])
masses_list += tmp_list
continue
if row[0] == 'Volume':
tmp_list = row_to_list(row[1:])
volume_list += tmp_list
continue
if row[0] == 'TotalNumberOfParticlesThroughSensor':
tmp_list = row_to_list(row[1:])
total_num_particles_list += tmp_list
continue
if row[0] == 'TransitTime':
tmp_list = row_to_list(row[1:])
transit_time_list += tmp_list
continue
if row[0] == 'PressureDrop':
tmp_list = row_to_list(row[1:])
pressure_drop_list += tmp_list
# Finish data collection for the current Interval (sample):
# Check if the length of all data series of the current interval number match:
if not (len(time_list) == len(masses_list) == len(volume_list) == len(total_num_particles_list) == len(
transit_time_list) == len(pressure_drop_list)):
raise AssertionError(
f'The lengths of the data series in AcquisitionIntervals of "Well={current_well}" and "IntervalNumber={current_interval_num}" do not match!')
# Set up lists for well, id and interval number:
num_additional_items_in_data_series = len(time_list) - time_list_length_old
tmp_list = [current_id] * num_additional_items_in_data_series
id_list += tmp_list
tmp_list = [current_well] * num_additional_items_in_data_series
well_list += tmp_list
tmp_list = [current_interval_num] * num_additional_items_in_data_series
interval_num_list += tmp_list
# Reset counters:
time_list_length_old = len(time_list)
masses_list_length_old = len(masses_list)
volume_list_length_old = len(volume_list)
total_num_particles_list_length_old = len(total_num_particles_list)
transit_time_list_length_old = len(transit_time_list)
pressure_drop_list_length_old = len(pressure_drop_list)
continue
# Check if the length of all data series lists match:
if not (len(time_list) == len(masses_list) == len(volume_list) == len(total_num_particles_list) == len(
transit_time_list) == len(pressure_drop_list) == len(id_list) == len(well_list) == len(interval_num_list)):
raise AssertionError(
f'The lengths of the data series in AcquisitionIntervals do not match!')
# Create dataframe:
df_masses_columns = ['Id', 'Well', 'IntervalNumber', 'Time', 'Mass', 'Volume', 'TotalNumberOfParticlesThroughSensor', 'TransitTime', 'PressureDrop']
df_masses = pd.DataFrame(list(zip(id_list, well_list, interval_num_list, time_list, masses_list, volume_list, total_num_particles_list, transit_time_list, pressure_drop_list)),
columns = df_masses_columns)
# Sensor:
# sensor_dict = {}
# df_sensor = xl_file.parse('Sensor', header=None).transpose()
# # - The df needs to have two rows => key and value for the dict!
# if df_sensor.shape[0] != 2:
# raise AssertionError(f'More than one column of parameters in the sheet "Sensor!"')
# for col_idx in df_sensor.columns:
# sensor_dict[df_sensor.loc[0, col_idx]] = df_sensor.loc[1, col_idx]
if verbose:
print(f'...finished loading and parsing data!')
return cls(run_name=run_name,
accession_number=accession_number,
input_xlsm_filename=input_xlsm_filename,
start_time_dt=start_time_dt,
end_time_dt=end_time_dt,
settings_dict=settings_dict,
df_panel_data=df_panel_data,
df_interval_analysis=df_interval_analysis,
df_masses=df_masses,
)
def export_csv_files(self, output_filepath, verbose=True, sort_by_time=False):
"""Write CSV files to output directory"""
if verbose:
print('Write output')
# Checks:
if not os.path.exists(output_filepath):
raise AssertionError(f'The output path does not exist: {output_filepath}')
self.output_dir_path = output_filepath
if self.accession_number:
filename_ending = f'{self.run_name}_{self.accession_number}.csv'
else:
filename_ending = f'{self.run_name}.csv'
# Write PanelData:
filename = os.path.join(output_filepath, f'SampleMetadata_{filename_ending}')
if verbose:
print(f'Write PanelData to: {filename}')
self.df_panel_data.to_csv(filename, index=False)
# Write Masses:
filename = os.path.join(output_filepath, f'Masses_{filename_ending}')
if verbose:
print(f'Write Masses to: {filename}')
if sort_by_time:
if verbose:
print(f' - Sort Masses by Time.')
self.df_masses.sort_values(by=['Time']).to_csv(filename, index=False)
else:
self.df_masses.to_csv(filename, index=False)
# Write IntervalAnalysis:
filename = os.path.join(output_filepath, f'Summary_{filename_ending}')
if verbose:
print(f'Write IntervalAnalysis to: {filename}')
self.df_interval_analysis.to_csv(filename, index=False)
# TODO: Output format (number of digits)
# TODO: Select columns for output (settable as parameter + default settings for each csv file)
# TODO: Optionally order data series by time (parameter)?!
@property
def get_number_of_observations(self):
"""Return the number of observations (items in the data series)."""
if self.df_masses is not None:
return len(self.df_masses)
else:
return 0
@property
def get_number_of_wells(self):
"""Return the number wells."""
if self.df_panel_data is not None:
return len(self.df_panel_data)
else:
return 0
@property
def get_number_of_intervals(self):
"""Return the number intervals."""
if self.df_interval_analysis is not None:
return len(self.df_interval_analysis)
else:
return 0
def __str__(self):
if self.run_name is not None:
return f'Run "{self.run_name}" with {self.get_number_of_observations} observations in {self.get_number_of_intervals} intervals and {self.get_number_of_wells} wells.'
else:
return 'No data available yet.'
def remove_space_from_column_names(df):
"""Strip leading and trailing whitespace from the column names of the input dataframe."""
df.columns = [col_name.strip() for col_name in df.columns]
return df
def row_to_list(row) -> list:
"""Convert a dataframe row to a list and remove all trailing NaN values."""
item_not_nan_max_idx = row.loc[~row.isna()].index.max()
if pd.isna(item_not_nan_max_idx):  # No items that are not NaN!
out_list = []
else:
out_list = row[:item_not_nan_max_idx].to_list()
return out_list
if __name__ == '__main__':
"""Main function, primarily for debugging and testing."""
xlsm_filename = '../../data/Example_several_wells/Vibrio_starvation_24.11.22_221125_163425.xlsm'
output_directory = '../../output/'
ls_data = LSData.from_xlsm_file(input_xlsm_filename=xlsm_filename)
ls_data.export_csv_files(output_directory)
"""Modelling a LifeScale run
Copyright (C) 2022 Andreas Hellerschmied <heller182@gmx.at>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import datetime as dt
import pytz
import numpy as np
import pickle
import os
import sys
import pandas as pd
# from gravtools.models.lsm import LSM
class LSRun:
"""LifeScale run object.
A LS run contains:
- run name and description
- input data
- settings
Attributes
----------
run_name : str
Name of the LifeScale run.
output_directory : str
Path to output directory (all output files are stored there).
pgm_version : str
Version of the program.
"""
def __init__(self,
campaign_name,
output_directory,
surveys=None, # Always use non-mutable default arguments!
stations=None, # Always use non-mutable default arguments!
lsm_runs=None, # Always use non-mutable default arguments!
ref_delta_t_dt=None # Reference time for drift determination
):
"""
Parameters
"""
\ No newline at end of file
*
!.gitignore
\ No newline at end of file