Skip to content
Snippets Groups Projects
Select Git revision
  • 3ca954bc21392120f3fa5d50d185b15da6571bb3
  • master default protected
  • dev protected
  • 551-init-broker-service-permissions
  • release-1.10 protected
  • 549-test-oai-pmh
  • 545-saving-multiple-times-breaks-pid-metadata
  • release-1.9 protected
  • 499-standalone-compute-service-2
  • 539-load-tests
  • hotfix/helm-chart
  • luca_ba_new_interface
  • 534-bug-when-adding-access-to-user-that-is-not-registered-at-dashboard-service
  • release-1.8 protected
  • 533-integrate-semantic-recommendation
  • feature/openshift
  • 518-spark-doesn-t-map-the-headers-correct
  • 485-fixity-checks
  • 530-various-schema-problems-with-subsets
  • release-1.7 protected
  • fix/auth-service
  • v1.10.1 protected
  • v1.10.0-rc13 protected
  • v1.10.0-rc12 protected
  • v1.10.0-rc11 protected
  • v1.10.0-rc10 protected
  • v1.10.0-rc9 protected
  • v1.10.0-rc8 protected
  • v1.10.0-rc7 protected
  • v1.10.0-rc6 protected
  • v1.10.0-rc5 protected
  • v1.10.0-rc4 protected
  • v1.10.0-rc3 protected
  • v1.10.0-rc2 protected
  • v1.10.0rc1 protected
  • v1.10.0rc0 protected
  • v1.10.0 protected
  • v1.9.3 protected
  • v1.9.2 protected
  • v1.9.2-rc0 protected
  • v1.9.1 protected
41 results

app.py

Blame
  • Martin Weise's avatar
    3ca954bc
    History
    app.py 4.95 KiB
    import json
    import os
    import logging
    from typing import List
    
    import opensearchpy.exceptions
    from dbrepo.RestClient import RestClient
    from logging.config import dictConfig
    from pathlib import Path
    
    from dbrepo.api.dto import Database
    from opensearchpy import OpenSearch
    
    level = os.getenv("LOG_LEVEL", "DEBUG").upper()

    # Logging configuration.
    # dictConfig() removes whatever handlers the root logger already has and sets its level from
    # the 'root' section, so a preceding logging.basicConfig() call would have no lasting effect.
    # The LOG_LEVEL-derived level is therefore applied here; a hard-coded 'DEBUG' would silently
    # override it.
    dictConfig({
        'version': 1,
        'formatters': {
            'default': {
                'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
            },
            'simple': {
                'format': '[%(asctime)s] %(levelname)s: %(message)s',
            },
        },
        'handlers': {'wsgi': {
            'class': 'logging.StreamHandler',
            # This script runs as a standalone batch job, outside any Flask request context,
            # where Flask's wsgi_errors_stream falls back to sys.stderr anyway. Referencing
            # sys.stderr directly avoids requiring Flask just to resolve the log stream.
            'stream': 'ext://sys.stderr',
            'formatter': 'simple'  # default
        }},
        'root': {
            'level': level,
            'handlers': ['wsgi']
        }
    })
    
    
    class App:
        """
        The client to communicate with the OpenSearch database.

        Connection settings are read from environment variables on construction. A single OpenSearch
        client is created lazily on first use (see ``_instance``) and reused by all other methods.
        Database metadata is pulled from the metadata service through ``RestClient`` and written into
        the ``database`` index.
        """
        metadata_service_endpoint: str = None
        search_host: str = None
        search_port: int = None
        search_username: str = None
        search_password: str = None
        search_instance: OpenSearch = None

        def __init__(self):
            """
            Read the metadata-service endpoint and the OpenSearch connection settings from the environment,
            falling back to the defaults below when a variable is unset.
            """
            self.metadata_service_endpoint = os.getenv("METADATA_SERVICE_ENDPOINT", "http://metadata-service:8080")
            self.search_host = os.getenv("OPENSEARCH_HOST", "search-db")
            self.search_port = int(os.getenv("OPENSEARCH_PORT", "9200"))
            self.search_username = os.getenv("OPENSEARCH_USERNAME", "admin")
            self.search_password = os.getenv("OPENSEARCH_PASSWORD", "admin")

        def _instance(self) -> OpenSearch:
            """
            Wrapper method to get the instance singleton.

            The client is created on the first call (HTTP compression on, basic auth with the configured
            credentials) and cached on this object; later calls return the cached client.

            @returns: The opensearch instance singleton, if successful.
            """
            if self.search_instance is None:
                self.search_instance = OpenSearch(hosts=[{"host": self.search_host, "port": self.search_port}],
                                                  http_compress=True,
                                                  http_auth=(self.search_username, self.search_password))
                logging.debug(f"create instance {self.search_host}:{self.search_port}")
            return self.search_instance

        @staticmethod
        def _load_index_definition() -> dict:
            """
            Load the 'database' index definition from ./database.json (resolved against the current
            working directory).
            """
            with open('./database.json', 'r', encoding='utf-8') as f:
                return json.load(f)

        def index_exists(self) -> bool:
            """
            :return: True if the 'database' index exists in OpenSearch.
            """
            return self._instance().indices.exists(index="database")

        def database_exists(self, database_id: int) -> bool:
            """
            :param database_id: Document id of the database in the 'database' index.
            :return: True if a document with that id can be fetched, False if OpenSearch reports it as not found.
            """
            try:
                self._instance().get(index="database", id=database_id)
                return True
            except opensearchpy.exceptions.NotFoundError:
                return False

        def index_update(self, is_created: bool) -> bool:
            """
            Create the 'database' index, or bring the mapping of the existing index up to date.

            :param is_created: True if the 'database' index does NOT exist yet and has to be created
                (the entry point passes ``not index_exists()``); False if it exists and only its
                mapping should be checked.
            :return: True if the index was created or its mapping was updated, False if the existing
                mapping already contains ``identifiers.status`` and was left untouched.
            """
            if is_created:
                logging.debug(f"index 'database' does not exist, creating...")
                self._instance().indices.create(index="database", body=self._load_index_definition())
                logging.info(f"Created index 'database'")
                return True
            mapping = dict(self._instance().indices.get_mapping(index="database"))
            # Walk the response defensively: a mapping that lacks the 'identifiers' object (or its
            # 'properties') is treated as outdated and gets updated instead of raising KeyError.
            identifier_props = (mapping.get("database", {})
                                .get("mappings", {})
                                .get("properties", {})
                                .get("identifiers", {})
                                .get("properties", {}))
            if "status" in identifier_props:
                logging.debug(f"found mapping database.identifiers.status: detected current mapping")
                return False
            logging.debug(f"index 'database' exists, updating mapping...")
            definition = self._load_index_definition()
            # The put-mapping API expects the bare mapping ({"properties": ...}). If the file uses the
            # create-index layout ({"settings": ..., "mappings": ...}), send only its "mappings" part.
            self._instance().indices.put_mapping(index="database", body=definition.get("mappings", definition))
            logging.info(f"Updated index 'database'")
            return True

        def fetch_databases(self) -> List[Database]:
            """
            Fetch the full details of every database known to the metadata service.

            :return: One detailed Database per entry returned by ``RestClient.get_databases()``.
            """
            logging.debug(f"fetching database from endpoint: {self.metadata_service_endpoint}")
            client = RestClient(endpoint=self.metadata_service_endpoint)
            summaries = list(client.get_databases())
            total = len(summaries)
            databases = []
            # Progress is reported as "k/N" against the total number of listed databases.
            for position, summary in enumerate(summaries, start=1):
                logging.debug(f"fetching database {position}/{total} details for database id: {summary.id}")
                databases.append(client.get_database(database_id=summary.id))
            logging.debug(f"fetched {len(databases)} database(s)")
            return databases

        def save_databases(self, databases: List[Database]):
            """
            Write each database into the 'database' index under its own id.

            :param databases: Databases to store; an existing document with the same id is replaced.
            """
            logging.debug(f"save {len(databases)} database(s)")
            for doc in databases:
                # index() creates the document or replaces an existing one with the same id in a
                # single request, unlike a delete-then-create pair, which leaves a window where
                # the document is missing.
                self._instance().index(index="database", id=doc.id, body=doc.model_dump())
                logging.debug(f"saved database with id {doc.id}")
    
    
    if __name__ == "__main__":
        app = App()
        # index_update() takes True when the index still has to be created, hence the negation.
        create = not app.index_exists()
        # The return value (created/updated or not) is already reported by index_update's own logging.
        app.index_update(is_created=create)
        app.save_databases(databases=app.fetch_databases())
        logging.info("Finished. Exiting.")