Skip to content
Snippets Groups Projects
Select Git revision
  • 28dc55745a7720af4d54db349882852025b63e06
  • master default protected
  • replication_test
  • release-1.10 protected
  • dev protected
  • 556-usage-statistics
  • 553-semantic-recommendation-2
  • 553-semantic-recommendation
  • release-1.9 protected
  • 551-init-broker-service-permissions
  • 549-test-oai-pmh
  • 545-saving-multiple-times-breaks-pid-metadata
  • 499-standalone-compute-service-2
  • 539-load-tests
  • hotfix/helm-chart
  • luca_ba_new_interface
  • 534-bug-when-adding-access-to-user-that-is-not-registered-at-dashboard-service
  • release-1.8 protected
  • 533-integrate-semantic-recommendation
  • feature/openshift
  • 518-spark-doesn-t-map-the-headers-correct
  • v1.10.4 protected
  • v1.10.3 protected
  • v1.10.2 protected
  • v1.10.1 protected
  • v1.10.0-rc13 protected
  • v1.10.0-rc12 protected
  • v1.10.0-rc11 protected
  • v1.10.0-rc10 protected
  • v1.10.0-rc9 protected
  • v1.10.0-rc8 protected
  • v1.10.0-rc7 protected
  • v1.10.0-rc6 protected
  • v1.10.0-rc5 protected
  • v1.10.0-rc4 protected
  • v1.10.0-rc3 protected
  • v1.10.0-rc2 protected
  • v1.10.0rc1 protected
  • v1.10.0rc0 protected
  • v1.10.0 protected
  • v1.9.3 protected
41 results

install.sh

Blame
  • app.py 18.44 KiB
    import logging
    import math
    import os
    from json import dumps
    from typing import List, Any
    
    from dbrepo.api.dto import Database, ApiError
    from dbrepo.core.client.auth import User, AuthServiceClient
    from dbrepo.core.client.search import SearchServiceClient, flatten
    from flasgger import LazyJSONEncoder, Swagger, swag_from
    from flask import Flask, request, Response
    from flask_cors import CORS
    from flask_httpauth import HTTPTokenAuth, HTTPBasicAuth, MultiAuth
    from opensearchpy import NotFoundError
    from prometheus_flask_exporter import PrometheusMetrics
    from pydantic import ValidationError
    from pydantic.deprecated.json import pydantic_encoder
    
    # Give the numeric level NOTSET the display name 'TRACE' and start out with DEBUG-level basic logging;
    # the dictConfig call below then installs the actual handlers on the root logger.
    logging.addLevelName(level=logging.NOTSET, levelName='TRACE')
    logging.basicConfig(level=logging.DEBUG)
    
    from logging.config import dictConfig
    
    # logging configuration
    dictConfig({
        'version': 1,
        # NOTE(review): the 'default' formatter is declared but no handler below references it.
        'formatters': {
            'default': {
                'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
            },
            'simple': {
                'format': '[%(asctime)s] [%(levelname)s] %(message)s',
            },
            "ecs": {
                "()": "ecs_logging.StdlibFormatter"
            },
        },
        'handlers': {
            # human readable lines on Flask's WSGI error stream
            'wsgi': {
                'class': 'logging.StreamHandler',
                'stream': 'ext://flask.logging.wsgi_errors_stream',
                'formatter': 'simple'
            },
            # ECS-formatted log file with time-based rotation, keeping 5 backups
            # NOTE(review): when='m' with interval=1 presumably rotates every minute - confirm this is intended
            'file': {
                'class': 'logging.handlers.TimedRotatingFileHandler',
                'formatter': 'ecs',
                'filename': '/var/log/app/service/search/app.log',
                'when': 'm',
                'interval': 1,
                'backupCount': 5,
                'encoding': 'utf8'
            },
        },
        # everything from DEBUG upwards goes to both handlers
        'root': {
            'level': 'DEBUG',
            'handlers': ['wsgi', 'file']
        }
    })
    
    # create app object
    # create app object
    app = Flask(__name__)
    
    # allow cross-origin requests from any origin for all routes below /api/
    cors = CORS(app, resources={r"/api/*": {"origins": "*"}})
    
    # Prometheus metrics exporter for the app plus a static info metric
    metrics = PrometheusMetrics(app)
    metrics.info("app_info", "Application info", version="0.0.1")
    app.config["SWAGGER"] = {"openapi": "3.0.1", "title": "Swagger UI", "uiversion": 3}
    
    # protected endpoints accept either a Bearer token or HTTP basic credentials
    token_auth = HTTPTokenAuth(scheme='Bearer')
    basic_auth = HTTPBasicAuth()
    auth = MultiAuth(token_auth, basic_auth)
    
    # Configuration passed to Swagger(...): one spec served at /api-docs.json and the UI at /swagger-ui/.
    swagger_config = {
        "headers": [],
        "specs": [
            {
                "endpoint": "api-docs",
                "route": "/api-docs.json",
                # only routes whose endpoint name starts with 'search' are included in the spec
                "rule_filter": lambda rule: rule.endpoint.startswith('search'),
                "model_filter": lambda tag: True,  # all in
            }
        ],
        "static_url_path": "/flasgger_static",
        "swagger_ui": True,
        "specs_route": "/swagger-ui/",
    }
    
    # OpenAPI template passed to Swagger(...): shared schemas, security schemes, API info and servers.
    # NOTE(review): "openapi" is "3.0.0" here while app.config["SWAGGER"] declares "3.0.1" - confirm which applies.
    template = {
        "openapi": "3.0.0",
        "components": {
            "schemas": {
                # NOTE(review): unlike the other schemas, IndexDto declares no "type": "object" - confirm
                "IndexDto": {
                    "required": ["results", "type"],
                    "properties": {
                        "results": {
                            "type": "array",
                            "items": {
                                "type": "object",
                            }
                        },
                        "type": {
                            "type": "string",
                            "description": "Same as the requested type",
                            "enum": ["database", "table", "view", "column", "user", "identifier", "concept", "unit"]
                        }
                    }
                },
                "IndexFieldsDto": {
                    "required": ["results"],
                    "type": "object",
                    "properties": {
                        "results": {
                            "type": "array",
                            "items": {
                                "$ref": "#/components/schemas/IndexFieldDto"
                            }
                        }
                    }
                },
                "IndexFieldDto": {
                    "required": ["attr_name", "attr_friendly_name", "type"],
                    "type": "object",
                    "properties": {
                        "attr_name": {
                            "type": "string",
                            "example": "name"
                        },
                        "attr_friendly_name": {
                            "type": "string",
                            "example": "Name"
                        },
                        "type": {
                            "type": "string",
                            "example": "string",
                            "description": "OpenSearch data types."
                        }
                    }
                },
                "SearchRequestDto": {
                    "required": ["search_term", "field_value_pairs"],
                    "type": "object",
                    "properties": {
                        "search_term": {
                            "type": "string"
                        },
                        "field_value_pairs": {
                            "type": "object"
                        }
                    }
                },
                "ApiError": {
                    "properties": {
                        "message": {
                            "example": "Message",
                            "type": "string"
                        },
                        "status": {
                            "example": "BAD_REQUEST",
                            "type": "string"
                        },
                        "code": {
                            "example": "error.dashboard.create",
                            "type": "string"
                        }
                    },
                    "type": "object"
                },
            },
            # the two schemes mirror the Bearer token / HTTP basic handlers created for the app
            "securitySchemes": {
                "bearerAuth": {
                    "type": "http",
                    "scheme": "bearer",
                    "bearerFormat": "JWT",
                    "in": "header"
                },
                "basicAuth": {
                    "type": "http",
                    "scheme": "basic",
                    "in": "header"
                }
            },
        },
        # NOTE(review): version "1.9.0" and the 1.7 documentation URL below may be outdated - confirm
        "info": {
            "title": "Database Repository Search Service API",
            "description": "Service that searches the search database",
            "version": "1.9.0",
            "contact": {
                "name": "Prof. Andreas Rauber",
                "email": "andreas.rauber@tuwien.ac.at"
            },
            "license": {
                "name": "Apache 2.0",
                "url": "https://www.apache.org/licenses/LICENSE-2.0"
            },
        },
        "externalDocs": {
            "description": "Sourcecode Documentation",
            "url": "https://www.ifs.tuwien.ac.at/infrastructures/dbrepo/1.7/"
        },
        "servers": [
            {
                "url": "http://localhost:4000",
                "description": "Generated server url"
            },
            {
                "url": "https://test.dbrepo.tuwien.ac.at",
                "description": "Sandbox"
            }
        ]
    }
    
    # register the API documentation with the app
    swagger = Swagger(app, config=swagger_config, template=template)
    # Service configuration: every value except JWT_ALGORITHM can be overridden through an environment
    # variable of the same name.
    # NOTE(review): the fallback values for AUTH_SERVICE_CLIENT_SECRET and the OpenSearch credentials are
    # hard-coded here - make sure deployments override them.
    app.config["METADATA_SERVICE_ENDPOINT"] = os.getenv("METADATA_SERVICE_ENDPOINT", "http://metadata-service:8080")
    # NOTE(review): HS256 is configured next to a PEM public key; JWT_ALGORITHM is not read anywhere in the
    # visible code - confirm which algorithm the auth client actually uses.
    app.config["JWT_ALGORITHM"] = "HS256"
    # JWT_PUBKEY from the environment is expected without the PEM header/footer, which are added here
    app.config["JWT_PUBKEY"] = '-----BEGIN PUBLIC KEY-----\n' + os.getenv("JWT_PUBKEY",
                                                                          "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqqnHQ2BWWW9vDNLRCcxD++xZg/16oqMo/c1l+lcFEjjAIJjJp/HqrPYU/U9GvquGE6PbVFtTzW1KcKawOW+FJNOA3CGo8Q1TFEfz43B8rZpKsFbJKvQGVv1Z4HaKPvLUm7iMm8Hv91cLduuoWx6Q3DPe2vg13GKKEZe7UFghF+0T9u8EKzA/XqQ0OiICmsmYPbwvf9N3bCKsB/Y10EYmZRb8IhCoV9mmO5TxgWgiuNeCTtNCv2ePYqL/U0WvyGFW0reasIK8eg3KrAUj8DpyOgPOVBn3lBGf+3KFSYi+0bwZbJZWqbC/Xlk20Go1YfeJPRIt7ImxD27R/lNjgDO/MwIDAQAB") + '\n-----END PUBLIC KEY-----'
    app.config["AUTH_SERVICE_ENDPOINT"] = os.getenv("AUTH_SERVICE_ENDPOINT", "http://localhost:8080")
    app.config["AUTH_SERVICE_CLIENT"] = os.getenv("AUTH_SERVICE_CLIENT", "dbrepo-client")
    app.config["AUTH_SERVICE_CLIENT_SECRET"] = os.getenv("AUTH_SERVICE_CLIENT_SECRET", "MUwRc7yfXSJwX8AdRMWaQC3Nep1VjwgG")
    app.config["OPENSEARCH_HOST"] = os.getenv('OPENSEARCH_HOST', 'search-db')
    # kept as a string here; search_client() converts it to int
    app.config["OPENSEARCH_PORT"] = os.getenv('OPENSEARCH_PORT', '9200')
    app.config["OPENSEARCH_USERNAME"] = os.getenv('OPENSEARCH_USERNAME', 'admin')
    app.config["OPENSEARCH_PASSWORD"] = os.getenv('OPENSEARCH_PASSWORD', 'admin')
    
    # NOTE(review): presumably needed so flasgger's lazy values can be serialized; newer Flask releases
    # may no longer honour app.json_encoder - confirm against the pinned Flask version.
    app.json_encoder = LazyJSONEncoder
    
    # single auth client instance shared by the verification callbacks and the endpoints
    auth_client = AuthServiceClient(app.config["AUTH_SERVICE_ENDPOINT"], app.config["AUTH_SERVICE_CLIENT"],
                                    app.config["AUTH_SERVICE_CLIENT_SECRET"], app.config["JWT_PUBKEY"])
    
    
    @token_auth.verify_token
    def verify_token(token: str) -> bool | User:
        """
        Verification callback registered with the Bearer token auth handler.

        :param token: the token handed over by the auth handler
        :return: the outcome of auth_client.is_valid_token for this token
        """
        outcome = auth_client.is_valid_token(token)
        return outcome
    
    
    @basic_auth.verify_password
    def verify_password(username: str, password: str) -> Any:
        """
        Verification callback registered with the HTTP basic auth handler.

        :param username: the username handed over by the auth handler
        :param password: the password handed over by the auth handler
        :return: the outcome of auth_client.is_valid_password for these credentials
        """
        outcome = auth_client.is_valid_password(username, password)
        return outcome
    
    
    @token_auth.get_user_roles
    def get_user_roles(user: User) -> List[str]:
        """
        Role lookup callback registered with the Bearer token auth handler.

        :param user: the user whose roles are requested
        :return: the roles reported by auth_client.get_user_roles
        """
        roles = auth_client.get_user_roles(user)
        return roles
    
    
    @basic_auth.get_user_roles
    def get_user_roles(user: User) -> List[str]:
        """
        Role lookup callback registered with the HTTP basic auth handler.

        :param user: the user whose roles are requested
        :return: the roles reported by auth_client.get_user_roles
        """
        roles = auth_client.get_user_roles(user)
        return roles
    
    
    # response headers shared by the endpoints below: every body is declared as JSON
    headers = {'Content-Type': 'application/json'}
    
    
    def general_filter(index, results):
        """
        Reduces every result dict to the keys that are exposed for the given index.

        Each index has an allow-list of keys (important_keys). All other keys are deleted from the result
        dicts in place, and the very same list object is handed back to the caller.

        :param index: the search index the query results are about
        :param results: the raw response of the query_index_by_term_opensearch function.
        :return: the results that were passed in, stripped of all keys that are not allow-listed
        :raises KeyError: if there is no allow-list for the index
        """
        important_keys = {
            "column": ["id", "name", "column_type"],
            "table": ["id", "name", "description"],
            "identifier": ["id", "type", "creator"],
            "user": ["id", "username"],
            "database": ["id", "name", "is_public", "is_schema_public", "details"],
            "concept": ["uri", "name"],
            "unit": [],
            "view": ["id", "name", "creator"],
        }
        if index not in important_keys:
            raise KeyError(f"Failed to find index {index} in: {important_keys.keys()}")
        allowed = important_keys[index]
        for entry in results:
            # collect the surplus keys first, a dict must not change size while it is iterated
            surplus = [name for name in entry.keys() if name not in allowed]
            for name in surplus:
                del entry[name]
        logging.debug('general filter results: %s', results)
        return results
    
    
    def search_client():
        """
        Builds a new SearchServiceClient from the OPENSEARCH_* entries of the app configuration.

        :return: the client instance
        """
        settings = app.config
        host = settings["OPENSEARCH_HOST"]
        # the port is stored as a string in the configuration
        port = int(settings["OPENSEARCH_PORT"])
        username = settings["OPENSEARCH_USERNAME"]
        password = settings["OPENSEARCH_PASSWORD"]
        return SearchServiceClient(host, port, username, password)
    
    
    @app.route("/health", methods=["GET"], endpoint="actuator_health")
    def health():
        """
        Health endpoint, always reports the status UP.

        :return: the status body, HTTP status 200 and the shared JSON headers
        """
        body = {"status": "UP"}
        return body, 200, headers
    
    
    @app.route("/api/search/<string:index>", methods=["GET"], endpoint="search_get_index")
    @metrics.gauge(name='dbrepo_search_index_list', description='Time needed to list search index')
    @swag_from("/app/os-yml/get_index.yml")
    def get_index(index: str):
        """
        returns one page of the filtered entries in a specific index

        Query parameters:
          - results_per_page: size of a page, defaults to 50 and is clamped to the range 1..500
          - page: 1-based page number, defaults to 1 and is clamped to the range 1..last page

        Values that cannot be parsed as int fall back to the defaults.

        :param index: desired index
        :return: 200 with the list of the results of the requested page, or 404 with an ApiError when
                 general_filter does not know the index
        """
        logging.debug(f'endpoint get search type: {index}')
        # NOTE(review): the query itself does not reference `index`, only the filtering does - confirm intended
        results = search_client().query_index_by_term_opensearch("*", "contains")
        try:
            results = general_filter(index, results)
        except KeyError:
            return ApiError(status='NOT_FOUND', message=f'Failed to find get index: {index}',
                            code='search.index.missing').model_dump(), 404, headers
        # Clamp the page size to at least 1: a value of 0 would divide by zero below and a negative value
        # would produce negative slice bounds.
        results_per_page = max(1, min(request.args.get("results_per_page", 50, type=int), 500))
        # at least one (possibly empty) page, so that the page number can always be clamped to >= 1
        max_pages = max(1, math.ceil(len(results) / results_per_page))
        page = max(1, min(request.args.get("page", 1, type=int), max_pages))
        start = results_per_page * (page - 1)
        results = results[start: start + results_per_page]
        return Response(dumps(results, default=pydantic_encoder)), 200, headers
    
    
    @app.route("/api/search/<string:field_type>/fields", methods=["GET"], endpoint="search_get_index_fields")
    @metrics.gauge(name='dbrepo_search_type_list', description='Time needed to list search types')
    @swag_from("/app/os-yml/get_fields.yml")
    def get_fields(field_type: str):
        """
        Lists the attributes of the data for a specific index.

        :param field_type: The search type
        :return: 200 with the JSON encoded fields, or 404 with an ApiError when the search client raises
                 NotFoundError
        """
        logging.debug(f'endpoint get search type fields: {field_type}')
        try:
            fields = search_client().get_fields_for_index(field_type)
        except NotFoundError:
            not_found = ApiError(status='NOT_FOUND', message=f'Failed to find fields for search type {field_type}',
                                 code='search.type.missing')
            return not_found.model_dump(), 404, headers
        logging.debug(f'get fields for field_type {field_type} resulted in {len(fields)} field(s)')
        body = dumps(fields, default=pydantic_encoder)
        return Response(body), 200, headers
    
    
    @app.route("/api/search", methods=["GET"], endpoint="search_fuzzy_search")
    @metrics.gauge(name='dbrepo_search_fuzzy', description='Time needed to search fuzzy')
    @swag_from("/app/os-yml/get_fuzzy_search.yml")
    def get_fuzzy_search():
        """
        Main endpoint for fuzzy searching, the search term is read from the query parameter q.

        :return: 200 with the JSON encoded results, 400 with an ApiError when q is missing or empty, or
                 the error and status reported by the auth client
        """
        search_term: str | None = request.args.get('q')
        logging.debug(f'endpoint get fuzzy search, q={search_term}')
        if not search_term:
            bad_request = ApiError(status='BAD_REQUEST', message='Provide a search term with ?q=term',
                                   code='search.fuzzy.invalid')
            return bad_request.model_dump(), 400
        logging.debug(f"search request query: {search_term}")
        authorization = request.headers.get('Authorization')
        user_id, error, status = auth_client.get_user_id(authorization)
        if not (error is None or status is None):
            return error, status, headers
        client = search_client()
        results = client.fuzzy_search(search_term=search_term, user_id=user_id, user_token=authorization)
        body = dumps(results, default=pydantic_encoder)
        return Response(body), 200, headers
    
    
    @app.route("/api/search/<string:field_type>", methods=["POST"], endpoint="search_post_general_search")
    @metrics.gauge(name='dbrepo_search_type', description='Time needed to search by type')
    @swag_from("/app/os-yml/post_general_search.yml")
    def post_general_search(field_type):
        """
        Main endpoint for the general (field based) search.

        The JSON body carries the field/value pairs to search for. When the query parameters t1 and t2 are
        both numeric and the body contains the keys "unit.uri" and "concept.uri", a unit independent search
        is performed, otherwise the general search. The hits are then reduced to the requested field_type;
        for any other field_type the search response is returned unchanged.

        :param field_type: the type of entity to return (table, identifier, column, concept, unit, view, ...)
        :return: 200 with the JSON encoded results, 415 with an ApiError when the body is not declared as
                 application/json, or the error and status reported by the auth client
        """
        # compare the media type only, so that e.g. "application/json; charset=utf-8" is accepted as well
        if request.mimetype != "application/json":
            return ApiError(status='UNSUPPORTED_MEDIA_TYPE', message='Content type needs to be application/json',
                            code='search.general.media').model_dump(), 415
        value_pairs = request.json
        logging.debug(f'endpoint get general search, field_type={field_type}, value_pairs={value_pairs}')
        # t1/t2 are only used when they are purely numeric, anything else is treated as absent
        t1 = request.args.get("t1")
        if not str(t1).isdigit():
            t1 = None
        t2 = request.args.get("t2")
        if not str(t2).isdigit():
            t2 = None
        user_id, error, status = auth_client.get_user_id(request.headers.get('Authorization'))
        if error is not None and status is not None:
            return error, status
        if t1 is not None and t2 is not None and "unit.uri" in value_pairs and "concept.uri" in value_pairs:
            response = search_client().unit_independent_search(t1, t2, value_pairs, user_id)
        else:
            response = search_client().general_search(field_type=field_type,
                                                      field_value_pairs=value_pairs,
                                                      user_id=user_id,
                                                      user_token=request.headers.get('Authorization'))
        # filter by type
        # only tables/views that are public, have a public schema or are owned by the caller are kept
        # NOTE(review): the lines below use attribute access (database.tables) while the identifier, concept
        # and unit branches use subscript access (database["tables"]) - confirm the response items support both
        tables = [table for table in flatten([database.tables for database in response]) if
                  table.is_public or table.is_schema_public or (user_id is not None and table.owner.id == user_id)]
        views = [view for view in flatten([database.views for database in response]) if
                 view.is_public or view.is_schema_public or (user_id is not None and view.owner.id == user_id)]
        if field_type == 'table':
            logging.debug(f'filtered to {len(tables)} tables')
            response = tables
        elif field_type == 'identifier':
            # collect the identifiers of the databases, their subsets, their tables and their views
            tmp = []
            for database in response:
                if database["identifiers"] is not None:
                    tmp.extend(database['identifiers'])
                if database["subsets"] is not None:
                    tmp.extend(database['subsets'])
                if database["tables"] is not None:
                    for table in database['tables']:
                        # guard on the identifiers of the table itself (the database's were checked before)
                        if table["identifiers"] is not None:
                            tmp.extend(table['identifiers'])
            for view in [x for xs in response for x in xs["views"]]:
                if 'identifier' in view:
                    tmp.append(view['identifier'])
            response = tmp
        elif field_type == 'column':
            response = flatten([table.columns for table in tables])
        elif field_type == 'concept':
            tmp = []
            tables = [x for xs in response for x in xs["tables"]]
            for column in [x for xs in tables for x in xs["columns"]]:
                if 'concept' in column and column["concept"] is not None:
                    tmp.append(column["concept"])
            response = tmp
        elif field_type == 'unit':
            tmp = []
            tables = [x for xs in response for x in xs["tables"]]
            for column in [x for xs in tables for x in xs["columns"]]:
                if 'unit' in column and column["unit"] is not None:
                    tmp.append(column["unit"])
            response = tmp
        elif field_type == 'view':
            response = views
        return Response(dumps(response, default=pydantic_encoder)), 200, headers
    
    
    @app.route("/api/search/database/<string:database_id>", methods=["PUT"], endpoint="search_save_database")
    @metrics.gauge(name='dbrepo_search_save_database',
                   description='Time needed to update a database in the search database')
    @auth.login_required(role=['update-search-index'])
    def save_database(database_id: str):
        """
        Validates the JSON body as Database and saves it through the search client.

        :param database_id: the id of the database to save
        :return: 202 with an empty body, or 400 with an ApiError when the body fails validation
        """
        logging.debug(f"save database with id: {database_id}")
        try:
            payload = Database.model_validate(request.json)
        except ValidationError as e:
            reason = str(e).strip()
            logging.error(f"Failed to validate: {reason}")
            malformed = ApiError(status='BAD_REQUEST', message=f'Malformed payload: {reason}',
                                 code='search.general.missing')
            return malformed.model_dump(), 400
        client = search_client()
        client.save_database(database_id, payload)
        return Response(), 202, headers
    
    
    @app.route("/api/search/database/<string:database_id>", methods=["DELETE"], endpoint="database_delete_database")
    @metrics.gauge(name='dbrepo_search_delete_database',
                   description='Time needed to delete a database in the search database')
    @auth.login_required(role=['system'])
    def delete_database(database_id: str):
        """
        Deletes a database through the search client.

        :param database_id: the id of the database to delete
        :return: 202 with an empty JSON object, or 404 with an ApiError when the search client raises
                 NotFoundError
        """
        logging.debug(f"delete database with id: {database_id}")
        try:
            search_client().delete_database(database_id)
        except NotFoundError:
            missing = ApiError(status='NOT_FOUND', message='Failed to find database',
                               code='search.database.missing')
            return missing.model_dump(), 404
        return Response(dumps({})), 202, headers