# Select Git revision
# analysis_only.py
# app.py 4.65 KiB
import json
import os
import logging
from typing import List
import opensearchpy.exceptions
from dbrepo.RestClient import RestClient
from logging.config import dictConfig
from dbrepo.api.dto import Database
from opensearchpy import OpenSearch
# logging configuration
#
# The root log level is taken from the LOG_LEVEL environment variable
# (case-insensitive, defaults to DEBUG).  dictConfig() replaces the handlers
# and level of the root logger, so the level must be passed to it directly;
# a separate logging.basicConfig() call before it would simply be discarded.
level = os.getenv("LOG_LEVEL", "DEBUG").upper()
dictConfig({
    'version': 1,
    'formatters': {
        'default': {
            'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
        },
        'simple': {
            'format': '[%(asctime)s] %(levelname)s: %(message)s',
        },
    },
    # The handler keeps its historical name 'wsgi'.  It writes to stderr,
    # which is what Flask's wsgi_errors_stream resolves to when no request is
    # active, without requiring Flask to be importable for this script.
    'handlers': {'wsgi': {
        'class': 'logging.StreamHandler',
        'stream': 'ext://sys.stderr',
        'formatter': 'simple'  # use 'default' to also log the module name
    }},
    'root': {
        'level': level,
        'handlers': ['wsgi']
    }
})
class App:
    """
    The client to communicate with the OpenSearch database.

    Connection settings are read from environment variables when the object
    is constructed.  The OpenSearch connection itself is created lazily on
    first use and then reused for every subsequent call.
    """
    # Name of the index that all methods of this class operate on.
    INDEX_NAME = "database"
    # JSON file used as request body when creating the index or updating its mapping.
    INDEX_DEFINITION_PATH = './database.json'

    # Endpoint handed to the dbrepo RestClient (METADATA_SERVICE_ENDPOINT).
    metadata_service_endpoint: str = None
    # OpenSearch connection settings (OPENSEARCH_* environment variables).
    search_host: str = None
    search_port: int = None
    search_username: str = None
    search_password: str = None
    # Lazily created connection, see _instance().
    search_instance: OpenSearch = None

    def __init__(self):
        """
        Read the connection settings from the process environment.

        :raises ValueError: if OPENSEARCH_PORT is set but is not an integer.
        """
        self.metadata_service_endpoint = os.getenv("METADATA_SERVICE_ENDPOINT")
        self.search_host = os.getenv("OPENSEARCH_HOST")
        # Fall back to the standard OpenSearch REST port so that an unset
        # variable does not fail with an opaque ``int(None)`` TypeError.
        self.search_port = int(os.getenv("OPENSEARCH_PORT", "9200"))
        self.search_username = os.getenv("OPENSEARCH_USERNAME")
        self.search_password = os.getenv("OPENSEARCH_PASSWORD")

    def _instance(self) -> OpenSearch:
        """
        Wrapper method to get the instance singleton.

        :return: The OpenSearch client, created on the first call and cached afterwards.
        """
        if self.search_instance is None:
            self.search_instance = OpenSearch(hosts=[{"host": self.search_host, "port": self.search_port}],
                                              http_compress=True,
                                              http_auth=(self.search_username, self.search_password))
            logging.debug(f"create instance {self.search_host}:{self.search_port}")
        return self.search_instance

    def _load_index_definition(self) -> dict:
        """
        Load the index definition from INDEX_DEFINITION_PATH.

        :return: The parsed JSON document.
        """
        # JSON documents are UTF-8; do not depend on the platform's default encoding.
        with open(self.INDEX_DEFINITION_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)

    def index_exists(self):
        """
        :return: The result of the OpenSearch ``indices.exists`` call for the index.
        """
        return self._instance().indices.exists(index=self.INDEX_NAME)

    def database_exists(self, database_id: int) -> bool:
        """
        Check whether a document for the given database is present in the index.

        :param database_id: Identifier used as the document id.
        :return: True if the document could be retrieved, False if OpenSearch reports it as not found.
        """
        try:
            self._instance().get(index=self.INDEX_NAME, id=database_id)
        except opensearchpy.exceptions.NotFoundError:
            return False
        return True

    def index_update(self, is_created: bool) -> bool:
        """
        Create the index, or update its mapping when it is outdated.

        :param is_created: True if the index does not exist yet and has to be
            created by this call; False if it already exists.
        :return: True if the index was created or its mapping was updated,
            False if the current mapping was detected and nothing was changed.
        """
        if is_created:
            logging.debug(f"index '{self.INDEX_NAME}' does not exist, creating...")
            self._instance().indices.create(index=self.INDEX_NAME, body=self._load_index_definition())
            logging.info(f"Created index '{self.INDEX_NAME}'")
            return True
        mapping = dict(self._instance().indices.get_mapping(index=self.INDEX_NAME))
        # Walk the mapping defensively: an index whose mapping has no
        # ``identifiers`` object is treated as outdated instead of raising KeyError.
        identifier_props = (mapping.get(self.INDEX_NAME, {})
                            .get("mappings", {})
                            .get("properties", {})
                            .get("identifiers", {})
                            .get("properties", {}))
        # The presence of ``identifiers.status`` is used as the marker of the current mapping.
        if "status" in identifier_props:
            logging.debug(f"found mapping {self.INDEX_NAME}.identifiers.status: detected current mapping")
            return False
        logging.debug(f"index '{self.INDEX_NAME}' exists, updating mapping...")
        # NOTE(review): the same file serves as body for both ``indices.create``
        # and ``indices.put_mapping`` - confirm its structure is valid for both.
        self._instance().indices.put_mapping(index=self.INDEX_NAME, body=self._load_index_definition())
        logging.info(f"Updated index '{self.INDEX_NAME}'")
        return True

    def fetch_databases(self) -> List[Database]:
        """
        Fetch all databases from the metadata service.

        Every entry returned by ``get_databases()`` is requested again
        individually through ``get_database()`` (presumably because the
        listing only contains brief records - verify against the dbrepo client).

        :return: The list of databases.
        """
        client = RestClient(endpoint=self.metadata_service_endpoint)
        databases = [client.get_database(database_id=database.id) for database in client.get_databases()]
        logging.debug(f"fetched {len(databases)} database(s)")
        return databases

    def save_databases(self, databases: List[Database]):
        """
        Replace the indexed document of every given database.

        A document that already exists is deleted first and then created
        again from ``model_dump()``; a missing document is only created.

        :param databases: The databases to write to the index.
        """
        logging.debug(f"save {len(databases)} database(s)")
        client = self._instance()
        for doc in databases:
            try:
                client.delete(index=self.INDEX_NAME, id=doc.id)
                logging.debug(f"deleted database with id {doc.id}")
            except opensearchpy.exceptions.NotFoundError:
                # Nothing to delete on first indexing; the document is still created below.
                logging.warning(f"Database with id {doc.id} does not exist, skip.")
            client.create(index=self.INDEX_NAME, id=doc.id, body=doc.model_dump())
            logging.debug(f"created database with id {doc.id}")
if __name__ == "__main__":
    # Bootstrap sequence: make sure the index and its mapping are in place,
    # then write every database known to the metadata service into the index.
    indexer = App()
    index_missing = not indexer.index_exists()
    indexer.index_update(is_created=index_missing)
    fetched = indexer.fetch_databases()
    indexer.save_databases(databases=fetched)
    logging.info("Finished. Exiting.")