Skip to content
Snippets Groups Projects
Verified Commit 12f7aa5d authored by Martin Weise's avatar Martin Weise
Browse files

Updated docs & deleted untracked files

parent 6f06b3a5
No related branches found
No related tags found
3 merge requests!356Dev,!354Hotfix/docs,!353Updated docs & deleted untracked files
Showing
with 15 additions and 44385 deletions
# Example values to override for non-test deployments
#BASE_URL=https://example.com
#ADMIN_EMAIL=noreply@example.com
#LOG_LEVEL=info
#IDENTITY_SERVICE_ADMIN_PASSWORD=admin
#AUTH_SERVICE_ADMIN_PASSWORD=admin
#METADATA_DB_PASSWORD=dbrepo
#DATA_DB_PASSWORD=dbrepo
#AUTH_DB_PASSWORD=dbrepo
#S3_ACCESS_KEY_ID=seaweedfsadmin
#S3_SECRET_ACCESS_KEY=seaweedfsadmin
#SYSTEM_PASSWORD=admin
...@@ -18,10 +18,10 @@ environments. ...@@ -18,10 +18,10 @@ environments.
## Architecture ## Architecture
The repository is designed as a service-based architecture to ensure scalability and the utilization of various The repository is designed as a service-based architecture to ensure scalability and the utilization of various
technologies. The conceptualized microservices operate the basic database operations, data versioning as well as technologies. The conceptualized microservices (c.f. [Fig. 1](#fig1)) operate the basic database operations, data versioning as well as
*findability*, *accessability*, *interoperability* and *reuseability* (FAIR). *findability*, *accessability*, *interoperability* and *reuseability* (FAIR).
<figure markdown> <figure id="fig1" markdown>
![DBRepo architecture](../images/architecture-docker-compose.svg) ![DBRepo architecture](../images/architecture-docker-compose.svg)
<figcaption>Architecture of the services deployed via Docker Compose</figcaption> <figcaption>Fig. 1: Architecture of the services deployed via Docker Compose</figcaption>
</figure> </figure>
\ No newline at end of file
No preview for this file type
:root, :root,
[data-md-color-accent=indigo] { [data-md-color-accent=indigo] {
--md-hue: 230;
--md-default-bg-color: hsla(230, 15%, 21%, 1);
--md-primary-fg-color: #006699; --md-primary-fg-color: #006699;
--md-accent-fg-color: #005c8a; --md-accent-fg-color: #005c8a;
/* darken 10% */ /* darken 10% */
--md-primary-fg-color--dark: #00537c; --md-primary-fg-color--dark: #00537c;
/* darken 10% */ } /* darken 10% */ }
[data-md-color-scheme="slate"] {
--md-hue: 230;
--md-default-bg-color: hsla(230, 15%, 21%, 1); }
img.img-border { img.img-border {
border: 1px solid #b3b3b3; } border: 1px solid #b3b3b3; }
...@@ -30,3 +32,5 @@ img.img-border { ...@@ -30,3 +32,5 @@ img.img-border {
.md-banner { .md-banner {
background-color: var(--md-primary-fg-color--dark); } background-color: var(--md-primary-fg-color--dark); }
/*# sourceMappingURL=extra.css.map */
{ {
"version": 3, "version": 3,
"mappings": "AAAA;6BAC8B;EAC5B,qBAAqB,CAAC,QAAQ;EAC9B,oBAAoB,CAAC,QAAQ;EAAE,gBAAgB;EAC/C,2BAA2B,CAAC,QAAQ;EAAE,gBAAgB;;AAGxD,cAAe;EACb,MAAM,EAAE,iBAAiB;;AAG3B,yCAA0C;EACxC,KAAK,EAAE,gBAAgB;;AAIvB,oGACQ;EACN,KAAK,EAAE,gCAAgC;EACvC,UAAU,EAAE,OAAO;;AAIvB;0DAC2D;EACzD,KAAK,EAAE,0BAA0B;EACjC,aAAa,EAAE,oCAAoC;EAEnD;;kEACQ;IACN,KAAK,EAAE,gCAAgC;IACvC,aAAa,EAAE,0CAA0C;;AAK7D,UAAW;EACT,gBAAgB,EAAE,gCAAgC", "mappings": "AAAA;6BAC8B;EAC5B,qBAAqB,CAAC,QAAQ;EAC9B,oBAAoB,CAAC,QAAQ;EAAE,gBAAgB;EAC/C,2BAA2B,CAAC,QAAQ;EAAE,gBAAgB;;AAGxD,8BAA+B;EAC7B,QAAQ,CAAC,IAAI;EACb,qBAAqB,CAAC,uBAAuB;;AAG/C,cAAe;EACb,MAAM,EAAE,iBAAiB;;AAG3B,yCAA0C;EACxC,KAAK,EAAE,gBAAgB;;AAIvB,oGACQ;EACN,KAAK,EAAE,gCAAgC;EACvC,UAAU,EAAE,OAAO;;AAIvB;0DAC2D;EACzD,KAAK,EAAE,0BAA0B;EACjC,aAAa,EAAE,oCAAoC;EAEnD;;kEACQ;IACN,KAAK,EAAE,gCAAgC;IACvC,aAAa,EAAE,0CAA0C;;AAK7D,UAAW;EACT,gBAAgB,EAAE,gCAAgC",
"sources": ["extra.scss"], "sources": ["extra.scss"],
"names": [], "names": [],
"file": "extra.css" "file": "extra.css"
......
:root, :root,
[data-md-color-accent=indigo] { [data-md-color-accent=indigo] {
--md-hue: 230;
--md-default-bg-color: hsla(230, 15%, 21%, 1);
--md-primary-fg-color: #006699; --md-primary-fg-color: #006699;
--md-accent-fg-color: #005c8a; /* darken 10% */ --md-accent-fg-color: #005c8a; /* darken 10% */
--md-primary-fg-color--dark: #00537c; /* darken 10% */ --md-primary-fg-color--dark: #00537c; /* darken 10% */
} }
[data-md-color-scheme="slate"] {
--md-hue: 230;
--md-default-bg-color: hsla(230, 15%, 21%, 1);
}
img.img-border { img.img-border {
border: 1px solid #b3b3b3; border: 1px solid #b3b3b3;
} }
......
import logging
from dataclasses import dataclass
import requests
from flask import current_app
from typing import List
from jwt import jwk_from_pem, JWT
@dataclass(init=True, eq=True)
class User:
username: str
roles: List[str]
class KeycloakClient:
def obtain_user_token(self, username: str, password: str) -> str:
response = requests.post(
f"{current_app.config['AUTH_SERVICE_ENDPOINT']}/realms/dbrepo/protocol/openid-connect/token",
data={
"username": username,
"password": password,
"grant_type": "password",
"client_id": current_app.config["AUTH_SERVICE_CLIENT"],
"client_secret": current_app.config["AUTH_SERVICE_CLIENT_SECRET"]
})
body = response.json()
if "access_token" not in body:
raise AssertionError("Failed to obtain user token(s)")
return response.json()["access_token"]
def verify_jwt(self, access_token: str) -> User:
public_key = jwk_from_pem(str(current_app.config["JWT_PUBKEY"]).encode('utf-8'))
payload = JWT().decode(message=access_token, key=public_key, do_time_check=True)
logging.debug(f"JWT token client_id={payload.get('client_id')} and realm_access={payload.get('realm_access')}")
return User(username=payload.get('client_id'), roles=payload.get('realm_access')["roles"])
"""
The opensearch_client.py is used by the different API endpoints in routes.py to handle requests to the opensearch db
"""
import os
from json import dumps, load
import logging
from dbrepo.api.dto import Database
from collections.abc import MutableMapping
from opensearchpy import OpenSearch, TransportError, RequestError, NotFoundError
from omlib.measure import om
from omlib.constants import OM_IDS
from omlib.omconstants import OM
from omlib.unit import Unit
class OpenSearchClient:
"""
The client to communicate with the OpenSearch database.
"""
host: str = None
port: int = None
username: str = None
password: str = None
instance: OpenSearch = None
def __init__(self, host: str = None, port: int = None, username: str = None, password: str = None):
self.host = os.getenv('OPENSEARCH_HOST', host)
self.port = int(os.getenv('OPENSEARCH_PORT', port))
self.username = os.getenv('OPENSEARCH_USERNAME', username)
self.password = os.getenv('OPENSEARCH_PASSWORD', password)
def _instance(self) -> OpenSearch:
"""
Wrapper method to get the instance singleton.
@returns: The opensearch instance singleton, if successful.
"""
if self.instance is None:
self.instance = OpenSearch(hosts=[{"host": self.host, "port": self.port}],
http_compress=True,
http_auth=(self.username, self.password))
return self.instance
def get_database(self, database_id: int) -> Database:
"""
Gets a database by given id.
@param database_id: The database id.
@returns: The database, if successful.
@throws: opensearchpy.exceptions.NotFoundError If the database was not found in the Search Database.
"""
response: dict = self._instance().get(index="database", id=database_id)
return Database.parse_obj(response["_source"])
def update_database(self, database_id: int, data: Database) -> Database:
"""
Updates the database data with given id.
@param database_id: The database id.
@param data: The database data.
@returns: The updated database, if successful.
@throws: opensearchpy.exceptions.NotFoundError If the database was not found in the Search Database.
"""
logging.debug(f"updating database with id: {database_id} in search database")
self._instance().index(index="database", id=database_id, body=dumps(data.model_dump()))
response: dict = self._instance().get(index="database", id=database_id)
database = Database.parse_obj(response["_source"])
logging.info(f"Updated database with id {database_id} in index 'database'")
return database
def delete_database(self, database_id: int) -> None:
"""
Deletes the database data with given id.
@param database_id: The database id.
@throws: opensearchpy.exceptions.NotFoundError If the database was not found in the Search Database.
"""
self._instance().delete(index="database", id=database_id)
logging.info(f"Deleted database with id {database_id} in index 'database'")
def query_index_by_term_opensearch(self, term, mode):
"""
old code, is effectively replaced by general_search() now
sends an opensearch query
:return list of dicts
"""
query_str = ""
if mode == "exact":
query_str = f"{term}"
elif mode == "contains":
query_str = f"*{term}*"
response = self._instance().search(
index="database",
body={
"query": {
"query_string": {
"query": query_str,
"allow_leading_wildcard": "true", # default true
}
},
},
)
results = [hit["_source"] for hit in response["hits"]["hits"]]
return results
def get_fields_for_index(self, field_type: str):
"""
returns a list of attributes of the data for a specific index.
:param field_type: The search type
:return: list of fields
"""
fields = {
"database": "*",
"table": "tables.*",
"column": "tables.columns.*",
"concept": "tables.columns.concept.*",
"unit": "tables.columns.unit.*",
"identifier": "identifiers.*",
"view": "views.*",
"user": "creator.*",
}
if field_type not in fields.keys():
raise NotFoundError(f"Failed to find field type: {field_type}")
logging.debug(f'requesting field(s) {fields[field_type]} for filter: {field_type}')
fields = self._instance().indices.get_field_mapping(fields[field_type])
fields_list = []
fd = flatten_dict(fields)
for key in fd.keys():
if not key.startswith('database'):
continue
entry = {}
if key.split(".")[-1] == "type":
entry["attr_name"] = key_to_attr_name(key)
entry["attr_friendly_name"] = attr_name_to_attr_friendly_name(entry["attr_name"])
entry["type"] = fd[key]
fields_list.append(entry)
return fields_list
def fuzzy_search(self, search_term=None):
logging.info(f"Performing fuzzy search")
fuzzy_body = {
"query": {
"multi_match": {
"query": search_term,
"fuzziness": "AUTO",
"fuzzy_transpositions": True,
"minimum_should_match": 3
}
}
}
logging.debug(f'search body: {fuzzy_body}')
response = self._instance().search(
index="database",
body=fuzzy_body
)
logging.info(f"Found {len(response['hits']['hits'])} result(s)")
return response
def general_search(self, field_type: str = None, field_value_pairs: dict = None):
"""
Main method for searching stuff in the opensearch db
all parameters are optional
:param field_type: The index to be searched. Optional.
:param field_value_pairs: The key-value pair of properties that need to match. Optional.
:return: The object of results and HTTP status code. e.g. { "hits": { "hits": [] } }, 200
"""
musts = []
if field_value_pairs is not None and len(field_value_pairs) > 0:
logging.debug(f'field_value_pairs present: {field_value_pairs}')
for key, value in field_value_pairs.items():
if field_value_pairs[key] == None:
logging.debug(f"skip empty key: {key}")
continue
logging.debug(f"processing key: {key}")
if '.' in key:
logging.debug(f'key {key} is nested: use nested query')
musts.append({
"match": {
key: value
}
})
else:
logging.debug(f'key {key} is flat: use bool query')
musts.append({
"match": {
key: {"query": value, "minimum_should_match": "90%"}
}
})
body = {
"query": {"bool": {"must": musts}}
}
logging.debug(f'search in index database for type: {field_type}')
logging.debug(f'search body: {dumps(body)}')
response = self._instance().search(
index="database",
body=dumps(body)
)
results = [hit["_source"] for hit in response["hits"]["hits"]]
return results
def unit_independent_search(self, t1: float, t2: float, field_value_pairs):
"""
Main method for searching stuff in the opensearch db
:param t1: start value
:param t2: end value
:param field_value_pairs: the key-value pairs
:return:
"""
logging.info(f"Performing unit-independent search")
searches = []
body = {
"size": 0,
"aggs": {
"units": {
"terms": {"field": "unit.uri", "size": 500}
}
}
}
response = self._instance().search(
index="database",
body=dumps(body)
)
unit_uris = [hit["key"] for hit in response["aggregations"]["units"]["buckets"]]
logging.debug(f"found {len(unit_uris)} unit(s) in column index")
if len(unit_uris) == 0:
raise NotFoundError("Failed to search: no unit assigned")
base_unit = unit_uri_to_unit(field_value_pairs["unit.uri"])
for unit_uri in unit_uris:
gte = t1
lte = t2
if unit_uri != field_value_pairs["unit.uri"]:
target_unit = unit_uri_to_unit(unit_uri)
if not Unit.can_convert(base_unit, target_unit):
logging.error(f"Cannot convert unit {field_value_pairs['unit.uri']} to target unit {unit_uri}")
continue
gte = om(t1, base_unit).convert(target_unit)
lte = om(t2, base_unit).convert(target_unit)
logging.debug(
f"converted original range [{t1},{t2}] for base unit {base_unit} to mapped range [{gte},{lte}] for target unit={target_unit}")
searches.append({'index': 'column'})
searches.append({
"query": {
"bool": {
"must": [
{
"match": {
"concept.uri": {
"query": field_value_pairs["concept.uri"]
}
}
},
{
"range": {
"val_min": {
"gte": gte
}
}
},
{
"range": {
"val_max": {
"lte": lte
}
}
},
{
"match": {
"unit.uri": {
"query": unit_uri
}
}
}
]
}
}
})
logging.debug('searches: %s', searches)
body = ''
for search in searches:
body += '%s \n' % dumps(search)
responses = self._instance().msearch(
body=dumps(body)
)
response = {
"hits": {
"hits": flatten([hits["hits"]["hits"] for hits in responses["responses"]])
},
"took": responses["took"]
}
return response
def key_to_attr_name(key: str) -> str:
"""
Maps an attribute key to a machine-readable representation
:param key: The attribute key
:return: The machine-readable representation of the attribute key
"""
parts = []
previous = None
for part in key.split(".")[1:-1]: # remove the first and last sub-item database.xxx.yyy.zzz.type -> xxx.yyy.zzz
if part == "mappings" or part == "mapping": # remove the mapping sub-item(s)
continue
if part == previous: # remove redundant sub-item(s)
continue
previous = part
parts.append(part)
return ".".join(parts)
def attr_name_to_attr_friendly_name(key: str) -> str:
"""
Maps an attribute key to a human-readable representation
:param key: The attribute key
:return: The human-readable representation of the attribute key
"""
with open('friendly_names_overrides.json') as json_data:
d = load(json_data)
for json_key in d.keys():
if json_key == key:
logging.debug(f"friendly name exists for key {json_key}")
return d[json_key]
return ''.join(key.replace('_', ' ').title().split('.')[-1:])
def flatten_dict(
d: MutableMapping, parent_key: str = "", sep: str = "."
) -> MutableMapping:
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, MutableMapping):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def flatten(mylist):
return [item for sublist in mylist for item in sublist]
def unit_uri_to_unit(uri):
base_identifier = uri[len(OM_IDS.NAMESPACE):].replace("-", "")
return getattr(OM, base_identifier)
import rdflib
from rdflib import URIRef
from omlib.dimension import Dimension
from omlib.scale import Scale
from omlib.unit import Prefix, Unit
class OM_IDS:
NAMESPACE = 'http://www.ontology-of-units-of-measure.org/resource/om-2/'
class SI:
SYSTEM_OF_UNITS = str(OM_IDS.NAMESPACE + 'InternationalSystemOfUnits') # WARNING If you change "SI" also change in constants.py
# SI Prefixes
YOTTA = Prefix('yotta', 'Y', 1e24, OM_IDS.NAMESPACE + 'yotta')
ZETTA = Prefix('zetta', 'Z', 1e21, OM_IDS.NAMESPACE + 'zetta')
EXA = Prefix('exa', 'E', 1e18, OM_IDS.NAMESPACE + 'exa')
PETA = Prefix('peta', 'P', 1e15, OM_IDS.NAMESPACE + 'peta')
TERA = Prefix('tera', 'T', 1e12, OM_IDS.NAMESPACE + 'tera')
GIGA = Prefix('giga', 'G', 1e9, OM_IDS.NAMESPACE + 'giga')
MEGA = Prefix('mega', 'M', 1e6, OM_IDS.NAMESPACE + 'mega')
KILO = Prefix('kilo', 'k', 1e3, OM_IDS.NAMESPACE + 'kilo')
HECTO = Prefix('hecto', 'h', 1e2, OM_IDS.NAMESPACE + 'hecto')
DECA = Prefix('deca', 'da', 1e1, OM_IDS.NAMESPACE + 'deca')
DECI = Prefix('deci', 'd', 1e-1, OM_IDS.NAMESPACE + 'deci')
CENTI = Prefix('centi', 'c', 1e-2, OM_IDS.NAMESPACE + 'centi')
MILLI = Prefix('milli', 'm', 1e-3, OM_IDS.NAMESPACE + 'milli')
MICRO = Prefix('micro', 'μ', 1e-6, OM_IDS.NAMESPACE + 'micro')
NANO = Prefix('nano', 'n', 1e-9, OM_IDS.NAMESPACE + 'nano')
PICO = Prefix('pico', 'p', 1e-12, OM_IDS.NAMESPACE + 'pico')
FEMTO = Prefix('femto', 'f', 1e-15, OM_IDS.NAMESPACE + 'femto')
ATTO = Prefix('atto', 'a', 1e-18, OM_IDS.NAMESPACE + 'atto')
ZEPTO = Prefix('zepto', 'z', 1e-21, OM_IDS.NAMESPACE + 'zepto')
YOCTO = Prefix('yocto', 'y', 1e-24, OM_IDS.NAMESPACE + 'yocto')
# SI Base Units
SECOND = Unit.get_singular_unit('second', 's', Dimension(1, 0, 0, 0, 0, 0, 0),
identifier=OM_IDS.NAMESPACE + 'second-Time',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
METRE = Unit.get_singular_unit('metre', 'm', Dimension(0, 1, 0, 0, 0, 0, 0), identifier=OM_IDS.NAMESPACE + 'metre',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
GRAM = Unit.get_singular_unit('gram', 'g', Dimension(0, 0, 1, 0, 0, 0, 0), identifier=OM_IDS.NAMESPACE + 'gram')
KILOGRAM = Unit.get_prefixed_unit(KILO, GRAM, identifier=OM_IDS.NAMESPACE + 'kilogram',
system_of_units=SYSTEM_OF_UNITS,
is_base_unit=True)
AMPERE = Unit.get_singular_unit('ampere', 'A', Dimension(0, 0, 0, 1, 0, 0, 0),
identifier=OM_IDS.NAMESPACE + 'ampere',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
KELVIN = Unit.get_singular_unit('kelvin', 'K', Dimension(0, 0, 0, 0, 1, 0, 0),
identifier=OM_IDS.NAMESPACE + 'kelvin',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
MOLE = Unit.get_singular_unit('mole', 'mol', Dimension(0, 0, 0, 0, 0, 1, 0),
identifier=OM_IDS.NAMESPACE + 'mole',
system_of_units=SYSTEM_OF_UNITS, is_base_unit=True)
CANDELA = Unit.get_singular_unit('candela', 'cd', Dimension(0, 0, 0, 0, 0, 0, 1),
identifier=OM_IDS.NAMESPACE + 'candela', system_of_units=SYSTEM_OF_UNITS,
is_base_unit=True)
class IEC:
KIBI = Prefix('kibi', 'Ki', pow(2, 10), OM_IDS.NAMESPACE + 'kibi')
MEBI = Prefix('mebi', 'Mi', pow(2, 20), OM_IDS.NAMESPACE + 'mebi')
GIBI = Prefix('gibi', 'Gi', pow(2, 30), OM_IDS.NAMESPACE + 'gibi')
TEBI = Prefix('tebi', 'Ti', pow(2, 40), OM_IDS.NAMESPACE + 'tebi')
PEBI = Prefix('pebi', 'Pi', pow(2, 50), OM_IDS.NAMESPACE + 'pebi')
EXBI = Prefix('exbi', 'Ei', pow(2, 60), OM_IDS.NAMESPACE + 'exbi')
ZEBI = Prefix('zebi', 'Zi', pow(2, 70), OM_IDS.NAMESPACE + 'zebi')
YOBI = Prefix('yobi', 'Yi', pow(2, 80), OM_IDS.NAMESPACE + 'yobi')
class JEDEC:
KILO = Prefix('kilo', 'k', pow(2, 10), OM_IDS.NAMESPACE + 'jedec-kilo')
MEGA = Prefix('mega', 'M', pow(2, 20), OM_IDS.NAMESPACE + 'jedec-mega')
GIGA = Prefix('giga', 'G', pow(2, 30), OM_IDS.NAMESPACE + 'jedec-giga')
from omlib.exceptions.dimensionexception import DimensionalException
class Dimension:
def __init__(self, T=0, L=0, M=0, I=0, Theta=0, N=0, J=0):
self.T = T
self.L = L
self.M = M
self.I = I
self.Theta = Theta
self.N = N
self.J = J
def time_dimension_exponent(self):
return self.T
def length_dimension_exponent(self):
return self.L
def mass_dimension_exponent(self):
return self.M
def electric_current_dimension_exponent(self):
return self.I
def thermodynamic_temperature_dimension_exponent(self):
return self.Theta
def amount_of_substance_dimension_exponent(self):
return self.N
def luminous_intensity_dimension_exponent(self):
return self.J
def __eq__(self, other):
return (self.T == other.T and self.L == other.L and self.M == other.M and self.I == other.I and
self.Theta == other.Theta and self.N == other.N and self.J == other.J)
def __str__(self):
return f'(T={self.T}, L={self.L}, M={self.M}, I={self.I}, θ={self.Theta}, N={self.N}, J={self.J})'
def __add__(self, other):
if not self == other:
raise DimensionalException("Entities of different dimensions cannot be added together. {} != {}"
.format(self, other))
return self
def __sub__(self, other):
if not self == other:
raise DimensionalException("Entities of different dimensions cannot be subtracted from each other. {} != {}"
.format(self, other))
return self
def __mul__(self, other):
return Dimension(self.T + other.T, self.L + other.L, self.M + other.M, self.I + other.I,
self.Theta + other.Theta, self.N + other.N, self.J + other.J)
def __truediv__(self, other):
return Dimension(self.T - other.T, self.L - other.L, self.M - other.M, self.I - other.I,
self.Theta - other.Theta, self.N - other.N, self.J - other.J)
@staticmethod
def pow(base, exponent):
if isinstance(base, Dimension):
return Dimension(base.T * exponent, base.L * exponent, base.M * exponent, base.I * exponent,
base.Theta * exponent, base.N * exponent, base.J * exponent)
else:
return Dimension()
class DimensionalException(Exception):
def __init__(self, message):
self.message = message
class UnitConversionException(Exception):
def __init__(self, message):
self.message = message
class ScaleConversionException(Exception):
def __init__(self, message):
self.message = message
class UnitIdentityException(Exception):
def __init__(self, message):
self.message = message
class ScaleIdentityException(Exception):
def __init__(self, message):
self.message = message
import math
from omlib.constants import SI
from omlib.exceptions.dimensionexception import DimensionalException
from omlib.scale import Scale
from omlib.thing import Thing
from omlib.unit import Unit, PrefixedUnit, SingularUnit
def om(numerical_value, unit_or_scale, identifier=None):
if isinstance(unit_or_scale, Unit):
return Measure(numerical_value, unit_or_scale, identifier)
if isinstance(unit_or_scale, Scale):
return Point(numerical_value, unit_or_scale, identifier)
return None
class Point(Thing):
@staticmethod
def create_by_converting(point, to_scale):
if not isinstance(point, Point):
raise ValueError("The parameter to the convert method is not of the correct type (Point).")
if not isinstance(to_scale, Scale):
raise ValueError("The parameter to the convert method is not of the correct type (Scale).")
new_point = Point(point.numericalValue, point.scale)
new_point.convert(to_scale)
return new_point
@staticmethod
def create_by_converting_to_ratio_scale(point):
if not isinstance(point, Point):
raise ValueError("The parameter to the convert method is not of the correct type (Point).")
new_point = Point(point.numericalValue, point.scale)
new_point.convert_to_ratio_scale()
return new_point
def __init__(self, numerical_value, scale, identifier=None):
super().__init__(identifier=identifier)
self.numericalValue = numerical_value
self.scale = scale
def convert(self, to_scale):
if not isinstance(to_scale, Scale):
raise ValueError("The parameter to the convert method is not of the correct type (Scale).")
factor = Scale.conversion_factor(self.scale, to_scale)
off_set = Scale.conversion_off_set(self.scale, to_scale)
self.numericalValue = self.numericalValue * factor + off_set
self.scale = to_scale
def convert_to_ratio_scale(self):
base = self.scale.base_ratio_scale()
factor = Scale.conversion_factor(self.scale, base[0])
off_set = Scale.conversion_off_set(self.scale, base[0])
self.numericalValue = self.numericalValue * factor + off_set
self.scale = base[0]
def __str__(self):
return f'{self.numericalValue} {self.scale.unit.symbol()}'
def __new_value_for_comparisson(self, other):
if isinstance(other, Point):
factor = Scale.conversion_factor(self.scale, other.scale)
off_set = Scale.conversion_off_set(self.scale, other.scale)
new_value = self.numericalValue * factor + off_set
return new_value
return None
def __eq__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value == other.numericalValue
return False
def __ne__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value != other.numericalValue
return False
def __lt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value < other.numericalValue
return False
def __le__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value <= other.numericalValue
return False
def __gt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value > other.numericalValue
return False
def __ge__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value >= other.numericalValue
return False
def __add__(self, other):
if not isinstance(other, Measure):
raise ValueError('The value to be added is not a measure and only measures can be added to a point.')
if not self.scale.dimensions == other.unit.dimensions:
raise DimensionalException("Measures and Points with units of different dimensions cannot be added "
"together. {} != {}"
.format(self.scale.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.scale.unit)
return_point = Point(self.numericalValue + new_measure.numericalValue, self.scale)
return return_point
def __sub__(self, other):
if not isinstance(other, Measure) and not isinstance(other, Point):
raise ValueError('The value to be subtracted is not a point or a measure and only measures or points '
'can be subtracted from a point.')
if isinstance(other, Measure):
if not self.scale.dimensions == other.unit.dimensions:
raise DimensionalException("Measures and Points with units of different dimensions cannot be "
"subtracted from each other. {} != {}".format(self.scale.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.scale.unit)
return_point = Point(self.numericalValue - new_measure.numericalValue, self.scale)
return return_point
if isinstance(other, Point):
if not self.scale.dimensions == other.scale.dimensions:
raise DimensionalException("Measures and Points with units of different dimensions cannot be "
"subtracted from each other. {} != {}".format(self.scale.unit, other.unit))
new_point = Point.create_by_converting(other, self.scale)
return_measure = Measure(self.numericalValue - new_point.numericalValue, self.scale.unit)
return return_measure
def __mul__(self, other):
as_measure = Measure(self.numericalValue, self.scale.unit)
return as_measure * other
def __truediv__(self, other):
as_measure = Measure(self.numericalValue, self.scale.unit)
return as_measure / other
class Measure(Thing):
@staticmethod
def create_by_converting(measure, to_unit):
if not isinstance(measure, Measure):
raise ValueError("The parameter to the convert method is not of the correct type (Measure).")
if not isinstance(to_unit, Unit):
raise ValueError("The parameter to the convert method is not of the correct type (Unit).")
new_measure = Measure(measure.numericalValue, measure.unit)
new_measure.convert(to_unit)
return new_measure
@staticmethod
def create_by_converting_to_base_units(measure, in_system_of_units=SI):
if not isinstance(measure, Measure):
raise ValueError("The parameter to the convert method is not of the correct type (Measure).")
new_measure = Measure(measure.numericalValue, measure.unit)
new_measure.convert_to_base_units(in_system_of_units)
return new_measure
@staticmethod
def create_by_converting_to_convenient_units(measure, in_system_of_units=None, use_prefixes=True):
if not isinstance(measure, Measure):
raise ValueError("The parameter to the convert method is not of the correct type (Measure).")
new_measure = Measure(measure.numericalValue, measure.unit)
new_measure.convert_to_convenient_units(in_system_of_units, use_prefixes=use_prefixes)
return new_measure
def __init__(self, numerical_value, unit, identifier=None):
super().__init__(identifier=identifier)
self.numericalValue = numerical_value
self.unit = unit
def convert(self, to_unit):
if not isinstance(to_unit, Unit):
raise ValueError("The parameter to the convert method is not of the correct type (Unit).")
factor = Unit.conversion_factor(self.unit, to_unit)
self.numericalValue = self.numericalValue * factor
self.unit = to_unit
def convert_to_base_units(self, in_system_of_units=None):
if in_system_of_units is None:
in_system_of_units = self.unit.systemOfUnits
if in_system_of_units is None:
in_system_of_units = SI.SYSTEM_OF_UNITS
base = Unit.get_base_units(self.unit, in_system_of_units)
self.convert(base)
def convert_to_convenient_units(self, system_of_units=None, use_prefixes=True):
if system_of_units is None:
system_of_units = self.unit.systemOfUnits
test_units = self.unit.with_dimensions(self.unit.dimensions, in_system_of_units=system_of_units)
selected_unit = self.unit
log_selected_value = Measure.__get_log_value(self.numericalValue, selected_unit)
for test_unit in test_units:
if use_prefixes or not isinstance(test_unit, PrefixedUnit):
factor = Unit.conversion_factor(self.unit, test_unit)
value = abs(self.numericalValue * factor)
log_value = Measure.__get_log_value(value, test_unit)
if log_value < log_selected_value:
log_selected_value = log_value
selected_unit = test_unit
if abs(log_value - log_selected_value) < 0.1 and isinstance(test_unit, SingularUnit):
log_selected_value = log_value
selected_unit = test_unit
if abs(log_value - log_selected_value) < 0.1 and isinstance(test_unit, PrefixedUnit)\
and not isinstance(selected_unit, SingularUnit):
log_selected_value = log_value
selected_unit = test_unit
self.convert(selected_unit)
@staticmethod
def __get_log_value(value, unit):
if value == 0.0:
return 0
log_value = abs(math.log10(value))
if value < 1:
log_value = log_value + 2
if not isinstance(unit, SingularUnit):
log_value = log_value + 1
return log_value
def __str__(self):
return f'{self.numericalValue} {self.unit.symbol()}'
def __new_value_for_comparisson(self, other):
if isinstance(other, Measure):
factor = Unit.conversion_factor(self.unit, other.unit)
new_value = self.numericalValue * factor
return new_value
return None
def __eq__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value == other.numericalValue
return False
def __ne__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value != other.numericalValue
return False
def __lt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value < other.numericalValue
return False
def __le__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value <= other.numericalValue
return False
def __gt__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value > other.numericalValue
return False
def __ge__(self, other):
new_value = self.__new_value_for_comparisson(other)
if new_value is not None:
return new_value >= other.numericalValue
return False
def __add__(self, other):
if not isinstance(other, Measure):
raise ValueError('The value to be added is not a measure.')
if not self.unit.dimensions == other.unit.dimensions:
raise DimensionalException("Measures with units of different dimensions cannot be added together. {} != {}"
.format(self.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.unit)
return_measure = Measure(self.numericalValue + new_measure.numericalValue, self.unit)
return return_measure
def __sub__(self, other):
if not isinstance(other, Measure):
raise ValueError('The value to be subtracted is not a measure.')
if not self.unit.dimensions == other.unit.dimensions:
raise DimensionalException("Measures with units of different dimensions cannot be subtracted from each "
"other. {} != {}".format(self.unit, other.unit))
new_measure = Measure.create_by_converting(other, self.unit)
return_measure = Measure(self.numericalValue - new_measure.numericalValue, self.unit)
return return_measure
def __mul__(self, other):
if isinstance(other, float) or isinstance(other, int):
new_measure = Measure(self.numericalValue * other, self.unit)
return new_measure
if not isinstance(other, Measure) and not isinstance(other, Point):
raise ValueError('The multiplicand is not a measure, a point, a float, or an int.')
other_unit = None
if isinstance(other, Measure):
other_unit = other.unit
if isinstance(other, Point):
other_unit = other.scale.unit
new_value = self.numericalValue * other.numericalValue
new_unit = Unit.get_unit_multiplication(self.unit, other_unit)
new_measure = Measure(new_value, new_unit)
simple_unit = Unit.simplified_compound_unit(new_unit)
new_measure.convert(simple_unit)
return new_measure
def __truediv__(self, other):
if isinstance(other, float) or isinstance(other, int):
new_measure = Measure(self.numericalValue / other, self.unit)
return new_measure
if not isinstance(other, Measure) and not isinstance(other, Point):
raise ValueError('The denominator is not a measure, a point, a float, or an int.')
other_unit = None
if isinstance(other, Measure):
other_unit = other.unit
if isinstance(other, Point):
other_unit = other.scale.unit
new_value = self.numericalValue / other.numericalValue
new_unit = Unit.get_unit_division(self.unit, other_unit)
new_measure = Measure(new_value, new_unit)
simple_unit = Unit.simplified_compound_unit(new_unit)
new_measure.convert(simple_unit)
return new_measure
Source diff could not be displayed: it is too large. Options to address this: view the blob.
This diff is collapsed.
from rdflib import URIRef
from omlib.dimension import Dimension
from omlib.exceptions.dimensionexception import DimensionalException
from omlib.exceptions.unitconversionexception import ScaleConversionException
from omlib.exceptions.unitidentityexception import ScaleIdentityException
from omlib.thing import Thing
from omlib.unit import Unit
class Scale(Thing):
_scales = []
@staticmethod
def clear_cache():
Scale._scales.clear()
@staticmethod
def _add_when_not_in_cache(scale):
for test_scale in Scale._scales:
if test_scale == scale:
if test_scale.identifier == scale.identifier or not isinstance(scale.identifier, URIRef) \
or not isinstance(test_scale.identifier, URIRef):
if isinstance(test_scale, RatioScale) and isinstance(scale, RatioScale):
if test_scale.unit == scale.unit:
if isinstance(scale.identifier, URIRef) and not isinstance(test_scale.identifier, URIRef):
test_scale.identifier = scale.identifier
return test_scale
if isinstance(test_scale, IntervalScale) and isinstance(scale, IntervalScale):
if test_scale.unit == scale.unit and test_scale.baseScale == scale.baseScale \
and test_scale.offSet == scale.offSet:
if isinstance(scale.identifier, URIRef) and not isinstance(test_scale.identifier, URIRef):
test_scale.identifier = scale.identifier
return test_scale
if test_scale.identifier == scale.identifier:
raise ScaleIdentityException("The scale with identifier {} already exists but with different"
"properties. \n{} \nis not the same as \n{}".format(scale.identifier,
scale, test_scale))
Scale._scales.append(scale)
return scale
@staticmethod
def with_label(label):
return_scales = []
for test_scale in Scale._scales:
for test_label in test_scale.allLabels:
if test_label == label:
return_scales.append(test_label)
return return_scales
@staticmethod
def with_identifier(identifier):
for test_scale in Scale._scales:
if test_scale.identifier == identifier:
return test_scale
return None
@staticmethod
def with_dimensions(dimensions, in_system_of_units=None):
return_scales = []
for test_scale in Scale._scales:
if test_scale.dimensions == dimensions and test_scale.systemOfUnits == in_system_of_units:
return_scales.append(test_scale)
return return_scales
@staticmethod
def get_ratio_scale(unit, label=None, identifier=None):
scale = RatioScale(unit, label, identifier)
scale = Scale._add_when_not_in_cache(scale)
return scale
@staticmethod
def get_interval_scale(base_scale, unit, off_set, label=None, identifier=None):
scale = IntervalScale(base_scale, unit, off_set, label, identifier)
scale = Scale._add_when_not_in_cache(scale)
return scale
@staticmethod
def conversion_factor(from_scale, to_scale):
if (isinstance(from_scale, RatioScale) or isinstance(from_scale, IntervalScale)) and \
(isinstance(to_scale, RatioScale) or isinstance(to_scale, IntervalScale)):
can_convert = Unit.can_convert(from_scale.unit, to_scale.unit)
if not can_convert:
raise DimensionalException("A scale with dimensions {} cannot be converted to a scale with "
"dimensions {}."
.format(from_scale.dimensions, to_scale.dimensions))
factor = Unit.conversion_factor(from_scale.unit, to_scale.unit)
return factor
else:
raise ScaleConversionException("Cannot convert from {} to {} as they are not both cardinal scales."
.format(from_scale, to_scale))
@staticmethod
def conversion_off_set(from_scale, to_scale):
if (isinstance(from_scale, RatioScale) or isinstance(from_scale, IntervalScale)) and \
(isinstance(to_scale, RatioScale) or isinstance(to_scale, IntervalScale)):
from_ratio_scale = from_scale.base_ratio_scale()
to_ratio_scale = to_scale.base_ratio_scale()
from_ratio_factor = Unit.conversion_factor(from_scale.unit,from_ratio_scale[0].unit)
to_ratio_factor = Unit.conversion_factor(to_scale.unit,to_ratio_scale[0].unit)
if from_ratio_scale[0] == to_ratio_scale[0]:
off_set = (to_ratio_scale[1] * to_ratio_factor - from_ratio_scale[1]*from_ratio_factor) \
/ to_ratio_factor
return off_set
else:
raise ScaleConversionException("Cannot convert from {} to {} as they do not use the same base ratio "
"scale, i.e. they do not have the same known zero point."
.format(from_scale, to_scale))
else:
raise ScaleConversionException("Cannot convert from {} to {} as they are not both cardinal scales."
.format(from_scale, to_scale))
def __init__(self, label=None, identifier=None, dimensions=Dimension(), system_of_units=None):
super().__init__(label, identifier)
self.fixedPoints = []
self.dimensions = dimensions
self.systemOfUnits = system_of_units
def add_fixed_point(self, fixed_point):
self.fixedPoints.append(fixed_point)
def __str__(self):
return f'{self.label()}\t<{self.identifier}> dim: {self.dimensions}'
def __eq__(self, other):
if isinstance(other, Scale):
return str(self.identifier) == str(other.identifier)
return False
def __ne__(self, other):
return not (self == other)
class RatioScale(Scale):
def __init__(self, unit, label=None, identifier=None, system_of_units=None):
if not isinstance(unit, Unit):
raise ValueError("The unit parameter in RatioScale is required to be of type Unit.")
if system_of_units is None:
system_of_units = unit.systemOfUnits
super().__init__(label, identifier, dimensions=unit.dimensions, system_of_units=system_of_units)
self.unit = unit
def base_ratio_scale(self):
return self, 0.0
def __eq__(self, other):
if isinstance(other, RatioScale):
if str(self.identifier) == str(other.identifier):
return True
if isinstance(self.identifier, URIRef) and isinstance(other.identifier, URIRef):
return False
if self.unit == other.unit:
return True
return False
def __str__(self):
return f'{self.label()}\t<{self.identifier}> unit: {self.unit} dim: {self.dimensions}'
class IntervalScale(Scale):
def __init__(self, base_scale, unit, off_set, label=None, identifier=None, system_of_units=None):
if not isinstance(unit, Unit):
raise ValueError("The unit parameter in IntervalScale is required to be of type Unit.")
if not isinstance(base_scale, Scale):
raise ValueError("The base_scale parameter in IntervalScale is required to be of type Scale.")
if unit.dimensions != base_scale.dimensions:
raise DimensionalException("The dimensions of the base scale are not the same as the dimensions of the"
" unit used. {} is not the same as {}"
.format(base_scale.dimensions, unit.dimensions))
if system_of_units is None:
system_of_units = base_scale.systemOfUnits
if system_of_units is None:
system_of_units = unit.systemOfUnits
super().__init__(label, identifier, dimensions=unit.dimensions, system_of_units=system_of_units)
self.unit = unit
self.baseScale = base_scale
self.offSet = off_set
def base_ratio_scale(self):
base_base = self.baseScale.base_ratio_scale()
conversion_fac = Unit.conversion_factor(base_base[0].unit, self.unit)
return base_base[0], base_base[1] * conversion_fac + self.offSet
def __eq__(self, other):
if isinstance(other, IntervalScale):
if str(self.identifier) == str(other.identifier):
return True
if self.unit == other.unit and self.baseScale == other.baseScale and self.offSet == other.offSet:
return True
return False
def __str__(self):
return f'{self.label()}\t<{self.identifier}> base: {self.baseScale} unit: {self.unit} dim: {self.dimensions}'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment