determine_dt.py
    # -*- coding: utf-8 -*-
    """
    @author: Martin Weise
    """
    import json
    import csv
    import logging
    import io
    from clients.s3_client import S3Client
    
    import messytables
    import pandas as pd
    from messytables import CSVTableSet, type_guess, \
        headers_guess, headers_processor, offset_processor
    
    
    def determine_datatypes(filename, enum=False, enum_tol=0.0001, separator=None) -> str:
        # Use enum=True to search the CSV file for columns that qualify as Postgres ENUM
        # types. Note that ENUM is not part of the SQL standard and therefore might not be
        # supported by all database engines; it is available in Postgres and MySQL.
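        # Illustrative example of the returned JSON string (column names here are made up):
        #   {"columns": {"id": "bigint", "state": {"enums": ["open", "closed"]}}, "separator": ","}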
        s3_client = S3Client()
        s3_client.file_exists('dbrepo-upload', filename)
        response = s3_client.get_file('dbrepo-upload', filename)
        stream = response['Body']
        if response['ContentLength'] == 0:
            logging.warning(f'Failed to determine data types: file {filename} has empty body')
            return json.dumps({'columns': [], 'separator': ','})
        # Read the body once; the stream can only be consumed a single time.
        body = stream.read()
        if separator is None:
            logging.info('Attempting to guess the separator from the first line')
            with io.BytesIO(body) as fh:
                line = next(fh)
                dialect = csv.Sniffer().sniff(line.decode('utf-8'))
                separator = dialect.delimiter
                logging.info('Determined separator: %s', separator)

        # Load the file contents into an in-memory buffer:
        with io.BytesIO(body) as fh:
            logging.info('Analysing corpus with separator: %s', separator)
            table_set = CSVTableSet(fh, delimiter=separator)
    
            # A table set is a collection of tables:
            row_set = table_set.tables[0]
    
            # guess header names and the offset of the header:
            offset, headers = headers_guess(row_set.sample)
            row_set.register_processor(headers_processor(headers))
    
            # add one to begin with content, not the header:
            row_set.register_processor(offset_processor(offset + 1))
    
            # guess column types:
            types = type_guess(row_set.sample, strict=True)
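            # `types` is a list of messytables cell-type instances (one per column,
            # aligned with `headers`); they are mapped to database types below.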
    
            r = {}
    
            # Materialise the rows with pandas so ENUM candidates can be counted:
            if enum:
                fh.seek(0)  # rewind the buffer, messytables may have advanced it
                rows = pd.read_csv(fh, sep=separator, header=offset)
                n = len(rows)
    
            for i in range(len(types)):
                if type(types[i]) == messytables.types.BoolType:
                    r[headers[i]] = "bool"
                elif type(types[i]) == messytables.types.IntegerType:
                    r[headers[i]] = "bigint"
                elif type(types[i]) == messytables.types.DateType:
                    if any(directive in types[i].format for directive in ("%H", "%M", "%S", "%Z")):
                        r[headers[i]] = "timestamp"  # todo: guesses the date format too, return it
                    else:
                        r[headers[i]] = "date"
                elif type(types[i]) in (messytables.types.DecimalType, messytables.types.FloatType):
                    r[headers[i]] = "decimal"
                elif type(types[i]) == messytables.types.StringType:
                    r[headers[i]] = "varchar"
                elif type(types[i]) in (messytables.types.PercentageType, messytables.types.CurrencyType):
                    r[headers[i]] = "double"
                elif type(types[i]) == messytables.types.TimeType:
                    r[headers[i]] = "time"
                else:
                    if enum:
                        # Collect distinct values; the column qualifies as an ENUM only while
                        # the number of distinct values stays below enum_tol * n.
                        enum_set = set()
                        m = 0
                        is_enum = True
                        for elem in range(n):
                            if m < enum_tol * n:
                                enum_set.add(rows.iloc[elem, i])
                            else:
                                is_enum = False
                                break
                            m = len(enum_set)
                        if is_enum:
                            enum_set.discard(None)
                            r[headers[i]] = {"enums": list(enum_set)}
                        else:
                            r[headers[i]] = "text"
                    else:
                        r[headers[i]] = "text"
            s = {'columns': r, 'separator': separator}
            logging.info('Determined data types %s', s)
        return json.dumps(s)
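

    # A minimal usage sketch (assumptions: the S3 bucket 'dbrepo-upload' is reachable
    # via S3Client and 'example.csv' has already been uploaded there; the file name
    # here is illustrative only):
    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO)
        result = json.loads(determine_datatypes('example.csv', enum=True, enum_tol=0.001))
        print('separator:', result['separator'])
        print('columns:', result['columns'])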