diff --git a/.docker/config/enabled_plugins b/.docker/config/enabled_plugins index 95f1c0014dd4ee232580adea29176756a25274ed..db0ae888499ea44c2dd7d40f5ac9c8fcc0ca0567 100644 --- a/.docker/config/enabled_plugins +++ b/.docker/config/enabled_plugins @@ -1 +1 @@ -[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management]. \ No newline at end of file +[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_mqtt]. \ No newline at end of file diff --git a/.docker/config/rabbitmq.conf b/.docker/config/rabbitmq.conf index ff592bb3ecd4b003d180dbb44d8bd9acc5a70394..635c0476f618cd312edb2a5a9165d05336c719c1 100644 --- a/.docker/config/rabbitmq.conf +++ b/.docker/config/rabbitmq.conf @@ -14,6 +14,11 @@ log.console = true log.console.level = warning auth_ldap.log = true +# MQTT +mqtt.vhost = dbrepo +mqtt.exchange = dbrepo +mqtt.prefetch = 10 + # Obviously your authentication server cannot vouch for itself, so you'll need another backend with at least one user in # it. You should probably use the internal database auth_backends.1.authn = ldap diff --git a/.docker/docker-compose.yml b/.docker/docker-compose.yml index ec7a87d25295fd3284589152120263b6204b183a..06b23ed1b39a050b5660ad3130520d384436d801 100644 --- a/.docker/docker-compose.yml +++ b/.docker/docker-compose.yml @@ -191,6 +191,7 @@ services: image: docker.io/bitnami/rabbitmq:3.12-debian-12 ports: - 5672:5672 + - 1883:1883 volumes: - ./config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - ./config/advanced.config:/etc/rabbitmq/advanced.config diff --git a/.docs/api/broker-service.md b/.docs/api/broker-service.md index f2f684c4a944f4e726a53f4d050bb19b2f368a8d..e923ff9b9f4032037e914788b3e70b948234c63c 100644 --- a/.docs/api/broker-service.md +++ b/.docs/api/broker-service.md @@ -19,6 +19,13 @@ It holds exchanges and topics responsible for holding AMQP messages for later co use [RabbitMQ](https://www.rabbitmq.com/) in the implementation. By default, the endpoint listens to the insecure port `5672` for incoming AMQP tuples and insecure port `15672` for the management UI. +## Supported Protocols + +* AMQP (v0.9.1, v1.0), see [RabbitMQ docs](https://www.rabbitmq.com/docs/next/amqp). +* MQTT (v3.1, v3.1.1, v5), see [RabbitMQ docs](https://www.rabbitmq.com/docs/mqtt). + +## Authentication + The default configuration allows any user in the `cn=system,ou=users,dc=dbrepo,dc=at` from the [Identity Service](../identity-service) to access the Broker Service as user with `administrator` role, i.e. the `cn=admin,dc=dbrepo,dc=at` user that is created by default. @@ -28,6 +35,8 @@ The Broker Service allows two ways of authentication for AMQP tuples: 1. LDAP 2. Plain (RabbitMQ's internal authentication) +## Architecture + The queue architecture of the Broker Service is very simple. There is only one durable, topic exchange `dbrepo` and one quorum queue `dbrepo`, connected with a binding of `dbrepo.#` which routes all tuples with routing key prefix `dbrepo.` to this queue. diff --git a/dbrepo-broker-service/README.md b/dbrepo-broker-service/README.md index 95e5afaefdfc73751db6856526ca8c5e3a8f4c7c..6cff53bb912b21c916f30b2a07f6a7c90a76dc5a 100644 --- a/dbrepo-broker-service/README.md +++ b/dbrepo-broker-service/README.md @@ -1,5 +1,7 @@ # Broker Service +Supports MQTT v3, v4 and v5 (https://www.rabbitmq.com/blog/2023/07/21/mqtt5) + ## Advanced Config https://www.rabbitmq.com/docs/ldap \ No newline at end of file diff --git a/dbrepo-broker-service/advanced.config b/dbrepo-broker-service/advanced.config index 4445ea601954e5c93c32edeba1638135c5af5e59..584bcc52325e61a8d6019d2f092590bb7f989530 100644 --- a/dbrepo-broker-service/advanced.config +++ b/dbrepo-broker-service/advanced.config @@ -1,4 +1,9 @@ [ + { + rabbit, [ + {forced_feature_flags_on_init, [quorum_queue, mqtt_v5]} + ] + }, { rabbitmq_auth_backend_ldap, [ diff --git a/dbrepo-broker-service/enabled_plugins b/dbrepo-broker-service/enabled_plugins index 95f1c0014dd4ee232580adea29176756a25274ed..db0ae888499ea44c2dd7d40f5ac9c8fcc0ca0567 100644 --- a/dbrepo-broker-service/enabled_plugins +++ b/dbrepo-broker-service/enabled_plugins @@ -1 +1 @@ -[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management]. \ No newline at end of file +[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_mqtt]. \ No newline at end of file diff --git a/dbrepo-broker-service/rabbitmq.conf b/dbrepo-broker-service/rabbitmq.conf index ff592bb3ecd4b003d180dbb44d8bd9acc5a70394..635c0476f618cd312edb2a5a9165d05336c719c1 100644 --- a/dbrepo-broker-service/rabbitmq.conf +++ b/dbrepo-broker-service/rabbitmq.conf @@ -14,6 +14,11 @@ log.console = true log.console.level = warning auth_ldap.log = true +# MQTT +mqtt.vhost = dbrepo +mqtt.exchange = dbrepo +mqtt.prefetch = 10 + # Obviously your authentication server cannot vouch for itself, so you'll need another backend with at least one user in # it. You should probably use the internal database auth_backends.1.authn = ldap diff --git a/dbrepo-search-service/init/Pipfile b/dbrepo-search-service/init/Pipfile index 517796af748f40cf55f52bac420a000c04c11b23..9647c2ca4e1866754e9b0d695023fb6ba252a61b 100644 --- a/dbrepo-search-service/init/Pipfile +++ b/dbrepo-search-service/init/Pipfile @@ -9,7 +9,7 @@ opensearch-py = "~=2.2" python-dotenv = "~=1.0" testcontainers-opensearch = "*" pytest = "*" -dbrepo = {path = "./lib/dbrepo-1.4.4.tar.gz"} +dbrepo = {path = "./lib/dbrepo-1.4.6.tar.gz"} [dev-packages] coverage = "*" diff --git a/dbrepo-search-service/init/lib/dbrepo-1.4.6rc1-py3-none-any.whl b/dbrepo-search-service/init/lib/dbrepo-1.4.6rc1-py3-none-any.whl deleted file mode 100644 index 83944ce88d8aec5a3b767aa09caf9a8700323104..0000000000000000000000000000000000000000 Binary files a/dbrepo-search-service/init/lib/dbrepo-1.4.6rc1-py3-none-any.whl and /dev/null differ diff --git a/dbrepo-search-service/init/lib/dbrepo-1.4.6rc1.tar.gz b/dbrepo-search-service/init/lib/dbrepo-1.4.6rc1.tar.gz deleted file mode 100644 index a0c8432134f3c21359cd7fb8ee1a341812a6c034..0000000000000000000000000000000000000000 Binary files a/dbrepo-search-service/init/lib/dbrepo-1.4.6rc1.tar.gz and /dev/null differ diff --git a/dbrepo-search-service/init/test/conftest.py b/dbrepo-search-service/init/test/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..2a21f689702d7f78e14e73b6170715753e32b49c --- /dev/null +++ b/dbrepo-search-service/init/test/conftest.py @@ -0,0 +1,49 @@ +import logging + +import pytest +from app import app +from flask import current_app + +from testcontainers.opensearch import OpenSearchContainer + + +@pytest.fixture(scope="session", autouse=True) +def session(request): + """ + Create one OpenSearch container per test run only (admin:admin) + :param request: / + :return: The OpenSearch container + """ + logging.debug("[fixture] creating opensearch container") + container = OpenSearchContainer() + logging.debug("[fixture] starting opensearch container") + container.start() + + with app.app_context(): + current_app.config['OPENSEARCH_HOST'] = container.get_container_host_ip() + current_app.config['OPENSEARCH_PORT'] = container.get_exposed_port(9200) + + # destructor + def stop_opensearch(): + container.stop() + + request.addfinalizer(stop_opensearch) + return container + +# @pytest.fixture(scope="function", autouse=True) +# def cleanup(request, session): +# """ +# Clean up after each test by removing the buckets and re-adding them (=so they are empty again) +# :param request: / +# :param session: / +# :return: +# """ +# logging.info("[fixture] truncate buckets") +# for bucket in ["dbrepo-upload", "dbrepo-download"]: +# objects = [] +# for obj in session.get_client().list_objects(bucket): +# objects.append(DeleteObject(obj.object_name)) +# logging.info(f'request to remove objects {objects}') +# errors = session.get_client().remove_objects(bucket, objects) +# for error in errors: +# raise ConnectionError(f'Failed to delete object with key {error.object_name} of bucket {bucket}') diff --git a/dbrepo-search-service/init/test/test_app.py b/dbrepo-search-service/init/test/test_app.py new file mode 100644 index 0000000000000000000000000000000000000000..a8e6d9755b63d35ac6af4c2b5f1d2d23e311e085 --- /dev/null +++ b/dbrepo-search-service/init/test/test_app.py @@ -0,0 +1,95 @@ +import datetime +import unittest + +from app import app + +from clients.opensearch_client import OpenSearchClient + + +class OpenSearchClientTest(unittest.TestCase): + + def test_index_exists_succeeds(self): + with app.app_context(): + client = RestClient(endpoint=self.metadata_service_endpoint) + # mock + client.update_database(database_id=1, data=req) + + # test + req.tables = [Table(id=1, + name="Test Table", + internal_name="test_table", + queue_name="dbrepo", + routing_key="dbrepo.test_tuw1.test_table", + is_public=True, + database_id=req.id, + constraints=Constraints(uniques=[], foreign_keys=[], checks=[], + primary_key=[PrimaryKey(id=1, + table=TableMinimal(id=1, database_id=1), + column=ColumnMinimal(id=1, table_id=1, + database_id=1))]), + is_versioned=True, + created_by="c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", + creator=User(id="c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", + username="foo", + attributes=UserAttributes(theme="dark")), + owner=User(id="c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", + username="foo", + attributes=UserAttributes(theme="dark")), + created=datetime.datetime(2024, 4, 25, 17, 44, tzinfo=datetime.timezone.utc), + columns=[Column(id=1, + name="ID", + internal_name="id", + database_id=req.id, + table_id=1, + auto_generated=True, + column_type=ColumnType.BIGINT, + is_public=True, + is_null_allowed=False)])] + database = client.update_database(database_id=1, data=req) + self.assertEqual(1, database.id) + self.assertEqual("Test", database.name) + self.assertEqual("test_tuw1", database.internal_name) + self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.creator.id) + self.assertEqual("foo", database.creator.username) + self.assertEqual("dark", database.creator.attributes.theme) + self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.owner.id) + self.assertEqual("foo", database.owner.username) + self.assertEqual("dark", database.owner.attributes.theme) + self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.contact.id) + self.assertEqual("foo", database.contact.username) + self.assertEqual("dark", database.contact.attributes.theme) + self.assertEqual(datetime.datetime(2024, 3, 25, 16, tzinfo=datetime.timezone.utc), database.created) + self.assertEqual("dbrepo", database.exchange_name) + self.assertEqual(True, database.is_public) + self.assertEqual(1, database.container.id) + # ... + self.assertEqual(1, database.container.image.id) + # ... + self.assertEqual(1, len(database.tables)) + self.assertEqual(1, database.tables[0].id) + self.assertEqual("Test Table", database.tables[0].name) + self.assertEqual("test_table", database.tables[0].internal_name) + self.assertEqual("dbrepo", database.tables[0].queue_name) + self.assertEqual("dbrepo.test_tuw1.test_table", database.tables[0].routing_key) + self.assertEqual(True, database.tables[0].is_public) + self.assertEqual(1, database.tables[0].database_id) + self.assertEqual(True, database.tables[0].is_versioned) + self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.tables[0].created_by) + self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.tables[0].creator.id) + self.assertEqual("foo", database.tables[0].creator.username) + self.assertEqual("dark", database.tables[0].creator.attributes.theme) + self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.tables[0].owner.id) + self.assertEqual("foo", database.tables[0].owner.username) + self.assertEqual("dark", database.tables[0].owner.attributes.theme) + self.assertEqual(datetime.datetime(2024, 4, 25, 17, 44, tzinfo=datetime.timezone.utc), + database.tables[0].created) + self.assertEqual(1, len(database.tables[0].columns)) + self.assertEqual(1, database.tables[0].columns[0].id) + self.assertEqual("ID", database.tables[0].columns[0].name) + self.assertEqual("id", database.tables[0].columns[0].internal_name) + self.assertEqual(ColumnType.BIGINT, database.tables[0].columns[0].column_type) + self.assertEqual(1, database.tables[0].columns[0].database_id) + self.assertEqual(1, database.tables[0].columns[0].table_id) + self.assertEqual(True, database.tables[0].columns[0].auto_generated) + self.assertEqual(True, database.tables[0].columns[0].is_public) + self.assertEqual(False, database.tables[0].columns[0].is_null_allowed) diff --git a/dbrepo-search-service/test/run_testindicies.py b/dbrepo-search-service/test/run_testindicies.py deleted file mode 100644 index b547573dd65369cd88be1750f1224f05c333a883..0000000000000000000000000000000000000000 --- a/dbrepo-search-service/test/run_testindicies.py +++ /dev/null @@ -1,91 +0,0 @@ -""" -This script spins up docker containers running an opensearch db with predefined entries. -This is useful e.g. if you want to run tests on the functionality of the opensearch_client. - -note: The port of the test container should be 9200, but it's somehow kinda random, -and using environmet variables also doesn't really work, -so the correct port number is just saved in the .testpickle -""" - -from testcontainers.opensearch import OpenSearchContainer -import pprint -import time -import os -import pickle - - -doc1 = { - "author": "aaa", - "name": "Hi! My name is", - "description":"here's some description text", - "created": "2023-07-27", - "docID":1, - "public":True, - "details": { - "nestedObject1": "something", - "nestedObject2": "something else", - "evenMoreNested": { - "bla":"blib", - "blob":"blub" - } - } -} - -doc2 = { - "author": "max", - "name": "Bla Bla", - "public": False, - "description": "here's another description text, about a fictional entry with some random measurement data", - "created": "2023-07-27", - "docID":2, - "details": { - "nestedObject1": "something", - "nestedObject2": "something else" - } -} - -doc3 = { - "author": "mweise", - "name": "databaseName", - "public": True, - "description": "here is a really old entry", - "created":"2022-07-27", - "docID":3, - "details": { - "nestedObject1": "something", - "nestedObject2": "something else" - } -} -placeholderDoc = { - "blib":"blub", - "public": False -} - -with OpenSearchContainer(port_to_expose=9200) as opensearch: - client = opensearch.get_client() - creation_result = client.index(index="database", body=doc1) - creation_result = client.index(index="database", body=doc2) - creation_result = client.index(index="database", body=doc3) - creation_result = client.index(index="user", body=placeholderDoc) - creation_result = client.index(index="table", body=placeholderDoc) - creation_result = client.index(index="column", body=placeholderDoc) - creation_result = client.index(index="identifier", body=placeholderDoc) - refresh_result = client.indices.refresh(index="database") - search_result = client.search(index="database", body={"query": {"match_all": {}}}) - pp = pprint.PrettyPrinter(indent=1) - config = opensearch.get_config() - os.environ["TEST_OPENSEARCH_HOST"] = config["host"] - os.putenv("TEST_OPENSEARCH_HOST", config["host"]) - os.environ["TEST_OPENSEARCH_PORT"] = config["port"] - os.environ["TEST_OPENSEARCH_USERNAME"] = config["user"] - os.environ["TEST_OPENSEARCH_PASSWORD"] = config["password"] - - pickle_info = {} - pickle_info["port"] = config["port"] - pickle_info["host"] = config["host"] - with open(".testpickle", "ab") as outfile: - pickle.dump(pickle_info, outfile) - print(f"serving on port: {config['port']}") - while True: - time.sleep(1) - diff --git a/docker-compose.yml b/docker-compose.yml index 5effbfdb0b75bd013f09e2b85f3ac55a755e7b3a..d36d80c450fab91dc55c8ef3c4dfec6297c70353 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -207,10 +207,10 @@ services: restart: "no" container_name: dbrepo-broker-service hostname: broker-service - image: docker.io/bitnami/rabbitmq:3.12-debian-12 + image: docker.io/bitnami/rabbitmq:3.13.7-debian-12-r4 ports: - - 15672:15672 - 5672:5672 + - 1883:1883 volumes: - ./dbrepo-broker-service/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - ./dbrepo-broker-service/advanced.config:/etc/rabbitmq/advanced.config diff --git a/helm/dbrepo/charts/dbrepo-mariadb-galera-1.4.6.tgz b/helm/dbrepo/charts/dbrepo-mariadb-galera-1.4.6.tgz index f5a16c907c311685e859a0f971aeae9e603300bc..4af22ff6e998199f69a8e1ff43fd96c3f55aa8ec 100644 Binary files a/helm/dbrepo/charts/dbrepo-mariadb-galera-1.4.6.tgz and b/helm/dbrepo/charts/dbrepo-mariadb-galera-1.4.6.tgz differ diff --git a/helm/dbrepo/values.schema.json b/helm/dbrepo/values.schema.json index 072efb6fe131830c6534580d7a31da547908a4aa..a8e7bbbf743a45c32d8a3a43c8785f4c279fc0ba 100644 --- a/helm/dbrepo/values.schema.json +++ b/helm/dbrepo/values.schema.json @@ -386,6 +386,23 @@ }, "service": { "properties": { + "extraPorts": { + "items": { + "properties": { + "name": { + "type": "string" + }, + "port": { + "type": "integer" + }, + "targetPort": { + "type": "integer" + } + }, + "type": "object" + }, + "type": "array" + }, "managerPortEnabled": { "type": "boolean" }, diff --git a/helm/dbrepo/values.yaml b/helm/dbrepo/values.yaml index d4133c84655da607738f728674877962ef75998a..d889cc85b7bb3e6efce433eaf96fc12409724662 100644 --- a/helm/dbrepo/values.yaml +++ b/helm/dbrepo/values.yaml @@ -262,7 +262,7 @@ brokerservice: enabled: true existingSecret: broker-service-secret ## @param brokerservice.extraPlugins The list of plugins to be activated. - extraPlugins: rabbitmq_prometheus rabbitmq_auth_backend_ldap rabbitmq_auth_mechanism_ssl + extraPlugins: rabbitmq_prometheus rabbitmq_auth_backend_ldap rabbitmq_auth_mechanism_ssl rabbitmq_mqtt persistence: ## @param brokerservice.persistence.enabled If set to true, a PVC will be created. enabled: false @@ -270,6 +270,10 @@ brokerservice: service: type: ClusterIP managerPortEnabled: true + extraPorts: + - name: mqtt + port: 1883 + targetPort: 1883 # loadBalancerIP: ## @param brokerservice.replicaCount The number of replicas. replicaCount: 1 diff --git a/lib/python/dbrepo/AmqpClient.py b/lib/python/dbrepo/AmqpClient.py index 27f7fc4f0fd702e7f40135102f844bb8108971d2..cd0b1140996d0572586985ee0eb2e1285e556c27 100644 --- a/lib/python/dbrepo/AmqpClient.py +++ b/lib/python/dbrepo/AmqpClient.py @@ -4,6 +4,8 @@ import sys import json import logging +from dbrepo.api.exceptions import AuthenticationError + logging.basicConfig(format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s', level=logging.INFO, stream=sys.stdout) @@ -14,9 +16,9 @@ class AmqpClient: via environment variables, e.g. set endpoint with DBREPO_ENDPOINT. You can override the constructor parameters \ with the environment variables. - :param broker_host: The AMQP API host. Optional. Default: "broker-service" - :param broker_port: The AMQP API port. Optional. Default: 5672 - :param broker_virtual_host: The AMQP API virtual host. Optional. Default: "/" + :param broker_host: The AMQP API host. Optional. Default: "localhost". + :param broker_port: The AMQP API port. Optional. Default: 5672, + :param broker_virtual_host: The AMQP API virtual host. Optional. Default: "dbrepo". :param username: The AMQP API username. Optional. :param password: The AMQP API password. Optional. """ @@ -27,9 +29,9 @@ class AmqpClient: password: str = None def __init__(self, - broker_host: str = 'broker-service', + broker_host: str = 'localhost', broker_port: int = 5672, - broker_virtual_host: str = '/', + broker_virtual_host: str = 'dbrepo', username: str = None, password: str = None) -> None: self.broker_host = os.environ.get('AMQP_API_HOST', broker_host) @@ -41,14 +43,16 @@ class AmqpClient: self.username = os.environ.get('AMQP_API_USERNAME', username) self.password = os.environ.get('AMQP_API_PASSWORD', password) - def publish(self, exchange: str, routing_key: str, data=dict) -> None: + def publish(self, routing_key: str, data=dict, exchange: str = 'dbrepo') -> None: """ Publishes data to a given exchange with the given routing key with a blocking connection. - :param exchange: The exchange name. :param routing_key: The routing key. :param data: The data. + :param exchange: The exchange name. Default: "dbrepo". """ + if self.username is None or self.password is None: + raise AuthenticationError(f"Failed to perform request: authentication required") parameters = pika.ConnectionParameters(host=self.broker_host, port=self.broker_port, virtual_host=self.broker_virtual_host, credentials=pika.credentials.PlainCredentials(self.username,