Skip to content
Snippets Groups Projects
Select Git revision
  • 202ccdd6e7acd753041b98b63911e6ec5ed42a4a
  • master default protected
  • replication_test
  • release-1.10 protected
  • dev protected
  • 556-usage-statistics
  • 553-semantic-recommendation-2
  • 553-semantic-recommendation
  • release-1.9 protected
  • 551-init-broker-service-permissions
  • 549-test-oai-pmh
  • 545-saving-multiple-times-breaks-pid-metadata
  • 499-standalone-compute-service-2
  • 539-load-tests
  • hotfix/helm-chart
  • luca_ba_new_interface
  • 534-bug-when-adding-access-to-user-that-is-not-registered-at-dashboard-service
  • release-1.8 protected
  • 533-integrate-semantic-recommendation
  • feature/openshift
  • 518-spark-doesn-t-map-the-headers-correct
  • v1.10.4 protected
  • v1.10.3 protected
  • v1.10.2 protected
  • v1.10.1 protected
  • v1.10.0-rc13 protected
  • v1.10.0-rc12 protected
  • v1.10.0-rc11 protected
  • v1.10.0-rc10 protected
  • v1.10.0-rc9 protected
  • v1.10.0-rc8 protected
  • v1.10.0-rc7 protected
  • v1.10.0-rc6 protected
  • v1.10.0-rc5 protected
  • v1.10.0-rc4 protected
  • v1.10.0-rc3 protected
  • v1.10.0-rc2 protected
  • v1.10.0rc1 protected
  • v1.10.0rc0 protected
  • v1.10.0 protected
  • v1.9.3 protected
41 results

app.py

Blame
  • app.py 17.08 KiB
    import logging
    import math
    import os
    from json import dumps
    from typing import List, Any
    
    import requests
    from clients.keycloak_client import User, KeycloakClient
    from clients.opensearch_client import OpenSearchClient
    from dbrepo.api.dto import Database, ApiError
    from flasgger import LazyJSONEncoder, Swagger, swag_from
    from flask import Flask, request
    from flask_cors import CORS
    from flask_httpauth import HTTPTokenAuth, HTTPBasicAuth, MultiAuth
    from jwt.exceptions import JWTDecodeError
    from opensearchpy import NotFoundError
    from prometheus_flask_exporter import PrometheusMetrics
    from pydantic import ValidationError
    
    # Relabel level 0 (logging.NOTSET) so it is printed as 'TRACE' in log output.
    logging.addLevelName(level=logging.NOTSET, levelName='TRACE')
    # NOTE(review): the dictConfig() call below reconfigures the root logger (level and handlers),
    # so this basicConfig() appears to be superseded — confirm it is still needed.
    logging.basicConfig(level=logging.DEBUG)

    from logging.config import dictConfig

    # logging configuration: the root logger emits DEBUG and above to Flask's WSGI error stream
    # through the single 'wsgi' handler, formatted with the 'simple' formatter.
    # The 'default' formatter (which also prints the module name) is defined but not referenced.
    dictConfig({
        'version': 1,
        'formatters': {
            'default': {
                'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
            },
            'simple': {
                'format': '[%(asctime)s] %(levelname)s: %(message)s',
            },
        },
        'handlers': {'wsgi': {
            'class': 'logging.StreamHandler',
            'stream': 'ext://flask.logging.wsgi_errors_stream',
            'formatter': 'simple'  # default
        }},
        'root': {
            'level': 'DEBUG',
            'handlers': ['wsgi']
        }
    })
    
    # create app object
    app = Flask(__name__)

    # Allow cross-origin requests from any origin, but only for routes under /api/.
    cors = CORS(app, resources={r"/api/*": {"origins": "*"}})

    # Prometheus metrics for the app, plus an "app_info" metric labelled with the app version.
    # NOTE(review): version "0.0.1" differs from the API version "1.5" in the OpenAPI template — confirm.
    metrics = PrometheusMetrics(app)
    metrics.info("app_info", "Application info", version="0.0.1")
    # Flasgger settings: generate an OpenAPI 3 document and render it with Swagger UI 3.
    app.config["SWAGGER"] = {"openapi": "3.0.1", "title": "Swagger UI", "uiversion": 3}

    # Protected endpoints accept either a Bearer token or HTTP Basic credentials; `auth` combines both
    # handlers, whose verification callbacks are registered further down in this module.
    token_auth = HTTPTokenAuth(scheme='Bearer')
    basic_auth = HTTPBasicAuth()
    auth = MultiAuth(token_auth, basic_auth)
    
    # Flasgger configuration: one spec served at /api-search.json and the Swagger UI at /swagger-ui/.
    swagger_config = {
        "headers": [],
        "specs": [
            {
                "endpoint": "api-search",
                "route": "/api-search.json",
                # Only endpoints whose name starts with 'search' are documented; routes registered with
                # other endpoint names (e.g. 'actuator_health', 'database_delete_database') are excluded.
                "rule_filter": lambda rule: rule.endpoint.startswith('search'),
                "model_filter": lambda tag: True,  # all in
            }
        ],
        "static_url_path": "/flasgger_static",
        "swagger_ui": True,
        "specs_route": "/swagger-ui/",
    }
    
    # Base OpenAPI document handed to Flasgger: shared schemas, security schemes, metadata and servers.
    # Endpoint-specific documentation comes from the os-yml/*.yml files referenced by @swag_from.
    # NOTE(review): "openapi" here is 3.0.0 while app.config["SWAGGER"] declares 3.0.1 — confirm which is intended.
    template = {
        "openapi": "3.0.0",
        "components": {
            "schemas": {
                "IndexDto": {
                    "required": ["results", "type"],
                    "type": "object",
                    "properties": {
                        "results": {
                            "type": "array",
                            "items": {
                                "type": "object",
                            }
                        },
                        "type": {
                            "type": "string",
                            "description": "Same as the requested type",
                            "enum": ["database", "table", "view", "column", "user", "identifier", "concept", "unit"]
                        }
                    }
                },
                "IndexFieldsDto": {
                    "required": ["results"],
                    "type": "object",
                    "properties": {
                        "results": {
                            "type": "array",
                            "items": {
                                "$ref": "#/components/schemas/IndexFieldDto"
                            }
                        }
                    }
                },
                "IndexFieldDto": {
                    "required": ["attr_name", "attr_friendly_name", "type"],
                    "type": "object",
                    "properties": {
                        "attr_name": {
                            "type": "string",
                            "example": "name"
                        },
                        "attr_friendly_name": {
                            "type": "string",
                            "example": "Name"
                        },
                        "type": {
                            "type": "string",
                            "example": "string",
                            "description": "OpenSearch data types."
                        }
                    }
                },
                "SearchResultDto": {
                    "required": ["results"],
                    "type": "object",
                    "properties": {
                        "results": {
                            "type": "array",
                            "items": {
                                "type": "object"
                            }
                        }
                    }
                },
                "SearchRequestDto": {
                    "required": ["search_term", "field_value_pairs"],
                    "type": "object",
                    "properties": {
                        "search_term": {
                            "type": "string"
                        },
                        "field_value_pairs": {
                            "type": "object"
                        }
                    }
                }
            },
            # `type: http` schemes always use the Authorization header; the OpenAPI 3.0 `in` field is
            # only defined for `type: apiKey`, so it is not set here.
            "securitySchemes": {
                "bearerAuth": {
                    "type": "http",
                    "scheme": "bearer",
                    "bearerFormat": "JWT",
                },
                "basicAuth": {
                    "type": "http",
                    "scheme": "basic",
                },
            },
        },
        "info": {
            "title": "Database Repository Search Service API",
            "description": "Service that searches the search database",
            "version": "1.5",
            "contact": {
                "name": "Prof. Andreas Rauber",
                "email": "andreas.rauber@tuwien.ac.at"
            },
            "license": {
                "name": "Apache 2.0",
                "url": "https://www.apache.org/licenses/LICENSE-2.0"
            },
        },
        "externalDocs": {
            "description": "Sourcecode Documentation",
            "url": "https://www.ifs.tuwien.ac.at/infrastructures/dbrepo/1.5/"
        },
        "servers": [
            {
                "url": "http://localhost:4000",
                "description": "Generated server url"
            },
            {
                "url": "https://test.dbrepo.tuwien.ac.at",
                "description": "Sandbox"
            }
        ]
    }
    
    swagger = Swagger(app, config=swagger_config, template=template)
    # Service settings. Except for JWT_ALGORITHM, each value is read from the environment variable of the
    # same name and falls back to the hard-coded default shown.
    # NOTE(review): several defaults below are credentials (a client secret, OpenSearch admin/admin) committed
    # to source; confirm every deployment overrides them through the environment.
    app.config["METADATA_SERVICE_ENDPOINT"] = os.getenv("METADATA_SERVICE_ENDPOINT", "http://metadata-service:8080")
    # NOTE(review): HS256 is an HMAC (shared-secret) algorithm, while JWT_PUBKEY below is a PEM public key.
    # Where these settings are consumed is not visible here; confirm token verification cannot end up using
    # the public key as an HMAC secret (algorithm-confusion risk).
    app.config["JWT_ALGORITHM"] = "HS256"
    # The environment variable holds only the base64 key body; the PEM header and footer lines are added here.
    app.config["JWT_PUBKEY"] = '-----BEGIN PUBLIC KEY-----\n' + os.getenv("JWT_PUBKEY",
                                                                          "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqqnHQ2BWWW9vDNLRCcxD++xZg/16oqMo/c1l+lcFEjjAIJjJp/HqrPYU/U9GvquGE6PbVFtTzW1KcKawOW+FJNOA3CGo8Q1TFEfz43B8rZpKsFbJKvQGVv1Z4HaKPvLUm7iMm8Hv91cLduuoWx6Q3DPe2vg13GKKEZe7UFghF+0T9u8EKzA/XqQ0OiICmsmYPbwvf9N3bCKsB/Y10EYmZRb8IhCoV9mmO5TxgWgiuNeCTtNCv2ePYqL/U0WvyGFW0reasIK8eg3KrAUj8DpyOgPOVBn3lBGf+3KFSYi+0bwZbJZWqbC/Xlk20Go1YfeJPRIt7ImxD27R/lNjgDO/MwIDAQAB") + '\n-----END PUBLIC KEY-----'
    app.config["AUTH_SERVICE_ENDPOINT"] = os.getenv("AUTH_SERVICE_ENDPOINT", "http://auth-service:8080/api/auth")
    app.config["AUTH_SERVICE_CLIENT"] = os.getenv("AUTH_SERVICE_CLIENT", "dbrepo-client")
    app.config["AUTH_SERVICE_CLIENT_SECRET"] = os.getenv("AUTH_SERVICE_CLIENT_SECRET", "MUwRc7yfXSJwX8AdRMWaQC3Nep1VjwgG")
    app.config["OPENSEARCH_HOST"] = os.getenv('OPENSEARCH_HOST', 'search-db')
    # Kept as a string: both the default and os.getenv() values are str.
    app.config["OPENSEARCH_PORT"] = os.getenv('OPENSEARCH_PORT', '9200')
    app.config["OPENSEARCH_USERNAME"] = os.getenv('OPENSEARCH_USERNAME', 'admin')
    app.config["OPENSEARCH_PASSWORD"] = os.getenv('OPENSEARCH_PASSWORD', 'admin')

    # NOTE(review): Flask 2.2 deprecated and Flask 2.3 removed `app.json_encoder`; on newer Flask this
    # assignment has no effect — confirm the Flask version in use.
    app.json_encoder = LazyJSONEncoder
    
    
    @token_auth.verify_token
    def verify_token(token: str):
        """
        Flask-HTTPAuth callback that validates a Bearer token.

        :param token: the token taken from the ``Authorization: Bearer`` header; may be empty or None.
        :return: the value returned by ``KeycloakClient.verify_jwt`` for an accepted token, otherwise ``False``.
        """
        if not token:
            return False
        try:
            client = KeycloakClient()
            return client.verify_jwt(access_token=token)
        except JWTDecodeError as error:
            # Log instead of swallowing silently so rejected tokens can be diagnosed;
            # the request still fails authentication.
            logging.warning(f"Failed to decode token: {error}")
            return False
    
    
    @basic_auth.verify_password
    def verify_password(username: str, password: str) -> Any:
        """
        Flask-HTTPAuth callback that validates HTTP Basic credentials.

        A token is obtained for the credentials via ``KeycloakClient.obtain_user_token`` and then verified with
        ``KeycloakClient.verify_jwt``, the same check ``verify_token`` applies to Bearer tokens.

        :param username: the Basic-auth username; may be empty or None.
        :param password: the Basic-auth password; may be empty or None.
        :return: the value returned by ``verify_jwt`` on success, otherwise ``False``.
        """
        if not username or not password:
            return False
        client = KeycloakClient()
        try:
            return client.verify_jwt(access_token=client.obtain_user_token(username=username, password=password))
        except AssertionError as error:
            logging.error(error)
            return False
        except requests.exceptions.ConnectionError as error:
            logging.error(f"Failed to connect to Authentication Service {error}")
            return False
        except JWTDecodeError as error:
            # Same handling as verify_token: an undecodable token is a failed login, not a 500.
            logging.error(f"Failed to decode user token: {error}")
            return False
    
    
    @token_auth.get_user_roles
    @basic_auth.get_user_roles
    def get_user_roles(user: User) -> List[str]:
        """
        Return the roles of an authenticated user, used by ``login_required(role=...)`` checks.

        One definition is registered on both the token and the basic auth handler, instead of two
        same-named module-level functions where the second silently rebinds the first.

        :param user: the object produced by ``verify_token`` / ``verify_password``.
        :return: the user's role names.
        """
        return user.roles
    
    
    def general_filter(index, results):
        """
        Applies filtering to the result of opensearch queries.

        Only specific entries of each result dict are returned to the user, depending on the queried index.
        The keys kept per index are listed in the important_keys dict; every other key is deleted from each
        result dict in place.

        :param index: the search index the query results are about
        :param results: the raw response of the query_index_by_term_opensearch function (an iterable of dicts)
        :return: the same ``results`` object, with every dict reduced to the kept keys
        :raises KeyError: if ``index`` is not one of the known indices
        """
        important_keys = {
            "column": ["id", "name", "column_type"],
            "table": ["id", "name", "description"],
            "identifier": ["id", "type", "creator"],
            "user": ["id", "username"],
            "database": ["id", "name", "is_public", "details"],
            "concept": ["uri", "name"],
            # NOTE(review): an empty list strips every key, so unit results become empty dicts — confirm intended.
            "unit": [],
            # "created" must match the field name exactly; a stray leading space would always drop it.
            "view": ["id", "name", "creator", "created"],
        }
        if index not in important_keys:
            raise KeyError(f"Failed to find index {index} in: {important_keys.keys()}")
        keep = set(important_keys[index])
        for result in results:
            # iterate over a snapshot of the keys because entries are deleted inside the loop
            for key in tuple(result.keys()):
                if key not in keep:
                    del result[key]
        logging.debug('general filter results: %s', results)
        return results
    
    
    @app.route("/health", methods=["GET"], endpoint="actuator_health")
    def health():
        """Liveness probe: always answers HTTP 200 with status UP."""
        status = {"status": "UP"}
        return status, 200
    
    
    @app.route("/api/search/<string:index>", methods=["GET"], endpoint="search_get_index")
    @metrics.gauge(name='dbrepo_search_index_list', description='Time needed to list search index')
    @swag_from("os-yml/get_index.yml")
    def get_index(index: str):
        """
        returns all entries in a specific index, paginated

        The query parameters ``results_per_page`` (default 50, capped at 500) and ``page`` (1-based, default 1)
        select the slice of results returned. Both are clamped to at least 1, so ``0`` or negative values can
        no longer cause a division by zero or wrap-around (negative-index) slicing. A page past the end is
        clamped to the last page.

        :param index: desired index
        :return: ``{"results": [...]}`` with HTTP 200, or a 404 ApiError when the index is unknown
        """
        logging.info(f'Searching for index: {index}')
        # NOTE(review): `index` is not passed to this query; it only selects the key filter applied by
        # general_filter() below. Confirm this query is meant to return the entries of every index.
        results = OpenSearchClient().query_index_by_term_opensearch("*", "contains")
        try:
            results = general_filter(index, results)
        except KeyError:
            return ApiError(status='NOT_FOUND', message=f'Failed to find get index: {index}',
                            code='search.index.missing').model_dump(), 404
        results_per_page = max(1, min(request.args.get("results_per_page", 50, type=int), 500))
        # at least one (possibly empty) page exists, so `page` below never drops to 0
        max_pages = max(1, math.ceil(len(results) / results_per_page))
        page = max(1, min(request.args.get("page", 1, type=int), max_pages))
        results = results[(results_per_page * (page - 1)): (results_per_page * page)]
        return dict({"results": results}), 200
    
    
    @app.route("/api/search/<string:field_type>/fields", methods=["GET"], endpoint="search_get_index_fields")
    @metrics.gauge(name='dbrepo_search_type_list', description='Time needed to list search types')
    @swag_from("os-yml/get_fields.yml")
    def get_fields(field_type: str):
        """
        Lists the attributes available for one search type.

        :param field_type: The search type
        :return: the fields reported by the search client with HTTP 200, or a 404 ApiError when the
                 search client raises NotFoundError for this type
        """
        logging.info(f'Searching in index database for type: {field_type}')
        try:
            index_fields = OpenSearchClient().get_fields_for_index(field_type)
            logging.debug(f'get fields for field_type {field_type} resulted in {len(index_fields)} field(s)')
        except NotFoundError:
            missing = ApiError(status='NOT_FOUND', message=f'Failed to find fields for search type {field_type}',
                               code='search.type.missing')
            return missing.model_dump(), 404
        return index_fields, 200
    
    
    @app.route("/api/search", methods=["GET"], endpoint="search_fuzzy_search")
    @metrics.gauge(name='dbrepo_search_fuzzy', description='Time needed to search fuzzy')
    @swag_from("os-yml/get_fuzzy_search.yml")
    def get_fuzzy_search():
        """
        Main endpoint for fuzzy searching.

        The search term comes from the ``q`` query parameter. If the search client's response contains
        ``hits.hits``, only each hit's ``_source`` document is returned; otherwise the response is passed
        through as-is.

        :return: ``{"results": ...}`` with HTTP 200, or a 400 ApiError when ``q`` is missing or empty
        """
        search_term: str = request.args.get('q')
        if not search_term:
            return ApiError(status='BAD_REQUEST', message='Provide a search term with ?q=term',
                            code='search.fuzzy.invalid').model_dump(), 400
        logging.debug(f"search request query: {search_term}")
        found = OpenSearchClient().fuzzy_search(search_term)
        has_hits = "hits" in found and "hits" in found["hits"]
        documents = [hit["_source"] for hit in found["hits"]["hits"]] if has_hits else found
        return dict({"results": documents}), 200
    
    
    @app.route("/api/search/<string:field_type>", methods=["POST"], endpoint="search_post_general_search")
    @metrics.gauge(name='dbrepo_search_type', description='Time needed to search by type')
    @swag_from("os-yml/post_general_search.yml")
    def post_general_search(field_type):
        """
        Main endpoint for searching entities of a given type.

        The JSON request body is forwarded to the search client. When the query parameters ``t1`` and ``t2``
        are both digit strings and the body contains both ``unit.uri`` and ``concept.uri``, a unit-independent
        search is run; otherwise a general search for ``field_type``. The returned database documents are then
        flattened into entities of the requested type (table, identifier, column, concept, unit, view); for
        any other type the search response is returned unchanged.

        :param field_type: the requested entity type
        :return: ``{'results': [...], 'type': field_type}`` with HTTP 200, or a 415 ApiError when the request
                 body is not declared as JSON
        """
        # compare the mimetype (parameters stripped) so "application/json; charset=utf-8" is accepted as well
        if request.mimetype != "application/json":
            return ApiError(status='UNSUPPORTED_MEDIA_TYPE', message='Content type needs to be application/json',
                            code='search.general.media').model_dump(), 415
        req_body = request.json
        logging.info(f'Searching in index database for type: {field_type}')
        t1 = request.args.get("t1")
        if not str(t1).isdigit():
            t1 = None
        t2 = request.args.get("t2")
        if not str(t2).isdigit():
            t2 = None
        if t1 is not None and t2 is not None and "unit.uri" in req_body and "concept.uri" in req_body:
            response = OpenSearchClient().unit_independent_search(t1, t2, req_body)
        else:
            response = OpenSearchClient().general_search(field_type, req_body)
        # filter by type
        flatten = _RESULT_FLATTENERS.get(field_type)
        if flatten is not None:
            response = flatten(response)
        return dict({'results': response, 'type': field_type}), 200


    def _nested(document, key):
        """Return the list stored under ``key`` in ``document``; a missing or null entry counts as empty."""
        return document.get(key) or []


    def _all_columns(databases):
        """Yield every column of every table of every database, in document order."""
        for database in databases:
            for table in _nested(database, "tables"):
                yield from _nested(table, "columns")


    def _flatten_tables(databases):
        """Collect all tables, copying each database's ``is_public`` flag onto its tables."""
        tables = []
        for database in databases:
            for table in _nested(database, "tables"):
                table["is_public"] = database["is_public"]
                tables.append(table)
        return tables


    def _flatten_identifiers(databases):
        """Collect, per database, its identifiers, subsets and table identifiers; then all view identifiers."""
        identifiers = []
        for database in databases:
            identifiers.extend(_nested(database, "identifiers"))
            identifiers.extend(_nested(database, "subsets"))
            for table in _nested(database, "tables"):
                # check the table's own identifiers: a table without identifiers is skipped instead of
                # raising TypeError, and tables of databases without identifiers are still included
                identifiers.extend(_nested(table, "identifiers"))
        for database in databases:
            for view in _nested(database, "views"):
                if 'identifier' in view:
                    identifiers.append(view['identifier'])
        return identifiers


    def _flatten_columns(databases):
        """Collect all columns, tagging each with the ``table_id`` and ``database_id`` of its table."""
        columns = []
        for database in databases:
            for table in _nested(database, "tables"):
                for column in _nested(table, "columns"):
                    column["table_id"] = table["id"]
                    column["database_id"] = table["database_id"]
                    columns.append(column)
        return columns


    def _flatten_concepts(databases):
        """Collect the non-null ``concept`` of every column."""
        return [column["concept"] for column in _all_columns(databases) if column.get("concept") is not None]


    def _flatten_units(databases):
        """Collect the non-null ``unit`` of every column."""
        return [column["unit"] for column in _all_columns(databases) if column.get("unit") is not None]


    def _flatten_views(databases):
        """Collect the views of all databases."""
        return [view for database in databases for view in _nested(database, "views")]


    # requested result type -> function extracting those entities from the returned database documents
    _RESULT_FLATTENERS = {
        'table': _flatten_tables,
        'identifier': _flatten_identifiers,
        'column': _flatten_columns,
        'concept': _flatten_concepts,
        'unit': _flatten_units,
        'view': _flatten_views,
    }
    
    
    @app.route("/api/search/database/<int:database_id>", methods=["PUT"], endpoint="search_put_database")
    @metrics.gauge(name='dbrepo_search_update_database',
                   description='Time needed to update a database in the search database')
    @auth.login_required(role=['update-search-index'])
    def update_database(database_id: int) -> tuple[dict, int]:
        """
        Updates a database entry in the search database from the JSON request body.

        Requires the ``update-search-index`` role. The body is validated as a ``Database`` model and passed
        to ``OpenSearchClient().update_database``.

        :param database_id: the database id from the URL
        :return: the model returned by the search client, dumped to a dict, with HTTP 202; or a 400 ApiError
                 when the body does not validate as a ``Database``
        """
        logging.debug(f"updating database with id: {database_id}")
        try:
            payload: Database = Database.model_validate(request.json)
        except ValidationError as e:
            logging.error(f"Failed to validate: {e}")
            return ApiError(status='BAD_REQUEST', message=f'Malformed payload: {e}',
                            code='search.general.missing').model_dump(), 400
        database = OpenSearchClient().update_database(database_id, payload)
        logging.info(f"Updated database with id : {database_id}")
        return database.model_dump(), 202
    
    
    # NOTE(review): this endpoint name does not start with 'search', so swagger_config's rule_filter leaves the
    # route out of the generated API spec — confirm that is intended before renaming (url_for users would break).
    @app.route("/api/search/database/<int:database_id>", methods=["DELETE"], endpoint="database_delete_database")
    @metrics.gauge(name='dbrepo_search_delete_database',
                   description='Time needed to delete a database in the search database')
    @auth.login_required(role=['admin'])
    def delete_database(database_id: int):
        """
        Deletes a database from the search database.

        Requires the ``admin`` role.

        :param database_id: the database id from the URL
        :return: an empty JSON object with HTTP 202, or a 404 ApiError when the search client raises NotFoundError
        """
        try:
            OpenSearchClient().delete_database(database_id)
        except NotFoundError:
            return ApiError(status='NOT_FOUND', message='Failed to find database',
                            code='search.database.missing').model_dump(), 404
        # Return a dict, like every other endpoint, so Flask serialises it as application/json;
        # a pre-serialised string would be sent as text/html.
        return {}, 202