Skip to content
Snippets Groups Projects
Verified Commit 5880d860 authored by Martin Weise's avatar Martin Weise
Browse files

Merge branch 'master' into dev

parents d813afb7 1ff9d73c
Branches
Tags
5 merge requests!345Updated docs and endpoints:,!341Fixed mapping problem where UK and FK share columns they are inserted,!339Fixed mapping problem where UK and FK share columns they are inserted,!338Fixed mapping problem where UK and FK share columns they are inserted,!334Fixed mapping problem where UK and FK share columns they are inserted
Showing
with 2 additions and 45413 deletions
......@@ -22,10 +22,7 @@ import at.tuwien.gateway.DataDatabaseSidecarGateway;
import at.tuwien.gateway.MetadataServiceGateway;
import at.tuwien.test.AbstractUnitTest;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
......@@ -507,6 +504,7 @@ public class TableServiceIntegrationTest extends AbstractUnitTest {
}
@Test
@Disabled("Not stable CI/CD")
public void getStatistics_succeeds() throws TableMalformedException, SQLException, TableNotFoundException {
/* test */
......
import logging
from dataclasses import dataclass
import requests
from flask import current_app
from typing import List
from jwt import jwk_from_pem, JWT
@dataclass(init=True, eq=True)
class User:
username: str
roles: List[str]
class KeycloakClient:
def obtain_user_token(self, username: str, password: str) -> str:
response = requests.post(
f"{current_app.config['AUTH_SERVICE_ENDPOINT']}/realms/dbrepo/protocol/openid-connect/token",
data={
"username": username,
"password": password,
"grant_type": "password",
"client_id": current_app.config["AUTH_SERVICE_CLIENT"],
"client_secret": current_app.config["AUTH_SERVICE_CLIENT_SECRET"]
})
body = response.json()
if "access_token" not in body:
raise AssertionError("Failed to obtain user token(s)")
return response.json()["access_token"]
def verify_jwt(self, access_token: str) -> User:
public_key = jwk_from_pem(str(current_app.config["JWT_PUBKEY"]).encode('utf-8'))
payload = JWT().decode(message=access_token, key=public_key, do_time_check=True)
logging.debug(f"JWT token client_id={payload.get('client_id')} and realm_access={payload.get('realm_access')}")
return User(username=payload.get('client_id'), roles=payload.get('realm_access')["roles"])
"""
The opensearch_client.py is used by the different API endpoints in routes.py to handle requests to the opensearch db
"""
import os
from json import dumps, load
import logging
from dbrepo.api.dto import Database
from collections.abc import MutableMapping
from opensearchpy import OpenSearch, TransportError, RequestError, NotFoundError
from omlib.measure import om
from omlib.constants import OM_IDS
from omlib.omconstants import OM
from omlib.unit import Unit
class OpenSearchClient:
"""
The client to communicate with the OpenSearch database.
"""
host: str = None
port: int = None
username: str = None
password: str = None
instance: OpenSearch = None
def __init__(self, host: str = None, port: int = None, username: str = None, password: str = None):
self.host = os.getenv('OPENSEARCH_HOST', host)
self.port = int(os.getenv('OPENSEARCH_PORT', port))
self.username = os.getenv('OPENSEARCH_USERNAME', username)
self.password = os.getenv('OPENSEARCH_PASSWORD', password)
def _instance(self) -> OpenSearch:
"""
Wrapper method to get the instance singleton.
@returns: The opensearch instance singleton, if successful.
"""
if self.instance is None:
self.instance = OpenSearch(hosts=[{"host": self.host, "port": self.port}],
http_compress=True,
http_auth=(self.username, self.password))
return self.instance
def get_database(self, database_id: int) -> Database:
"""
Gets a database by given id.
@param database_id: The database id.
@returns: The database, if successful.
@throws: opensearchpy.exceptions.NotFoundError If the database was not found in the Search Database.
"""
response: dict = self._instance().get(index="database", id=database_id)
return Database.parse_obj(response["_source"])
def update_database(self, database_id: int, data: Database) -> Database:
"""
Updates the database data with given id.
@param database_id: The database id.
@param data: The database data.
@returns: The updated database, if successful.
@throws: opensearchpy.exceptions.NotFoundError If the database was not found in the Search Database.
"""
logging.debug(f"updating database with id: {database_id} in search database")
self._instance().index(index="database", id=database_id, body=dumps(data.model_dump()))
response: dict = self._instance().get(index="database", id=database_id)
database = Database.parse_obj(response["_source"])
logging.info(f"Updated database with id {database_id} in index 'database'")
return database
def delete_database(self, database_id: int) -> None:
"""
Deletes the database data with given id.
@param database_id: The database id.
@throws: opensearchpy.exceptions.NotFoundError If the database was not found in the Search Database.
"""
self._instance().delete(index="database", id=database_id)
logging.info(f"Deleted database with id {database_id} in index 'database'")
def query_index_by_term_opensearch(self, term, mode):
"""
old code, is effectively replaced by general_search() now
sends an opensearch query
:return list of dicts
"""
query_str = ""
if mode == "exact":
query_str = f"{term}"
elif mode == "contains":
query_str = f"*{term}*"
response = self._instance().search(
index="database",
body={
"query": {
"query_string": {
"query": query_str,
"allow_leading_wildcard": "true", # default true
}
},
},
)
results = [hit["_source"] for hit in response["hits"]["hits"]]
return results
def get_fields_for_index(self, field_type: str):
"""
returns a list of attributes of the data for a specific index.
:param field_type: The search type
:return: list of fields
"""
fields = {
"database": "*",
"table": "tables.*",
"column": "tables.columns.*",
"concept": "tables.columns.concept.*",
"unit": "tables.columns.unit.*",
"identifier": "identifiers.*",
"view": "views.*",
"user": "creator.*",
}
if field_type not in fields.keys():
raise NotFoundError(f"Failed to find field type: {field_type}")
logging.debug(f'requesting field(s) {fields[field_type]} for filter: {field_type}')
fields = self._instance().indices.get_field_mapping(fields[field_type])
fields_list = []
fd = flatten_dict(fields)
for key in fd.keys():
if not key.startswith('database'):
continue
entry = {}
if key.split(".")[-1] == "type":
entry["attr_name"] = key_to_attr_name(key)
entry["attr_friendly_name"] = attr_name_to_attr_friendly_name(entry["attr_name"])
entry["type"] = fd[key]
fields_list.append(entry)
return fields_list
def fuzzy_search(self, search_term=None):
logging.info(f"Performing fuzzy search")
fuzzy_body = {
"query": {
"multi_match": {
"query": search_term,
"fuzziness": "AUTO",
"fuzzy_transpositions": True,
"minimum_should_match": 3
}
}
}
logging.debug(f'search body: {fuzzy_body}')
response = self._instance().search(
index="database",
body=fuzzy_body
)
logging.info(f"Found {len(response['hits']['hits'])} result(s)")
return response
def general_search(self, field_type: str = None, field_value_pairs: dict = None):
"""
Main method for searching stuff in the opensearch db
all parameters are optional
:param field_type: The index to be searched. Optional.
:param field_value_pairs: The key-value pair of properties that need to match. Optional.
:return: The object of results and HTTP status code. e.g. { "hits": { "hits": [] } }, 200
"""
musts = []
if field_value_pairs is not None and len(field_value_pairs) > 0:
logging.debug(f'field_value_pairs present: {field_value_pairs}')
for key, value in field_value_pairs.items():
if field_value_pairs[key] == None:
logging.debug(f"skip empty key: {key}")
continue
logging.debug(f"processing key: {key}")
if '.' in key:
logging.debug(f'key {key} is nested: use nested query')
musts.append({
"match": {
key: value
}
})
else:
logging.debug(f'key {key} is flat: use bool query')
musts.append({
"match": {
key: {"query": value, "minimum_should_match": "90%"}
}
})
body = {
"query": {"bool": {"must": musts}}
}
logging.debug(f'search in index database for type: {field_type}')
logging.debug(f'search body: {dumps(body)}')
response = self._instance().search(
index="database",
body=dumps(body)
)
results = [hit["_source"] for hit in response["hits"]["hits"]]
return results
def unit_independent_search(self, t1: float, t2: float, field_value_pairs):
"""
Main method for searching stuff in the opensearch db
:param t1: start value
:param t2: end value
:param field_value_pairs: the key-value pairs
:return:
"""
logging.info(f"Performing unit-independent search")
searches = []
body = {
"size": 0,
"aggs": {
"units": {
"terms": {"field": "unit.uri", "size": 500}
}
}
}
response = self._instance().search(
index="database",
body=dumps(body)
)
unit_uris = [hit["key"] for hit in response["aggregations"]["units"]["buckets"]]
logging.debug(f"found {len(unit_uris)} unit(s) in column index")
if len(unit_uris) == 0:
raise NotFoundError("Failed to search: no unit assigned")
base_unit = unit_uri_to_unit(field_value_pairs["unit.uri"])
for unit_uri in unit_uris:
gte = t1
lte = t2
if unit_uri != field_value_pairs["unit.uri"]:
target_unit = unit_uri_to_unit(unit_uri)
if not Unit.can_convert(base_unit, target_unit):
logging.error(f"Cannot convert unit {field_value_pairs['unit.uri']} to target unit {unit_uri}")
continue
gte = om(t1, base_unit).convert(target_unit)
lte = om(t2, base_unit).convert(target_unit)
logging.debug(
f"converted original range [{t1},{t2}] for base unit {base_unit} to mapped range [{gte},{lte}] for target unit={target_unit}")
searches.append({'index': 'column'})
searches.append({
"query": {
"bool": {
"must": [
{
"match": {
"concept.uri": {
"query": field_value_pairs["concept.uri"]
}
}
},
{
"range": {
"val_min": {
"gte": gte
}
}
},
{
"range": {
"val_max": {
"lte": lte
}
}
},
{
"match": {
"unit.uri": {
"query": unit_uri
}
}
}
]
}
}
})
logging.debug('searches: %s', searches)
body = ''
for search in searches:
body += '%s \n' % dumps(search)
responses = self._instance().msearch(
body=dumps(body)
)
response = {
"hits": {
"hits": flatten([hits["hits"]["hits"] for hits in responses["responses"]])
},
"took": responses["took"]
}
return response
def key_to_attr_name(key: str) -> str:
"""
Maps an attribute key to a machine-readable representation
:param key: The attribute key
:return: The machine-readable representation of the attribute key
"""
parts = []
previous = None
for part in key.split(".")[1:-1]: # remove the first and last sub-item database.xxx.yyy.zzz.type -> xxx.yyy.zzz
if part == "mappings" or part == "mapping": # remove the mapping sub-item(s)
continue
if part == previous: # remove redundant sub-item(s)
continue
previous = part
parts.append(part)
return ".".join(parts)
def attr_name_to_attr_friendly_name(key: str) -> str:
"""
Maps an attribute key to a human-readable representation
:param key: The attribute key
:return: The human-readable representation of the attribute key
"""
with open('friendly_names_overrides.json') as json_data:
d = load(json_data)
for json_key in d.keys():
if json_key == key:
logging.debug(f"friendly name exists for key {json_key}")
return d[json_key]
return ''.join(key.replace('_', ' ').title().split('.')[-1:])
def flatten_dict(
d: MutableMapping, parent_key: str = "", sep: str = "."
) -> MutableMapping:
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, MutableMapping):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def flatten(mylist):
return [item for sublist in mylist for item in sublist]
def unit_uri_to_unit(uri):
base_identifier = uri[len(OM_IDS.NAMESPACE):].replace("-", "")
return getattr(OM, base_identifier)
import rdflib
from rdflib import URIRef
from omlib.dimension import Dimension
from omlib.scale import Scale
from omlib.unit import Prefix, Unit
class OM_IDS:
NAMESPACE = 'http://www.ontology-of-units-of-measure.org/resource/om-2/'
class SI:
SYSTEM_OF_UNITS = str(OM_IDS.NAMESPACE + 'InternationalSystemOfUnits') # WARNING If you change "SI" also change in constants.py
# SI Prefixes
YOTTA = Prefix('yotta', 'Y', 1e24, OM_IDS.NAMESPACE + 'yotta')
ZETTA = Prefix('zetta', 'Z', 1e21, OM_IDS.NAMESPACE + 'zetta')
EXA = Prefix('exa', 'E', 1e18, OM_IDS.NAMESPACE + 'exa')
PETA = Prefix('peta', 'P', 1e15, OM_IDS.NAMESPACE + 'peta')
TERA = Prefix('tera', 'T', 1e12, OM_IDS.NAMESPACE + 'tera')
GIGA = Prefix('giga', 'G', 1e9, OM_IDS.NAMESPACE + 'giga')
MEGA = Prefix('mega', 'M', 1e6, OM_IDS.NAMESPACE + 'mega')
KILO = Prefix('kilo', 'k', 1e3, OM_IDS.NAMESPACE + 'kilo')
HECTO = Prefix('hecto', 'h', 1e2, OM_IDS.NAMESPACE + 'hecto')
DECA = Prefix('deca', 'da', 1e1, OM_IDS.NAMESPACE + 'deca')
DECI = Prefix('deci', 'd', 1e-1, OM_IDS.NAMESPACE + 'deci')
CENTI = Prefix('centi', 'c', 1e-2, OM_IDS.NAMESPACE + 'centi')
MILLI = Prefix('milli', 'm', 1e-3, OM_IDS.NAMESPACE + 'milli')
MICRO = Prefix('micro', 'μ', 1e-6, OM_IDS.NAMESPACE + 'micro')
NANO = Prefix('nano', 'n', 1e-9, OM_IDS.NAMESPACE + 'nano')
PICO = Prefix('pico', 'p', 1e-12, OM_IDS.NAMESPACE + 'pico')
FEMTO = Prefix('femto', 'f', 1e-15, OM_IDS.NAMESPACE + 'femto')
ATTO = Prefix('atto', 'a', 1e-18, OM_IDS.NAMESPACE + 'atto')
ZEPTO = Prefix('zepto', 'z', 1e-21, OM_IDS.NAMESPACE + 'zepto')
YOCTO = Prefix('yocto', 'y', 1e-24, OM_IDS.NAMESPACE + 'yocto')
# SI Base Units
SECOND = Unit.get_singular_unit('second', 's', Dimension(1, 0, 0, 0, 0, 0, 0),
identifier=OM_IDS.NAMESPACE + 'second-Time',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
METRE = Unit.get_singular_unit('metre', 'm', Dimension(0, 1, 0, 0, 0, 0, 0), identifier=OM_IDS.NAMESPACE + 'metre',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
GRAM = Unit.get_singular_unit('gram', 'g', Dimension(0, 0, 1, 0, 0, 0, 0), identifier=OM_IDS.NAMESPACE + 'gram')
KILOGRAM = Unit.get_prefixed_unit(KILO, GRAM, identifier=OM_IDS.NAMESPACE + 'kilogram',
system_of_units=SYSTEM_OF_UNITS,
is_base_unit=True)
AMPERE = Unit.get_singular_unit('ampere', 'A', Dimension(0, 0, 0, 1, 0, 0, 0),
identifier=OM_IDS.NAMESPACE + 'ampere',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
KELVIN = Unit.get_singular_unit('kelvin', 'K', Dimension(0, 0, 0, 0, 1, 0, 0),
identifier=OM_IDS.NAMESPACE + 'kelvin',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
MOLE = Unit.get_singular_unit('mole', 'mol', Dimension(0, 0, 0, 0, 0, 1, 0),
identifier=OM_IDS.NAMESPACE + 'mole',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
CANDELA = Unit.get_singular_unit('candela', 'cd', Dimension(0, 0, 0, 0, 0, 0, 1),
identifier=OM_IDS.NAMESPACE + 'candela', system_of_units=SYSTEM_OF_UNITS,
is_base_unit=True)
class IEC:
KIBI = Prefix('kibi', 'Ki', pow(2, 10), OM_IDS.NAMESPACE + 'kibi')
MEBI = Prefix('mebi', 'Mi', pow(2, 20), OM_IDS.NAMESPACE + 'mebi')
GIBI = Prefix('gibi', 'Gi', pow(2, 30), OM_IDS.NAMESPACE + 'gibi')
TEBI = Prefix('tebi', 'Ti', pow(2, 40), OM_IDS.NAMESPACE + 'tebi')
PEBI = Prefix('pebi', 'Pi', pow(2, 50), OM_IDS.NAMESPACE + 'pebi')
EXBI = Prefix('exbi', 'Ei', pow(2, 60), OM_IDS.NAMESPACE + 'exbi')
ZEBI = Prefix('zebi', 'Zi', pow(2, 70), OM_IDS.NAMESPACE + 'zebi')
YOBI = Prefix('yobi', 'Yi', pow(2, 80), OM_IDS.NAMESPACE + 'yobi')
class JEDEC:
KILO = Prefix('kilo', 'k', pow(2, 10), OM_IDS.NAMESPACE + 'jedec-kilo')
MEGA = Prefix('mega', 'M', pow(2, 20), OM_IDS.NAMESPACE + 'jedec-mega')
GIGA = Prefix('giga', 'G', pow(2, 30), OM_IDS.NAMESPACE + 'jedec-giga')
from omlib.exceptions.dimensionexception import DimensionalException
class Dimension:
def __init__(self, T=0, L=0, M=0, I=0, Theta=0, N=0, J=0):
self.T = T
self.L = L
self.M = M
self.I = I
self.Theta = Theta
self.N = N
self.J = J
def time_dimension_exponent(self):
return self.T
def length_dimension_exponent(self):
return self.L
def mass_dimension_exponent(self):
return self.M
def electric_current_dimension_exponent(self):
return self.I
def thermodynamic_temperature_dimension_exponent(self):
return self.Theta
def amount_of_substance_dimension_exponent(self):
return self.N
def luminous_intensity_dimension_exponent(self):
return self.J
def __eq__(self, other):
return (self.T == other.T and self.L == other.L and self.M == other.M and self.I == other.I and
self.Theta == other.Theta and self.N == other.N and self.J == other.J)
def __str__(self):
return f'(T={self.T}, L={self.L}, M={self.M}, I={self.I}, θ={self.Theta}, N={self.N}, J={self.J})'
def __add__(self, other):
if not self == other:
raise DimensionalException("Entities of different dimensions cannot be added together. {} != {}"
.format(self, other))
return self
def __sub__(self, other):
if not self == other:
raise DimensionalException("Entities of different dimensions cannot be subtracted from each other. {} != {}"
.format(self, other))
return self
def __mul__(self, other):
return Dimension(self.T + other.T, self.L + other.L, self.M + other.M, self.I + other.I,
self.Theta + other.Theta, self.N + other.N, self.J + other.J)
def __truediv__(self, other):
return Dimension(self.T - other.T, self.L - other.L, self.M - other.M, self.I - other.I,
self.Theta - other.Theta, self.N - other.N, self.J - other.J)
@staticmethod
def pow(base, exponent):
if isinstance(base, Dimension):
return Dimension(base.T * exponent, base.L * exponent, base.M * exponent, base.I * exponent,
base.Theta * exponent, base.N * exponent, base.J * exponent)
else:
return Dimension()
class DimensionalException(Exception):
def __init__(self, message):
self.message = message
class UnitConversionException(Exception):
def __init__(self, message):
self.message = message
class ScaleConversionException(Exception):
def __init__(self, message):
self.message = message
class UnitIdentityException(Exception):
def __init__(self, message):
self.message = message
class ScaleIdentityException(Exception):
def __init__(self, message):
self.message = message
import math
from omlib.constants import SI
from omlib.exceptions.dimensionexception import DimensionalException
from omlib.scale import Scale
from omlib.thing import Thing
from omlib.unit import Unit, PrefixedUnit, SingularUnit
def om(numerical_value, unit_or_scale, identifier=None):
if isinstance(unit_or_scale, Unit):
return Measure(numerical_value, unit_or_scale, identifier)
if isinstance(unit_or_scale, Scale):
return Point(numerical_value, unit_or_scale, identifier)
return None
class Point(Thing):
@staticmethod
def create_by_converting(point, to_scale):
if not isinstance(point, Point):
raise ValueError("The parameter to the convert method is not of the correct type (Point).")
if not isinstance(to_scale, Scale):
raise ValueError("The parameter to the convert method is not of the correct type (Scale).")
new_point = Point(point.numericalValue, point.scale)
new_point.convert(to_scale)
return new_point
@staticmethod
def create_by_converting_to_ratio_scale(point):
if not isinstance(point, Point):
raise ValueError("The parameter to the convert method is not of the correct type (Point).")
new_point = Point(point.numericalValue, point.scale)
new_point.convert_to_ratio_scale()
return new_point
def __init__(self, numerical_value, scale, identifier=None):
super().__init__(identifier=identifier)
self.numericalValue = numerical_value
self.scale = scale
def convert(self, to_scale):
if not isinstance(to_scale, Scale):
raise ValueError("The parameter to the convert method is not of the correct type (Scale).")
factor = Scale.conversion_factor(self.scale, to_scale)
off_set = Scale.conversion_off_set(self.scale, to_scale)
self.numericalValue = self.numericalValue * factor + off_set
self.scale = to_scale
def convert_to_ratio_scale(self):
base = self.scale.base_ratio_scale()
factor = Scale.conversion_factor(self.scale, base[0])
off_set = Scale.conversion_off_set(self.scale, base[0])
self.numericalValue = self.numericalValue * factor + off_set
self.scale = base[0]
def __str__(self):
return f'{self.numericalValue} {self.scale.unit.symbol()}'
def __new_value_for_comparisson(self, other):
if isinstance(other, Point):
factor = Scale.conversion_factor(self.scale, other.scale)
off_set = Scale.conversion_off_set(self.scale, other.scale)
new_value = self.numericalValue * factor + off_set
return new_value
return None
def __eq__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value == other.numericalValue
return False
def __ne__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value != other.numericalValue
return False
def __lt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value < other.numericalValue
return False
def __le__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value <= other.numericalValue
return False
def __gt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value > other.numericalValue
return False
def __ge__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value >= other.numericalValue
return False
def __add__(self, other):
if not isinstance(other, Measure):
raise ValueError('The value to be added is not a measure and only measures can be added to a point.')
if not self.scale.dimensions == other.unit.dimensions:
raise DimensionalException("Measures and Points with units of different dimensions cannot be added "
"together. {} != {}"
.format(self.scale.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.scale.unit)
return_point = Point(self.numericalValue + new_measure.numericalValue, self.scale)
return return_point
def __sub__(self, other):
if not isinstance(other, Measure) and not isinstance(other, Point):
raise ValueError('The value to be subtracted is not a point or a measure and only measures or points '
'can be subtracted from a point.')
if isinstance(other, Measure):
if not self.scale.dimensions == other.unit.dimensions:
raise DimensionalException("Measures and Points with units of different dimensions cannot be "
"subtracted from each other. {} != {}".format(self.scale.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.scale.unit)
return_point = Point(self.numericalValue - new_measure.numericalValue, self.scale)
return return_point
if isinstance(other, Point):
if not self.scale.dimensions == other.scale.dimensions:
raise DimensionalException("Measures and Points with units of different dimensions cannot be "
"subtracted from each other. {} != {}".format(self.scale.unit, other.unit))
new_point = Point.create_by_converting(other, self.scale)
return_measure = Measure(self.numericalValue - new_point.numericalValue, self.scale.unit)
return return_measure
def __mul__(self, other):
as_measure = Measure(self.numericalValue, self.scale.unit)
return as_measure * other
def __truediv__(self, other):
as_measure = Measure(self.numericalValue, self.scale.unit)
return as_measure / other
class Measure(Thing):
@staticmethod
def create_by_converting(measure, to_unit):
if not isinstance(measure, Measure):
raise ValueError("The parameter to the convert method is not of the correct type (Measure).")
if not isinstance(to_unit, Unit):
raise ValueError("The parameter to the convert method is not of the correct type (Unit).")
new_measure = Measure(measure.numericalValue, measure.unit)
new_measure.convert(to_unit)
return new_measure
@staticmethod
def create_by_converting_to_base_units(measure, in_system_of_units=SI):
if not isinstance(measure, Measure):
raise ValueError("The parameter to the convert method is not of the correct type (Measure).")
new_measure = Measure(measure.numericalValue, measure.unit)
new_measure.convert_to_base_units(in_system_of_units)
return new_measure
@staticmethod
def create_by_converting_to_convenient_units(measure, in_system_of_units=None, use_prefixes=True):
if not isinstance(measure, Measure):
raise ValueError("The parameter to the convert method is not of the correct type (Measure).")
new_measure = Measure(measure.numericalValue, measure.unit)
new_measure.convert_to_convenient_units(in_system_of_units, use_prefixes=use_prefixes)
return new_measure
def __init__(self, numerical_value, unit, identifier=None):
super().__init__(identifier=identifier)
self.numericalValue = numerical_value
self.unit = unit
def convert(self, to_unit):
if not isinstance(to_unit, Unit):
raise ValueError("The parameter to the convert method is not of the correct type (Unit).")
factor = Unit.conversion_factor(self.unit, to_unit)
self.numericalValue = self.numericalValue * factor
self.unit = to_unit
def convert_to_base_units(self, in_system_of_units=None):
if in_system_of_units is None:
in_system_of_units = self.unit.systemOfUnits
if in_system_of_units is None:
in_system_of_units = SI.SYSTEM_OF_UNITS
base = Unit.get_base_units(self.unit, in_system_of_units)
self.convert(base)
def convert_to_convenient_units(self, system_of_units=None, use_prefixes=True):
if system_of_units is None:
system_of_units = self.unit.systemOfUnits
test_units = self.unit.with_dimensions(self.unit.dimensions, in_system_of_units=system_of_units)
selected_unit = self.unit
log_selected_value = Measure.__get_log_value(self.numericalValue, selected_unit)
for test_unit in test_units:
if use_prefixes or not isinstance(test_unit, PrefixedUnit):
factor = Unit.conversion_factor(self.unit, test_unit)
value = abs(self.numericalValue * factor)
log_value = Measure.__get_log_value(value, test_unit)
if log_value < log_selected_value:
log_selected_value = log_value
selected_unit = test_unit
if abs(log_value - log_selected_value) < 0.1 and isinstance(test_unit, SingularUnit):
log_selected_value = log_value
selected_unit = test_unit
if abs(log_value - log_selected_value) < 0.1 and isinstance(test_unit, PrefixedUnit)\
and not isinstance(selected_unit, SingularUnit):
log_selected_value = log_value
selected_unit = test_unit
self.convert(selected_unit)
@staticmethod
def __get_log_value(value, unit):
if value == 0.0:
return 0
log_value = abs(math.log10(value))
if value < 1:
log_value = log_value + 2
if not isinstance(unit, SingularUnit):
log_value = log_value + 1
return log_value
def __str__(self):
return f'{self.numericalValue} {self.unit.symbol()}'
def __new_value_for_comparisson(self, other):
if isinstance(other, Measure):
factor = Unit.conversion_factor(self.unit, other.unit)
new_value = self.numericalValue * factor
return new_value
return None
def __eq__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value == other.numericalValue
return False
def __ne__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value != other.numericalValue
return False
def __lt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value < other.numericalValue
return False
def __le__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value <= other.numericalValue
return False
def __gt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value > other.numericalValue
return False
def __ge__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value >= other.numericalValue
return False
def __add__(self, other):
if not isinstance(other, Measure):
raise ValueError('The value to be added is not a measure.')
if not self.unit.dimensions == other.unit.dimensions:
raise DimensionalException("Measures with units of different dimensions cannot be added together. {} != {}"
.format(self.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.unit)
return_measure = Measure(self.numericalValue + new_measure.numericalValue, self.unit)
return return_measure
def __sub__(self, other):
if not isinstance(other, Measure):
raise ValueError('The value to be subtracted is not a measure.')
if not self.unit.dimensions == other.unit.dimensions:
raise DimensionalException("Measures with units of different dimensions cannot be subtracted from each "
"other. {} != {}".format(self.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.unit)
return_measure = Measure(self.numericalValue - new_measure.numericalValue, self.unit)
return return_measure
def __mul__(self, other):
if isinstance(other, float) or isinstance(other, int):
new_measure = Measure(self.numericalValue * other, self.unit)
return new_measure
if not isinstance(other, Measure) and not isinstance(other, Point):
raise ValueError('The multiplicand is not a measure, a point, a float, or an int.')
other_unit = None
if isinstance(other, Measure):
other_unit = other.unit
if isinstance(other, Point):
other_unit = other.scale.unit
new_value = self.numericalValue * other.numericalValue
new_unit = Unit.get_unit_multiplication(self.unit, other_unit)
new_measure = Measure(new_value, new_unit)
simple_unit = Unit.simplified_compound_unit(new_unit)
new_measure.convert(simple_unit)
return new_measure
def __truediv__(self, other):
if isinstance(other, float) or isinstance(other, int):
new_measure = Measure(self.numericalValue / other, self.unit)
return new_measure
if not isinstance(other, Measure) and not isinstance(other, Point):
raise ValueError('The denominator is not a measure, a point, a float, or an int.')
other_unit = None
if isinstance(other, Measure):
other_unit = other.unit
if isinstance(other, Point):
other_unit = other.scale.unit
new_value = self.numericalValue / other.numericalValue
new_unit = Unit.get_unit_division(self.unit, other_unit)
new_measure = Measure(new_value, new_unit)
simple_unit = Unit.simplified_compound_unit(new_unit)
new_measure.convert(simple_unit)
return new_measure
Source diff could not be displayed: it is too large. Options to address this: view the blob.
Source diff could not be displayed: it is too large. Options to address this: view the blob.
from rdflib import URIRef
from omlib.dimension import Dimension
from omlib.exceptions.dimensionexception import DimensionalException
from omlib.exceptions.unitconversionexception import ScaleConversionException
from omlib.exceptions.unitidentityexception import ScaleIdentityException
from omlib.thing import Thing
from omlib.unit import Unit
class Scale(Thing):
_scales = []
@staticmethod
def clear_cache():
Scale._scales.clear()
@staticmethod
def _add_when_not_in_cache(scale):
for test_scale in Scale._scales:
if test_scale == scale:
if test_scale.identifier == scale.identifier or not isinstance(scale.identifier, URIRef) \
or not isinstance(test_scale.identifier, URIRef):
if isinstance(test_scale, RatioScale) and isinstance(scale, RatioScale):
if test_scale.unit == scale.unit:
if isinstance(scale.identifier, URIRef) and not isinstance(test_scale.identifier, URIRef):
test_scale.identifier = scale.identifier
return test_scale
if isinstance(test_scale, IntervalScale) and isinstance(scale, IntervalScale):
if test_scale.unit == scale.unit and test_scale.baseScale == scale.baseScale \
and test_scale.offSet == scale.offSet:
if isinstance(scale.identifier, URIRef) and not isinstance(test_scale.identifier, URIRef):
test_scale.identifier = scale.identifier
return test_scale
if test_scale.identifier == scale.identifier:
raise ScaleIdentityException("The scale with identifier {} already exists but with different"
"properties. \n{} \nis not the same as \n{}".format(scale.identifier,
scale, test_scale))
Scale._scales.append(scale)
return scale
@staticmethod
def with_label(label):
return_scales = []
for test_scale in Scale._scales:
for test_label in test_scale.allLabels:
if test_label == label:
return_scales.append(test_label)
return return_scales
@staticmethod
def with_identifier(identifier):
for test_scale in Scale._scales:
if test_scale.identifier == identifier:
return test_scale
return None
@staticmethod
def with_dimensions(dimensions, in_system_of_units=None):
return_scales = []
for test_scale in Scale._scales:
if test_scale.dimensions == dimensions and test_scale.systemOfUnits == in_system_of_units:
return_scales.append(test_scale)
return return_scales
@staticmethod
def get_ratio_scale(unit, label=None, identifier=None):
scale = RatioScale(unit, label, identifier)
scale = Scale._add_when_not_in_cache(scale)
return scale
@staticmethod
def get_interval_scale(base_scale, unit, off_set, label=None, identifier=None):
scale = IntervalScale(base_scale, unit, off_set, label, identifier)
scale = Scale._add_when_not_in_cache(scale)
return scale
@staticmethod
def conversion_factor(from_scale, to_scale):
if (isinstance(from_scale, RatioScale) or isinstance(from_scale, IntervalScale)) and \
(isinstance(to_scale, RatioScale) or isinstance(to_scale, IntervalScale)):
can_convert = Unit.can_convert(from_scale.unit, to_scale.unit)
if not can_convert:
raise DimensionalException("A scale with dimensions {} cannot be converted to a scale with "
"dimensions {}."
.format(from_scale.dimensions, to_scale.dimensions))
factor = Unit.conversion_factor(from_scale.unit, to_scale.unit)
return factor
else:
raise ScaleConversionException("Cannot convert from {} to {} as they are not both cardinal scales."
.format(from_scale, to_scale))
@staticmethod
def conversion_off_set(from_scale, to_scale):
if (isinstance(from_scale, RatioScale) or isinstance(from_scale, IntervalScale)) and \
(isinstance(to_scale, RatioScale) or isinstance(to_scale, IntervalScale)):
from_ratio_scale = from_scale.base_ratio_scale()
to_ratio_scale = to_scale.base_ratio_scale()
from_ratio_factor = Unit.conversion_factor(from_scale.unit,from_ratio_scale[0].unit)
to_ratio_factor = Unit.conversion_factor(to_scale.unit,to_ratio_scale[0].unit)
if from_ratio_scale[0] == to_ratio_scale[0]:
off_set = (to_ratio_scale[1] * to_ratio_factor - from_ratio_scale[1]*from_ratio_factor) \
/ to_ratio_factor
return off_set
else:
raise ScaleConversionException("Cannot convert from {} to {} as they do not use the same base ratio "
"scale, i.e. they do not have the same known zero point."
.format(from_scale, to_scale))
else:
raise ScaleConversionException("Cannot convert from {} to {} as they are not both cardinal scales."
.format(from_scale, to_scale))
def __init__(self, label=None, identifier=None, dimensions=Dimension(), system_of_units=None):
super().__init__(label, identifier)
self.fixedPoints = []
self.dimensions = dimensions
self.systemOfUnits = system_of_units
def add_fixed_point(self, fixed_point):
self.fixedPoints.append(fixed_point)
def __str__(self):
return f'{self.label()}\t<{self.identifier}> dim: {self.dimensions}'
def __eq__(self, other):
if isinstance(other, Scale):
return str(self.identifier) == str(other.identifier)
return False
def __ne__(self, other):
return not (self == other)
class RatioScale(Scale):
def __init__(self, unit, label=None, identifier=None, system_of_units=None):
if not isinstance(unit, Unit):
raise ValueError("The unit parameter in RatioScale is required to be of type Unit.")
if system_of_units is None:
system_of_units = unit.systemOfUnits
super().__init__(label, identifier, dimensions=unit.dimensions, system_of_units=system_of_units)
self.unit = unit
def base_ratio_scale(self):
return self, 0.0
def __eq__(self, other):
if isinstance(other, RatioScale):
if str(self.identifier) == str(other.identifier):
return True
if isinstance(self.identifier, URIRef) and isinstance(other.identifier, URIRef):
return False
if self.unit == other.unit:
return True
return False
def __str__(self):
return f'{self.label()}\t<{self.identifier}> unit: {self.unit} dim: {self.dimensions}'
class IntervalScale(Scale):
def __init__(self, base_scale, unit, off_set, label=None, identifier=None, system_of_units=None):
if not isinstance(unit, Unit):
raise ValueError("The unit parameter in IntervalScale is required to be of type Unit.")
if not isinstance(base_scale, Scale):
raise ValueError("The base_scale parameter in IntervalScale is required to be of type Scale.")
if unit.dimensions != base_scale.dimensions:
raise DimensionalException("The dimensions of the base scale are not the same as the dimensions of the"
" unit used. {} is not the same as {}"
.format(base_scale.dimensions, unit.dimensions))
if system_of_units is None:
system_of_units = base_scale.systemOfUnits
if system_of_units is None:
system_of_units = unit.systemOfUnits
super().__init__(label, identifier, dimensions=unit.dimensions, system_of_units=system_of_units)
self.unit = unit
self.baseScale = base_scale
self.offSet = off_set
def base_ratio_scale(self):
base_base = self.baseScale.base_ratio_scale()
conversion_fac = Unit.conversion_factor(base_base[0].unit, self.unit)
return base_base[0], base_base[1] * conversion_fac + self.offSet
def __eq__(self, other):
if isinstance(other, IntervalScale):
if str(self.identifier) == str(other.identifier):
return True
if self.unit == other.unit and self.baseScale == other.baseScale and self.offSet == other.offSet:
return True
return False
def __str__(self):
return f'{self.label()}\t<{self.identifier}> base: {self.baseScale} unit: {self.unit} dim: {self.dimensions}'
from rdflib import BNode, URIRef, Literal, XSD
class Thing:
def __init__(self, label=None, identifier=None):
if identifier is None:
self.identifier = BNode()
else:
self.identifier = URIRef(identifier)
if label is None:
self.prefLabels = []
else:
self.prefLabels = []
self.__add_label_to_array(self.prefLabels, label, None)
self.altLabels = []
def __add_label_to_array(self, array, label, language=None):
if isinstance(label, list):
for item in label:
self.__add_label_to_array(array, item, language)
else:
if language is None:
if isinstance(label, Literal) and not label in array:
array.append(label)
else:
label_lit = Literal(label, datatype=XSD.string)
if not label_lit in array:
array.append(label_lit)
else:
if isinstance(label, Literal) and not label in array:
label_lit = Literal(label.normalize, language)
array.append(label_lit)
else:
label_lit = Literal(label, language)
if not label_lit in array:
array.append(label_lit)
def add_preferred_label(self, label, language=None):
# TODO Check if a preferred label in the specified language already exists, if so move the old one to alt labels
self.__add_label_to_array(self.prefLabels, label, language)
def add_alternative_label(self, label, language=None):
self.__add_label_to_array(self.altLabels, label, language)
def label(self):
label = self.preferred_label()
return label
def preferred_label(self, language=None):
result_label = None
if language is None:
for pref_label in self.prefLabels:
if pref_label.language is None:
result_label = pref_label
if result_label is None:
for pref_label in self.prefLabels:
if pref_label.language == 'en':
result_label = pref_label
else:
for pref_label in self.prefLabels:
if pref_label.language == language:
result_label = pref_label
return result_label
def all_labels(self):
labels = []
labels.extend(self.prefLabels)
labels.extend(self.altLabels)
return labels
class SymbolThing(Thing):
def __init__(self, name=None, symbol=None, identifier=None):
super().__init__(name, identifier)
self.dimensions = []
self.symbols = []
self.add_symbol(symbol)
def add_symbol(self, symbol, language=None):
if isinstance(symbol, list):
for item in symbol:
self.__add_label_to_array(self.symbols, item, language)
else:
if language is None:
if isinstance(symbol, Literal) and not symbol in self.symbols:
self.symbols.append(symbol)
else:
label_lit = Literal(symbol, datatype=XSD.string)
if not label_lit in self.symbols:
self.symbols.append(label_lit)
else:
if isinstance(symbol, Literal) and not symbol in self.symbols:
label_lit = Literal(symbol.normalize, language)
self.symbols.append(label_lit)
else:
label_lit = Literal(symbol, language)
if not label_lit in self.symbols:
self.symbols.append(label_lit)
def symbol(self):
symbol = self.preferred_symbol()
return symbol
def preferred_symbol(self, language=None):
result_symbol = None
if language is None:
for symbol in self.symbols:
if symbol.language is None:
result_symbol = symbol
if result_symbol is None:
for symbol in self.symbols:
if symbol.language == 'en':
result_symbol = symbol
else:
for symbol in self.symbols:
if symbol.language == language:
result_symbol = symbol
return result_symbol
def all_symbols(self):
return self.symbols
import math
from rdflib import URIRef
from omlib.exceptions.dimensionexception import DimensionalException
from omlib.exceptions.unitconversionexception import UnitConversionException
from omlib.exceptions.unitidentityexception import UnitIdentityException
from omlib.dimension import Dimension
from omlib.thing import SymbolThing
class Unit(SymbolThing):
_units = []
@staticmethod
def with_label(label):
get_units = []
for unit in Unit._units:
for u_label in unit.all_labels():
if str(u_label) == label:
get_units.append(unit)
break
return get_units
@staticmethod
def with_symbol(symbol):
get_units = []
for unit in Unit._units:
for u_symbol in unit.all_symbols():
if str(u_symbol) == symbol:
get_units.append(unit)
return get_units
@staticmethod
def with_identifier(identifier):
for unit in Unit._units:
if str(unit.identifier) == str(identifier):
return unit
return None
@staticmethod
def with_prefix(prefix, base):
test_units = Unit.with_dimensions(base.dimensions)
for unit in test_units:
if isinstance(unit, PrefixedUnit):
if unit.baseUnit == base and unit.prefix == prefix:
return unit
return None
@staticmethod
def with_multiplication(multiplier, multiplicand):
new_dimensions = multiplier.dimensions * multiplicand.dimensions
test_units = Unit.with_dimensions(new_dimensions)
for unit in test_units:
if isinstance(unit, UnitMultiplication):
if unit.multiplier == multiplier and unit.multiplicand == multiplicand:
return unit
return None
@staticmethod
def with_division(numerator, denominator):
new_dimensions = numerator.dimensions / denominator.dimensions
test_units = Unit.with_dimensions(new_dimensions)
for unit in test_units:
if isinstance(unit, UnitDivision):
if unit.numerator == numerator and unit.denominator == denominator:
return unit
return None
@staticmethod
def with_exponentiation(base, exponent):
new_dimensions = Dimension.pow(base.dimensions, exponent)
test_units = Unit.with_dimensions(new_dimensions)
for unit in test_units:
if isinstance(unit, UnitExponentiation):
if unit.base == base and unit.exponent == exponent:
return unit
return None
@staticmethod
def with_dimensions(dimensions, in_system_of_units=None):
if isinstance(dimensions, Dimension):
get_units = []
for test_unit in Unit._units:
if test_unit.dimensions == dimensions:
if in_system_of_units is None or test_unit.systemOfUnits == in_system_of_units:
get_units.append(test_unit)
return get_units
return None
@staticmethod
def get_base_units(for_unit, in_system_of_units=str('http://www.ontology-of-units-of-measure.org/resource/om-2/InternationalSystemOfUnits')): # WARNING If you change "SI" also change in constants.py
if isinstance(for_unit, SingularUnit) or isinstance(for_unit, PrefixedUnit) \
or isinstance(for_unit, UnitMultiple):
units = Unit.with_dimensions(for_unit.dimensions)
for unit in units:
if unit.isBaseUnit and unit.systemOfUnits == in_system_of_units:
return unit
if isinstance(for_unit, UnitMultiplication):
base_multiplier = Unit.get_base_units(for_unit.multiplier, in_system_of_units)
base_multiplicand = Unit.get_base_units(for_unit.multiplicand, in_system_of_units)
if base_multiplier is not None and base_multiplicand is not None:
unit = Unit.get_unit_multiplication(base_multiplier, base_multiplicand,
system_of_units=in_system_of_units)
return unit
if isinstance(for_unit, UnitDivision):
base_numerator = Unit.get_base_units(for_unit.numerator, in_system_of_units)
base_denominator = Unit.get_base_units(for_unit.denominator, in_system_of_units)
if base_numerator is not None and base_denominator is not None:
unit = Unit.get_unit_division(base_numerator, base_denominator, system_of_units=in_system_of_units)
return unit
if isinstance(for_unit, UnitExponentiation):
base_base = Unit.get_base_units(for_unit.base, in_system_of_units)
if base_base is not None:
unit = Unit.get_unit_exponentiation(base_base, for_unit.exponent, system_of_units=in_system_of_units)
return unit
raise UnitIdentityException("Cannot find a base unit for {} in the system of units: {}"
.format(for_unit, in_system_of_units))
@staticmethod
def __add_when_not_duplicate(unit):
result_unit = Unit.with_identifier(unit.identifier)
dim_units = Unit.with_dimensions(unit.dimensions)
if result_unit is None:
for test_unit in dim_units:
try:
conversion_factor = abs(Unit.conversion_factor(unit, test_unit) - 1.0)
if conversion_factor < 0.0000001:
result_unit = test_unit
break
except UnitConversionException as error:
pass
if result_unit is not None:
if unit.label() is not None or unit.symbol() is not None:
all_labels = result_unit.all_labels()
found = False
str_unit_label = str(unit.label())
for label in all_labels:
if str(label) == str_unit_label:
found = True
break
if not found:
result_unit = None
if result_unit is None:
result_unit = unit
Unit._units.append(unit)
return result_unit
@staticmethod
def simplified_compound_unit(unit):
# creates a simplified unit (e.g. kg.mg/g becomes kg), conversion to this unit is however needed afterwards.
if isinstance(unit, CompoundUnit):
if isinstance(unit, UnitDivision):
if not isinstance(unit.numerator, CompoundUnit) and not isinstance(unit.denominator, CompoundUnit) \
and unit.numerator.dimensions == unit.denominator.dimensions:
return unit
exponents = Unit.__exponents_of_units_of_same_dimensions(unit)
result = Unit.__create_unit_from_exponents(exponents)
result.identifier = unit.identifier
return result
return unit
@staticmethod
def __exponents_of_units_of_same_dimensions(unit):
# takes together the exponents of units that have the same dimensions.
# So if you have kg.g this will result in exponent 2 for kg.
reduced = []
if isinstance(unit, CompoundUnit):
bue = unit.get_units_exponents()
for exp in bue:
found = False
for red in reduced:
if exp[0].dimensions == red[0].dimensions:
red[1] = red[1] + exp[1]
found = True
if not found:
reduced.append([exp[0], exp[1]])
return reduced
reduced.append([unit, 1.0])
return reduced
@staticmethod
def __create_unit_from_exponents(exponents):
numerator = None
denominator = None
units_in_numerator = 0
units_in_denominator = 0
for red in exponents:
if red[1] > 0:
units_in_numerator += 1
if red[1] < 0:
units_in_denominator += 1
for red in exponents:
if red[1] > 0:
unit_exp = red[0]
if units_in_numerator <= 1 or str(
unit_exp.identifier) != "http://www.ontology-of-units-of-measure.org/resource/om-2/one":
if red[1] > 1:
unit_exp = UnitExponentiation(red[0], red[1])
if numerator is None:
numerator = unit_exp
else:
numerator = UnitMultiplication(numerator, unit_exp)
if red[1] < 0:
unit_exp = red[0]
if unit_exp.identifier != str("http://www.ontology-of-units-of-measure.org/resource/om-2/one"):
if red[1] < -1:
unit_exp = UnitExponentiation(red[0], -red[1])
if denominator is None:
denominator = unit_exp
else:
denominator = UnitMultiplication(denominator, unit_exp)
if numerator is None:
numerator = Unit.get_singular_unit('one', '', Dimension())
if denominator is None:
result = numerator
else:
result = UnitDivision(numerator, denominator)
return result
@staticmethod
def reduce_unit(unit):
result = unit
if isinstance(unit, CompoundUnit):
reduced = []
bue = unit.get_units_exponents()
for exponent in bue:
found = False
for red in reduced:
if exponent[0] == red[0]:
found = True
red[1] = red[1] + exponent[1]
if not found:
reduced.append(exponent)
reduced.sort(key=Unit.cmp_to_key(Unit.__exponents_cmp))
if isinstance(unit, UnitDivision) \
and not isinstance(unit.numerator, CompoundUnit) \
and not isinstance(unit.denominator, CompoundUnit) \
and unit.numerator.dimensions == unit.denominator.dimensions:
return unit
result = Unit.__create_unit_from_exponents(reduced)
result.identifier = unit.identifier
return result
@staticmethod
def __exponents_cmp(a, b):
if a[1] > 0 and b[1] < 0:
return -1
if a[1] < 0 and b[1] > 0:
return 1
if a[1] > 0 and b[1] < a[1]:
return 1
if 0 < a[1] < b[1]:
return -1
if 0 > a[1] > b[1]:
return -1
if a[1] < 0 and b[1] > a[1]:
return 1
return 0
@staticmethod
def cmp_to_key(mycmp):
'Convert a cmp= function into a key= function'
class K:
def __init__(self, obj, *args):
self.obj = obj
def __lt__(self, other):
return mycmp(self.obj, other.obj) < 0
def __gt__(self, other):
return mycmp(self.obj, other.obj) > 0
def __eq__(self, other):
return mycmp(self.obj, other.obj) == 0
def __le__(self, other):
return mycmp(self.obj, other.obj) <= 0
def __ge__(self, other):
return mycmp(self.obj, other.obj) >= 0
def __ne__(self, other):
return mycmp(self.obj, other.obj) != 0
return K
@staticmethod
def conversion_factor(from_unit, to_unit):
can_convert = Unit.can_convert(from_unit, to_unit)
if not can_convert:
raise DimensionalException("A unit with dimensions {} cannot be converted to a unit with dimensions {}."
.format(from_unit.dimensions, to_unit.dimensions))
from_base_units_exponents = from_unit.get_base_units_exponents()
to_base_units_exponents = to_unit.get_base_units_exponents()
from_norm = Unit.__normalise_base_units_exponents(from_base_units_exponents)
to_norm = Unit.__normalise_base_units_exponents(to_base_units_exponents)
if not Unit.__check_same_units_from_normalised(from_norm, to_norm):
raise UnitConversionException("Cannot convert from {} to {} as they do not have a common ancestor unit."
.format(from_unit, to_unit))
from_factor = Unit.__factor_from_normalised(from_norm)
to_factor = Unit.__factor_from_normalised(to_norm)
factor = from_factor / to_factor
return factor
@staticmethod
def __normalise_base_units_exponents(base_unit_exponents):
unit_dict = {}
for exponent in base_unit_exponents:
entry = [0, 1.0]
if exponent[0].identifier in unit_dict:
entry = unit_dict[exponent[0].identifier]
entry[0] = entry[0] + exponent[1]
entry[1] = entry[1] * pow(exponent[2], exponent[1])
unit_dict[exponent[0].identifier] = entry
return unit_dict
@staticmethod
def __factor_from_normalised(normalised):
factor = 1.0
for entry in normalised:
factor = factor * normalised.get(entry)[1]
return factor
@staticmethod
def __check_same_units_from_normalised(normalised_unit1, normalised_unit2):
keys1 = normalised_unit1.keys()
for ident in keys1:
exp_1 = normalised_unit1.get(ident)
exp_2 = normalised_unit2.get(ident)
if exp_2 is None or exp_1[0] != exp_2[0]:
return False
keys2 = normalised_unit2.keys()
for ident in keys2:
exp_1 = normalised_unit1.get(ident)
exp_2 = normalised_unit2.get(ident)
if exp_1 is None or exp_1[0] != exp_2[0]:
return False
return True
@staticmethod
def print_base_units_exponents(unit):
base_units_exponents = unit.get_base_units_exponents()
bue_string = ""
i = 0
for exponent in base_units_exponents:
bue_string += f'[{str(exponent[0].symbol()), exponent[1], exponent[2]}]'
i = i + 1
if i < len(base_units_exponents):
bue_string += ', '
print('BASE UNIT EXPONENTS: {}: [{}]'.format(unit.symbol(), bue_string))
@staticmethod
def can_convert(from_unit, to_unit):
if isinstance(from_unit, Unit) and isinstance(to_unit, Unit):
if from_unit.dimensions == to_unit.dimensions:
return True
return False
if not isinstance(from_unit, Unit) and not isinstance(to_unit, Unit):
raise ValueError("Both arguments of can_convert are required to be of type Unit.")
if not isinstance(from_unit, Unit):
raise ValueError("The first argument of can_convert is required to be of type Unit.")
if not isinstance(to_unit, Unit):
raise ValueError("The second argument of can_convert is required to be of type Unit.")
@staticmethod
def get_singular_unit(label, symbol, dimensions=Dimension(), base_unit=None, factor=1.0, identifier=None,
cache=True, system_of_units=None, is_base_unit=False):
unit = None
if identifier is not None:
test_unit = Unit.with_identifier(identifier)
if test_unit is not None:
if not isinstance(test_unit, SingularUnit):
raise UnitIdentityException("The identifier for a SingularUnit has been used earlier"
" for another type of unit")
else:
if base_unit is not None and base_unit != test_unit.baseUnit:
base_identifier = str(base_unit.identifier)
test_identifier = str(test_unit.baseUnit.identifier)
base_ok = False
factor_ok = False
if base_identifier == test_identifier:
base_ok = True
if factor == test_unit.factor:
factor_ok = True
else:
can_convert_base = Unit.can_convert(base_unit, test_unit.baseUnit)
if can_convert_base:
base_ok = True
conv_factor = Unit.conversion_factor(base_unit, test_unit.baseUnit)
tot_factor = conv_factor * factor
log_tot_factor = math.log10(tot_factor)
log_factor = math.log10(test_unit.factor)
diff = abs(log_tot_factor - log_factor)
if diff < 0.00001:
factor_ok = True
if not base_ok:
raise UnitIdentityException("The requested SingularUnit uses a different base unit"
" as the earlier defined unit with the same identifier.")
if not factor_ok:
raise UnitIdentityException("The requested SingularUnit uses a different conversion factor"
" as the earlier defined unit with the same identifier.")
if label is not None:
test_unit.add_preferred_label(label)
if symbol is not None:
test_unit.add_symbol(symbol)
if system_of_units is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = system_of_units
if is_base_unit:
test_unit.isBaseUnit = is_base_unit
return test_unit
if unit is None:
if base_unit is not None and base_unit.systemOfUnits is not None and system_of_units is None \
and factor == 1.0:
system_of_units = base_unit.systemOfUnits
unit = SingularUnit(label, symbol, dimensions, base_unit, factor, identifier, system_of_units, is_base_unit)
if cache:
unit = Unit.__add_when_not_duplicate(unit)
return unit
@staticmethod
def get_prefixed_unit(prefix, base_unit, identifier=None, cache=True, system_of_units=None, is_base_unit=False):
unit = None
if isinstance(prefix, str):
prefix = Prefix.with_identifier(prefix)
if isinstance(prefix, URIRef):
prefix = Prefix.with_identifier(prefix.value)
if identifier is not None:
test_unit = Unit.with_identifier(identifier)
if test_unit is not None:
if not isinstance(test_unit, PrefixedUnit):
raise UnitIdentityException("The identifier for a PrefixedUnit has been used earlier"
" for another type of unit")
else:
base_identifier = str(base_unit.identifier)
test_identifier = str(test_unit.baseUnit.identifier)
if base_identifier != test_identifier:
raise UnitIdentityException("The requested PrefixedUnit uses a different base unit"
" as the earlier defined unit with the same identifier.")
if str(prefix.identifier) != str(test_unit.prefix.identifier):
raise UnitIdentityException("The requested PrefixedUnit uses a different prefix"
" as the earlier defined unit with the same identifier.")
if system_of_units is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = system_of_units
if base_unit.systemOfUnits is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = base_unit.systemOfUnits
if is_base_unit:
test_unit.isBaseUnit = is_base_unit
return test_unit
if unit is None:
test_unit = Unit.with_prefix(prefix, base_unit)
if test_unit is not None:
if identifier is not None and not isinstance(test_unit.identifier, URIRef):
test_unit.identifier = URIRef(identifier)
if system_of_units is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = system_of_units
if base_unit.systemOfUnits is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = base_unit.systemOfUnits
if is_base_unit and not test_unit.isBaseUnit:
test_unit.isBaseUnit = is_base_unit
return test_unit
if unit is None:
if system_of_units is None:
system_of_units = base_unit.systemOfUnits
unit = PrefixedUnit(prefix, base_unit, identifier, system_of_units=system_of_units,
is_base_unit=is_base_unit)
if cache:
unit = Unit.__add_when_not_duplicate(unit)
return unit
@staticmethod
def get_unit_multiple(base_unit, factor=1.0, identifier=None, label=None, symbol=None, cache=True,
system_of_units=None):
unit = None
if identifier is not None:
test_unit = Unit.with_identifier(identifier)
if test_unit is not None:
if not isinstance(test_unit, UnitMultiple):
raise UnitIdentityException("The identifier for a UnitMultiple has been used earlier"
" for another type of unit")
else:
base_identifier = str(base_unit.identifier)
test_identifier = str(test_unit.baseUnit.identifier)
if base_identifier != test_identifier:
raise UnitIdentityException("The requested UnitMultiple uses a different base unit"
" as the earlier defined unit with the same identifier.")
if label is not None:
test_unit.add_preferred_label(label)
if symbol is not None:
test_unit.add_symbol(symbol)
if system_of_units is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = system_of_units
if base_unit.systemOfUnits is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = base_unit.systemOfUnits
return test_unit
if unit is None:
unit = UnitMultiple(base_unit, factor, identifier, label, symbol, system_of_units)
if system_of_units is None:
system_of_units = base_unit.systemOfUnits
if cache:
unit = Unit.__add_when_not_duplicate(unit)
return unit
@staticmethod
def get_unit_multiplication(multiplier, multiplicand, identifier=None, cache=True,
system_of_units=None):
if identifier is not None:
for test_unit in Unit._units:
if str(test_unit.identifier) == str(identifier):
if not isinstance(test_unit, UnitMultiplication):
raise UnitIdentityException("The identifier for a UnitMultiplication has been used earlier"
" for another type of unit")
else:
if str(multiplier.identifier) != str(test_unit.multiplier.identifier) or \
str(multiplicand.identifier) != str(test_unit.multiplicand.identifier):
raise UnitIdentityException("The requested UnitMultiplication uses a different pair of"
" units as multiplier and multiplicand as the earlier defined"
" unit with the same identifier.")
if system_of_units is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = system_of_units
if multiplier.systemOfUnits is not None and multiplicand.systemOfUnits is not None \
and multiplier.systemOfUnits == multiplicand.systemOfUnits \
and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = multiplier.systemOfUnits
return test_unit
unit = Unit.with_multiplication(multiplier, multiplicand)
if unit is not None:
if identifier is not None:
unit.identifier = identifier
if system_of_units is not None and unit.systemOfUnits is None:
unit.systemOfUnits = system_of_units
if multiplier.systemOfUnits is not None and multiplicand.systemOfUnits is not None \
and multiplier.systemOfUnits == multiplicand.systemOfUnits and unit.systemOfUnits is None:
unit.systemOfUnits = multiplier.systemOfUnits
if unit is None:
for unit in Unit._units:
if isinstance(unit, UnitMultiplication):
if str(multiplier.identifier) == str(unit.multiplier.identifier) and \
str(multiplicand.identifier) == str(unit.multiplicand.identifier):
if system_of_units is not None:
unit.systemOfUnits = system_of_units
return unit
if system_of_units is None and multiplier.systemOfUnits is not None and \
multiplicand.systemOfUnits is not None and multiplicand.systemOfUnits == multiplier.systemOfUnits:
system_of_units = multiplier.systemOfUnits
unit = UnitMultiplication(multiplier, multiplicand, identifier, system_of_units)
unit = Unit.reduce_unit(unit)
if cache:
unit = Unit.__add_when_not_duplicate(unit)
return unit
@staticmethod
def get_unit_division(numerator, denominator, identifier=None, cache=True, system_of_units=None):
if identifier is not None:
for test_unit in Unit._units:
if str(test_unit.identifier) == str(identifier):
if not isinstance(test_unit, UnitDivision):
raise UnitIdentityException("The identifier for a UnitDivision has been used earlier"
" for another type of unit")
else:
if str(numerator.identifier) != str(test_unit.numerator.identifier) or \
str(denominator.identifier) != str(test_unit.denominator.identifier):
raise UnitIdentityException("The requested UnitDivision uses a different pair of"
" units as numerator and denominator as the earlier "
"defined unit with the same identifier.")
if system_of_units is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = system_of_units
if numerator.systemOfUnits is not None and denominator.systemOfUnits is not None \
and numerator.systemOfUnits == denominator.systemOfUnits \
and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = numerator.systemOfUnits
return test_unit
unit = Unit.with_division(numerator, denominator)
if unit is not None:
if identifier is not None:
unit.identifier = identifier
if system_of_units is not None:
unit.systemOfUnits = system_of_units
if numerator.systemOfUnits is not None and denominator.systemOfUnits is not None \
and numerator.systemOfUnits == denominator.systemOfUnits and unit.systemOfUnits is None:
unit.systemOfUnits = numerator.systemOfUnits
if unit is None:
for unit in Unit._units:
if isinstance(unit, UnitDivision):
if str(numerator.identifier) == str(unit.numerator.identifier) and \
str(denominator.identifier) == str(unit.denominator.identifier):
if system_of_units is not None:
unit.systemOfUnits = system_of_units
return unit
if system_of_units is None and denominator.systemOfUnits is not None and \
numerator.systemOfUnits is not None and numerator.systemOfUnits == denominator.systemOfUnits:
system_of_units = numerator.systemOfUnits
unit = UnitDivision(numerator, denominator, identifier, system_of_units)
unit = Unit.reduce_unit(unit)
if cache:
unit = Unit.__add_when_not_duplicate(unit)
return unit
@staticmethod
def get_unit_exponentiation(base, exponent, identifier=None, cache=True, system_of_units=None):
unit = None
if identifier is not None:
for test_unit in Unit._units:
if str(test_unit.identifier) == str(identifier):
if not isinstance(test_unit, UnitExponentiation):
raise UnitIdentityException("The identifier for a UnitExponentiation has been used earlier"
" for another type of unit")
else:
if str(base.identifier) != str(test_unit.base.identifier):
raise UnitIdentityException("The requested UnitExponentiation uses a different unit "
"as base as the earlier defined unit with the same identifier.")
if exponent != test_unit.exponent:
raise UnitIdentityException("The requested UnitExponentiation uses a different exponent as"
" the earlier defined unit with the same identifier.")
if system_of_units is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = system_of_units
if base.systemOfUnits is not None and test_unit.systemOfUnits is None:
test_unit.systemOfUnits = base.systemOfUnits
return test_unit
unit = Unit.with_exponentiation(base, exponent)
if unit is not None:
if identifier is not None:
unit.identifier = identifier
if system_of_units is not None:
unit.systemOfUnits = system_of_units
if unit is None:
for unit in Unit._units:
if isinstance(unit, UnitExponentiation):
if str(base.identifier) == str(unit.base.identifier) and \
exponent == unit.exponent:
if system_of_units is not None:
unit.systemOfUnits = system_of_units
return unit
if system_of_units is None and base.systemOfUnits is not None:
system_of_units = base.systemOfUnits
unit = UnitExponentiation(base, exponent, identifier, system_of_units)
unit = Unit.reduce_unit(unit)
if cache:
unit = Unit.__add_when_not_duplicate(unit)
return unit
def __init__(self, label=None, symbol=None, dimensions=Dimension(), identifier=None, system_of_units=None,
is_base_unit=False):
super().__init__(label, symbol, identifier)
self.isBaseUnit = is_base_unit
self.systemOfUnits = system_of_units
self.dimensions = dimensions
def __str__(self):
return f'{self.label()}\t{self.symbol()}\t<{self.identifier}> dim: {self.dimensions}'
def __eq__(self, other):
if isinstance(other, Unit):
return str(self.identifier) == str(other.identifier)
return False
def __ne__(self, other):
return not (self == other)
class Prefix(object):
_prefixes = dict()
@staticmethod
def with_identifier(identifier):
identifier_string = str(identifier)
if identifier_string in Prefix._prefixes:
return Prefix._prefixes[identifier_string]
return None
def __init__(self, name, symbol, factor, identifier):
self.name = name
self.symbol = symbol
self.factor = factor
self.identifier = identifier
Prefix._prefixes[str(identifier)] = self
def __eq__(self, other):
if isinstance(other, Prefix):
return str(self.identifier) == str(other.identifier)
return False
def __ne__(self, other):
return not (self == other)
class SingularUnit(Unit):
def __init__(self, label, symbol, dimensions=Dimension(), base_unit=None, factor=1.0, identifier=None,
system_of_units=None, is_base_unit=False):
if base_unit is not None:
dimensions = base_unit.dimensions
if system_of_units is None and not is_base_unit and base_unit is not None and factor == 1.0:
system_of_units = base_unit.systemOfUnits
super().__init__(label, symbol, dimensions, identifier, system_of_units=system_of_units,
is_base_unit=is_base_unit)
self.factor = factor
self.baseUnit = base_unit
def __eq__(self, other):
if isinstance(other, SingularUnit):
if not isinstance(self.identifier, URIRef) or not isinstance(other.identifier, URIRef):
if self.baseUnit is None and other.baseUnit == self and other.factor == self.factor:
return True
if other.baseUnit is None and self.baseUnit == other and other.factor == self.factor:
return True
if other.baseUnit == self.baseUnit and other.factor == self.factor:
return True
else:
return str(self.identifier) == str(other.identifier)
return False
def get_units_exponents(self):
return [[self, 1, 1.0]]
def get_base_units_exponents(self):
if self.baseUnit is None:
return [[self, 1, 1.0]]
base_base = self.baseUnit.get_base_units_exponents()
result = []
factor_added = False
for exponents in base_base:
if not factor_added:
corrected_factor = pow(self.factor, 1 / (exponents[1]))
converted = [exponents[0], exponents[1], exponents[2] * corrected_factor]
factor_added = True
else:
converted = exponents
result.append(converted)
return result
class PrefixedUnit(Unit):
def __init__(self, prefix, base_unit, identifier=None, system_of_units=None, is_base_unit=False):
label = f'{prefix.name}{base_unit.label()}'
symbol = f'{prefix.symbol}{base_unit.symbol()}'
self.prefix = prefix
self.baseUnit = base_unit
if system_of_units is None and not is_base_unit and base_unit is not None:
system_of_units = base_unit.systemOfUnits
super().__init__(label, symbol, dimensions=self.baseUnit.dimensions, identifier=identifier,
system_of_units=system_of_units, is_base_unit=is_base_unit)
def __eq__(self, other):
if isinstance(other, PrefixedUnit):
if not isinstance(self.identifier, URIRef) or not isinstance(other.identifier, URIRef):
if other.baseUnit == self.baseUnit and other.prefix == self.prefix:
return True
else:
return str(self.identifier) == str(other.identifier)
return False
def get_units_exponents(self):
return [[self, 1, 1.0]]
def get_base_units_exponents(self):
base_base = self.baseUnit.get_base_units_exponents()
result = []
factor_added = False
for exponents in base_base:
if not factor_added:
corrected_factor = pow(self.prefix.factor, 1 / (exponents[1]))
converted = [exponents[0], exponents[1], exponents[2] * corrected_factor]
factor_added = True
else:
converted = exponents
result.append(converted)
return result
class UnitMultiple(SingularUnit):
def __init__(self, base_unit, factor=1.0, identifier=None, label=None, symbol=None, system_of_units=None,
is_base_unit=False):
if label is None:
label = f'{factor} {base_unit.label()}'
if symbol is None:
symbol = f'{factor}{base_unit.symbol()}'
super().__init__(label, symbol, base_unit.dimensions, base_unit, factor, identifier,
system_of_units=system_of_units, is_base_unit=is_base_unit)
class CompoundUnit(Unit):
def __init__(self, symbol, dimensions, identifier=None, system_of_units=None, is_base_unit=False):
super().__init__(None, symbol, dimensions, identifier, system_of_units=system_of_units,
is_base_unit=is_base_unit)
class UnitMultiplication(CompoundUnit):
def __init__(self, multiplier, multiplicand, identifier=None, system_of_units=None,
is_base_unit=False):
dimensions = multiplier.dimensions * multiplicand.dimensions
multiplier_str = str(multiplier.symbol())
if isinstance(multiplier, CompoundUnit) and not isinstance(multiplier, UnitExponentiation):
multiplier_str = f'{multiplier_str}'
multiplicand_str = str(multiplicand.symbol())
if isinstance(multiplicand, CompoundUnit) and not isinstance(multiplicand, UnitExponentiation):
multiplicand_str = f'{multiplicand_str}'
if system_of_units is None and multiplier.systemOfUnits is not None and multiplicand.systemOfUnits is not None \
and multiplier.systemOfUnits == multiplicand.systemOfUnits:
system_of_units = multiplier.systemOfUnits
symbol = f'{multiplier_str}·{multiplicand_str}'
super().__init__(symbol, dimensions, identifier, system_of_units=system_of_units, is_base_unit=is_base_unit)
self.multiplier = multiplier
self.multiplicand = multiplicand
def __eq__(self, other):
if isinstance(other, UnitMultiplication):
if not isinstance(self.identifier, URIRef) or not isinstance(other.identifier, URIRef):
if other.multiplier == self.multiplier and other.multiplicand == self.multiplicand:
return True
if other.multiplier == self.multiplicand and other.multiplicand == self.multiplier:
return True
else:
return str(self.identifier) == str(other.identifier)
return False
def get_units_exponents(self):
multiplier_exp = self.multiplier.get_units_exponents()
multiplicand_exp = self.multiplicand.get_units_exponents()
result = []
result.extend(multiplier_exp)
result.extend(multiplicand_exp)
return result
def get_base_units_exponents(self):
multiplier_base = self.multiplier.get_base_units_exponents()
multiplicand_base = self.multiplicand.get_base_units_exponents()
result = []
result.extend(multiplier_base)
result.extend(multiplicand_base)
return result
class UnitDivision(CompoundUnit):
def __init__(self, numerator, denominator, identifier=None, system_of_units=None, is_base_unit=False):
dimensions = numerator.dimensions / denominator.dimensions
numerator_str = str(numerator.symbol())
if isinstance(numerator, CompoundUnit) and not isinstance(numerator, UnitExponentiation):
numerator_str = f'({numerator_str})'
denominator_str = str(denominator.symbol())
if isinstance(denominator, CompoundUnit) and not isinstance(denominator, UnitExponentiation):
denominator_str = f'({denominator_str})'
if system_of_units is None and numerator.systemOfUnits is not None and denominator.systemOfUnits is not None \
and numerator.systemOfUnits == denominator.systemOfUnits:
system_of_units = numerator.systemOfUnits
symbol = f'{numerator_str}/{denominator_str}'
super().__init__(symbol, dimensions, identifier, system_of_units=system_of_units, is_base_unit=is_base_unit)
self.numerator = numerator
self.denominator = denominator
def __eq__(self, other):
if isinstance(other, UnitDivision):
if not isinstance(self.identifier, URIRef) or not isinstance(other.identifier, URIRef):
if other.numerator == self.numerator and other.denominator == self.denominator:
return True
else:
return str(self.identifier) == str(other.identifier)
return False
def get_units_exponents(self):
numerator_exp = self.numerator.get_units_exponents()
denominator_exp = self.denominator.get_units_exponents()
result = []
result.extend(numerator_exp)
for exponents in denominator_exp:
result.append([exponents[0], -exponents[1], exponents[2]])
return result
def get_base_units_exponents(self):
numerator_base = self.numerator.get_base_units_exponents()
denominator_base = self.denominator.get_base_units_exponents()
result = []
result.extend(numerator_base)
for exponents in denominator_base:
result.append([exponents[0], -exponents[1], exponents[2]])
return result
class UnitExponentiation(CompoundUnit):
def __init__(self, base, exponent, identifier=None, system_of_units=None, is_base_unit=False):
dimensions = Dimension.pow(base.dimensions, exponent)
base_str = str(base.symbol())
if isinstance(base, CompoundUnit):
base_str = f'({base_str})'
if system_of_units is None and base.systemOfUnits is not None:
system_of_units = base.systemOfUnits
symbol = f'{base_str}{exponent}'
super().__init__(symbol, dimensions, identifier, system_of_units=system_of_units, is_base_unit=is_base_unit)
self.base = base
self.exponent = exponent
def __eq__(self, other):
if isinstance(other, UnitExponentiation):
if not isinstance(self.identifier, URIRef) or not isinstance(other.identifier, URIRef):
if other.base == self.base and other.exponent == self.exponent:
return True
else:
return str(self.identifier) == str(other.identifier)
return False
def get_units_exponents(self):
base_exp = self.base.get_units_exponents()
result = []
for exponents in base_exp:
result.append([exponents[0], exponents[1] * self.exponent, exponents[2]])
return result
def get_base_units_exponents(self):
base_base = self.base.get_base_units_exponents()
result = []
for exponents in base_base:
result.append([exponents[0], exponents[1] * self.exponent, exponents[2]])
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment