app.py
    import json
    import os
    import logging
    from typing import List, Optional
    
    import opensearchpy.exceptions
    from dbrepo.RestClient import RestClient
    from logging.config import dictConfig
    
    from dbrepo.api.dto import Database
    from opensearchpy import OpenSearch
    
    level = os.getenv("LOG_LEVEL", "DEBUG").upper()
    logging.basicConfig(level=level)
    
    # logging configuration
    dictConfig({
        'version': 1,
        'formatters': {
            'default': {
                'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
            },
            'simple': {
                'format': '[%(asctime)s] %(levelname)s: %(message)s',
            },
        },
        'handlers': {'wsgi': {
            'class': 'logging.StreamHandler',
            'stream': 'ext://flask.logging.wsgi_errors_stream',
            'formatter': 'simple'  # default
        }},
        'root': {
            'level': 'DEBUG',
            'handlers': ['wsgi']
        }
    })
    
    
    class App:
        """
        The client to communicate with the OpenSearch database: creates or updates the
        'database' index and re-indexes the databases fetched from the metadata service.
        """
        metadata_service_endpoint: Optional[str] = None
        search_host: Optional[str] = None
        search_port: Optional[int] = None
        search_username: Optional[str] = None
        search_password: Optional[str] = None
        search_instance: Optional[OpenSearch] = None
    
        def __init__(self):
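            """Reads the OpenSearch and metadata service connection settings from the environment."""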
            self.metadata_service_endpoint = os.getenv("METADATA_SERVICE_ENDPOINT")
            self.search_host = os.getenv("OPENSEARCH_HOST")
            self.search_port = int(os.getenv("OPENSEARCH_PORT"))
            self.search_username = os.getenv("OPENSEARCH_USERNAME")
            self.search_password = os.getenv("OPENSEARCH_PASSWORD")
    
        def _instance(self) -> OpenSearch:
            """
            Wrapper method to get the instance singleton.
    
            @returns: The opensearch instance singleton, if successful.
            """
            if self.search_instance is None:
                self.search_instance = OpenSearch(hosts=[{"host": self.search_host, "port": self.search_port}],
                                                  http_compress=True,
                                                  http_auth=(self.search_username, self.search_password))
                logging.debug(f"create instance {self.search_host}:{self.search_port}")
            return self.search_instance
    
        def index_exists(self) -> bool:
            """Checks whether the 'database' index exists in OpenSearch."""
            return self._instance().indices.exists(index="database")
    
        def database_exists(self, database_id: int) -> bool:
            """Checks whether a document for the given database id exists in the 'database' index."""
            try:
                self._instance().get(index="database", id=database_id)
                return True
            except opensearchpy.exceptions.NotFoundError:
                return False
    
        def index_update(self, is_created: bool) -> bool:
            """
            Creates the 'database' index if it does not exist yet, otherwise updates its mapping
            when the mapping found in OpenSearch is outdated.

            :param is_created: True if the index still needs to be created, False if it already exists.
            :return: True if the index was created or its mapping was updated, False otherwise.
            """
            if is_created:
                logging.debug("index 'database' does not exist, creating...")
                with open('./database.json', 'r') as f:
                    self._instance().indices.create(index="database", body=json.load(f))
                logging.info("Created index 'database'")
                return True
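            # the presence of the 'identifiers.status' property marks the existing mapping as current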
            mapping = dict(self._instance().indices.get_mapping(index="database"))
            identifier_props = mapping["database"]["mappings"]["properties"]["identifiers"]["properties"]
            if "status" in identifier_props:
                logging.debug("found mapping database.identifiers.status: detected current mapping")
                return False
            logging.debug("index 'database' exists, updating mapping...")
            with open('./database.json', 'r') as f:
                self._instance().indices.put_mapping(index="database", body=json.load(f))
            logging.info("Updated index 'database'")
            return True
    
        def fetch_databases(self) -> List[Database]:
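            """
            Fetches all databases from the metadata service: lists them first, then
            retrieves each database individually.

            :return: The list of fetched databases.
            """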
            client = RestClient(endpoint=self.metadata_service_endpoint)
            databases = []
            for database in client.get_databases():
                databases.append(client.get_database(database_id=database.id))
            logging.debug(f"fetched {len(databases)} database(s)")
            return databases
    
        def save_databases(self, databases: List[Database]):
            """Re-indexes the given databases: deletes any stale document, then creates it anew."""
            logging.debug(f"save {len(databases)} database(s)")
            for doc in databases:
                # delete-then-create acts as an upsert for the document with this id
                try:
                    self._instance().delete(index="database", id=doc.id)
                    logging.debug(f"deleted database with id {doc.id}")
                except opensearchpy.exceptions.NotFoundError:
                    logging.warning(f"Database with id {doc.id} is not indexed yet, nothing to delete.")
                self._instance().create(index="database", id=doc.id, body=doc.model_dump())
                logging.debug(f"created database with id {doc.id}")
    
    
    if __name__ == "__main__":
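        # ensure the 'database' index exists with the current mapping, then re-index
        # all databases fetched from the metadata service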
        app = App()
        create = not app.index_exists()
        update = app.index_update(is_created=create)
        app.save_databases(databases=app.fetch_databases())
        logging.info("Finished. Exiting.")