Skip to content
Snippets Groups Projects
Verified Commit dca6a7f3 authored by Martin Weise's avatar Martin Weise
Browse files

Hotfix MQTT support

parent 3ca954bc
No related branches found
No related tags found
7 merge requests!345Updated docs and endpoints:,!341Fixed mapping problem where UK and FK share columns they are inserted,!339Fixed mapping problem where UK and FK share columns they are inserted,!338Fixed mapping problem where UK and FK share columns they are inserted,!334Fixed mapping problem where UK and FK share columns they are inserted,!333Fixed mapping problem where UK and FK share columns they are inserted,!328Hotfix/mapping
Showing
with 209 additions and 104 deletions
[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management].
\ No newline at end of file
[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_mqtt].
\ No newline at end of file
......@@ -14,6 +14,11 @@ log.console = true
log.console.level = warning
auth_ldap.log = true
# MQTT
mqtt.vhost = dbrepo
mqtt.exchange = dbrepo
mqtt.prefetch = 10
# Obviously your authentication server cannot vouch for itself, so you'll need another backend with at least one user in
# it. You should probably use the internal database
auth_backends.1.authn = ldap
......
......@@ -191,6 +191,7 @@ services:
image: docker.io/bitnami/rabbitmq:3.12-debian-12
ports:
- 5672:5672
- 1883:1883
volumes:
- ./config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./config/advanced.config:/etc/rabbitmq/advanced.config
......
......@@ -19,6 +19,13 @@ It holds exchanges and topics responsible for holding AMQP messages for later co
use [RabbitMQ](https://www.rabbitmq.com/) in the implementation. By default, the endpoint listens to the insecure port `5672` for incoming
AMQP tuples and insecure port `15672` for the management UI.
## Supported Protocols
* AMQP (v0.9.1, v1.0), see [RabbitMQ docs](https://www.rabbitmq.com/docs/next/amqp).
* MQTT (v3.1, v3.1.1, v5), see [RabbitMQ docs](https://www.rabbitmq.com/docs/mqtt).
## Authentication
The default configuration allows any user in the `cn=system,ou=users,dc=dbrepo,dc=at` from the
[Identity Service](../identity-service) to access the Broker Service as user with `administrator` role, i.e. the
`cn=admin,dc=dbrepo,dc=at` user that is created by default.
......@@ -28,6 +35,8 @@ The Broker Service allows two ways of authentication for AMQP tuples:
1. LDAP
2. Plain (RabbitMQ's internal authentication)
## Architecture
The queue architecture of the Broker Service is very simple. There is only one durable, topic exchange `dbrepo` and one
quorum queue `dbrepo`, connected with a binding of `dbrepo.#` which routes all tuples with routing key prefix `dbrepo.`
to this queue.
......
# Broker Service
Supports MQTT v3, v4 and v5 (https://www.rabbitmq.com/blog/2023/07/21/mqtt5)
## Advanced Config
https://www.rabbitmq.com/docs/ldap
\ No newline at end of file
[
{
rabbit, [
{forced_feature_flags_on_init, [quorum_queue, mqtt_v5]}
]
},
{
rabbitmq_auth_backend_ldap,
[
......
[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management].
\ No newline at end of file
[rabbitmq_prometheus,rabbitmq_auth_backend_ldap,rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_mqtt].
\ No newline at end of file
......@@ -14,6 +14,11 @@ log.console = true
log.console.level = warning
auth_ldap.log = true
# MQTT
mqtt.vhost = dbrepo
mqtt.exchange = dbrepo
mqtt.prefetch = 10
# Obviously your authentication server cannot vouch for itself, so you'll need another backend with at least one user in
# it. You should probably use the internal database
auth_backends.1.authn = ldap
......
......@@ -9,7 +9,7 @@ opensearch-py = "~=2.2"
python-dotenv = "~=1.0"
testcontainers-opensearch = "*"
pytest = "*"
dbrepo = {path = "./lib/dbrepo-1.4.4.tar.gz"}
dbrepo = {path = "./lib/dbrepo-1.4.6.tar.gz"}
[dev-packages]
coverage = "*"
......
File deleted
File deleted
import logging
import pytest
from app import app
from flask import current_app
from testcontainers.opensearch import OpenSearchContainer
@pytest.fixture(scope="session", autouse=True)
def session(request):
"""
Create one OpenSearch container per test run only (admin:admin)
:param request: /
:return: The OpenSearch container
"""
logging.debug("[fixture] creating opensearch container")
container = OpenSearchContainer()
logging.debug("[fixture] starting opensearch container")
container.start()
with app.app_context():
current_app.config['OPENSEARCH_HOST'] = container.get_container_host_ip()
current_app.config['OPENSEARCH_PORT'] = container.get_exposed_port(9200)
# destructor
def stop_opensearch():
container.stop()
request.addfinalizer(stop_opensearch)
return container
# @pytest.fixture(scope="function", autouse=True)
# def cleanup(request, session):
# """
# Clean up after each test by removing the buckets and re-adding them (=so they are empty again)
# :param request: /
# :param session: /
# :return:
# """
# logging.info("[fixture] truncate buckets")
# for bucket in ["dbrepo-upload", "dbrepo-download"]:
# objects = []
# for obj in session.get_client().list_objects(bucket):
# objects.append(DeleteObject(obj.object_name))
# logging.info(f'request to remove objects {objects}')
# errors = session.get_client().remove_objects(bucket, objects)
# for error in errors:
# raise ConnectionError(f'Failed to delete object with key {error.object_name} of bucket {bucket}')
import datetime
import unittest
from app import app
from clients.opensearch_client import OpenSearchClient
class OpenSearchClientTest(unittest.TestCase):
def test_index_exists_succeeds(self):
with app.app_context():
client = RestClient(endpoint=self.metadata_service_endpoint)
# mock
client.update_database(database_id=1, data=req)
# test
req.tables = [Table(id=1,
name="Test Table",
internal_name="test_table",
queue_name="dbrepo",
routing_key="dbrepo.test_tuw1.test_table",
is_public=True,
database_id=req.id,
constraints=Constraints(uniques=[], foreign_keys=[], checks=[],
primary_key=[PrimaryKey(id=1,
table=TableMinimal(id=1, database_id=1),
column=ColumnMinimal(id=1, table_id=1,
database_id=1))]),
is_versioned=True,
created_by="c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502",
creator=User(id="c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502",
username="foo",
attributes=UserAttributes(theme="dark")),
owner=User(id="c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502",
username="foo",
attributes=UserAttributes(theme="dark")),
created=datetime.datetime(2024, 4, 25, 17, 44, tzinfo=datetime.timezone.utc),
columns=[Column(id=1,
name="ID",
internal_name="id",
database_id=req.id,
table_id=1,
auto_generated=True,
column_type=ColumnType.BIGINT,
is_public=True,
is_null_allowed=False)])]
database = client.update_database(database_id=1, data=req)
self.assertEqual(1, database.id)
self.assertEqual("Test", database.name)
self.assertEqual("test_tuw1", database.internal_name)
self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.creator.id)
self.assertEqual("foo", database.creator.username)
self.assertEqual("dark", database.creator.attributes.theme)
self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.owner.id)
self.assertEqual("foo", database.owner.username)
self.assertEqual("dark", database.owner.attributes.theme)
self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.contact.id)
self.assertEqual("foo", database.contact.username)
self.assertEqual("dark", database.contact.attributes.theme)
self.assertEqual(datetime.datetime(2024, 3, 25, 16, tzinfo=datetime.timezone.utc), database.created)
self.assertEqual("dbrepo", database.exchange_name)
self.assertEqual(True, database.is_public)
self.assertEqual(1, database.container.id)
# ...
self.assertEqual(1, database.container.image.id)
# ...
self.assertEqual(1, len(database.tables))
self.assertEqual(1, database.tables[0].id)
self.assertEqual("Test Table", database.tables[0].name)
self.assertEqual("test_table", database.tables[0].internal_name)
self.assertEqual("dbrepo", database.tables[0].queue_name)
self.assertEqual("dbrepo.test_tuw1.test_table", database.tables[0].routing_key)
self.assertEqual(True, database.tables[0].is_public)
self.assertEqual(1, database.tables[0].database_id)
self.assertEqual(True, database.tables[0].is_versioned)
self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.tables[0].created_by)
self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.tables[0].creator.id)
self.assertEqual("foo", database.tables[0].creator.username)
self.assertEqual("dark", database.tables[0].creator.attributes.theme)
self.assertEqual("c6b71ef5-2d2f-48b2-9d79-b8f23a3a0502", database.tables[0].owner.id)
self.assertEqual("foo", database.tables[0].owner.username)
self.assertEqual("dark", database.tables[0].owner.attributes.theme)
self.assertEqual(datetime.datetime(2024, 4, 25, 17, 44, tzinfo=datetime.timezone.utc),
database.tables[0].created)
self.assertEqual(1, len(database.tables[0].columns))
self.assertEqual(1, database.tables[0].columns[0].id)
self.assertEqual("ID", database.tables[0].columns[0].name)
self.assertEqual("id", database.tables[0].columns[0].internal_name)
self.assertEqual(ColumnType.BIGINT, database.tables[0].columns[0].column_type)
self.assertEqual(1, database.tables[0].columns[0].database_id)
self.assertEqual(1, database.tables[0].columns[0].table_id)
self.assertEqual(True, database.tables[0].columns[0].auto_generated)
self.assertEqual(True, database.tables[0].columns[0].is_public)
self.assertEqual(False, database.tables[0].columns[0].is_null_allowed)
"""
This script spins up docker containers running an opensearch db with predefined entries.
This is useful e.g. if you want to run tests on the functionality of the opensearch_client.
note: The port of the test container should be 9200, but it's somehow kinda random,
and using environmet variables also doesn't really work,
so the correct port number is just saved in the .testpickle
"""
from testcontainers.opensearch import OpenSearchContainer
import pprint
import time
import os
import pickle
doc1 = {
"author": "aaa",
"name": "Hi! My name is",
"description":"here's some description text",
"created": "2023-07-27",
"docID":1,
"public":True,
"details": {
"nestedObject1": "something",
"nestedObject2": "something else",
"evenMoreNested": {
"bla":"blib",
"blob":"blub"
}
}
}
doc2 = {
"author": "max",
"name": "Bla Bla",
"public": False,
"description": "here's another description text, about a fictional entry with some random measurement data",
"created": "2023-07-27",
"docID":2,
"details": {
"nestedObject1": "something",
"nestedObject2": "something else"
}
}
doc3 = {
"author": "mweise",
"name": "databaseName",
"public": True,
"description": "here is a really old entry",
"created":"2022-07-27",
"docID":3,
"details": {
"nestedObject1": "something",
"nestedObject2": "something else"
}
}
placeholderDoc = {
"blib":"blub",
"public": False
}
with OpenSearchContainer(port_to_expose=9200) as opensearch:
client = opensearch.get_client()
creation_result = client.index(index="database", body=doc1)
creation_result = client.index(index="database", body=doc2)
creation_result = client.index(index="database", body=doc3)
creation_result = client.index(index="user", body=placeholderDoc)
creation_result = client.index(index="table", body=placeholderDoc)
creation_result = client.index(index="column", body=placeholderDoc)
creation_result = client.index(index="identifier", body=placeholderDoc)
refresh_result = client.indices.refresh(index="database")
search_result = client.search(index="database", body={"query": {"match_all": {}}})
pp = pprint.PrettyPrinter(indent=1)
config = opensearch.get_config()
os.environ["TEST_OPENSEARCH_HOST"] = config["host"]
os.putenv("TEST_OPENSEARCH_HOST", config["host"])
os.environ["TEST_OPENSEARCH_PORT"] = config["port"]
os.environ["TEST_OPENSEARCH_USERNAME"] = config["user"]
os.environ["TEST_OPENSEARCH_PASSWORD"] = config["password"]
pickle_info = {}
pickle_info["port"] = config["port"]
pickle_info["host"] = config["host"]
with open(".testpickle", "ab") as outfile:
pickle.dump(pickle_info, outfile)
print(f"serving on port: {config['port']}")
while True:
time.sleep(1)
......@@ -207,10 +207,10 @@ services:
restart: "no"
container_name: dbrepo-broker-service
hostname: broker-service
image: docker.io/bitnami/rabbitmq:3.12-debian-12
image: docker.io/bitnami/rabbitmq:3.13.7-debian-12-r4
ports:
- 15672:15672
- 5672:5672
- 1883:1883
volumes:
- ./dbrepo-broker-service/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./dbrepo-broker-service/advanced.config:/etc/rabbitmq/advanced.config
......
No preview for this file type
......@@ -386,6 +386,23 @@
},
"service": {
"properties": {
"extraPorts": {
"items": {
"properties": {
"name": {
"type": "string"
},
"port": {
"type": "integer"
},
"targetPort": {
"type": "integer"
}
},
"type": "object"
},
"type": "array"
},
"managerPortEnabled": {
"type": "boolean"
},
......
......@@ -262,7 +262,7 @@ brokerservice:
enabled: true
existingSecret: broker-service-secret
## @param brokerservice.extraPlugins The list of plugins to be activated.
extraPlugins: rabbitmq_prometheus rabbitmq_auth_backend_ldap rabbitmq_auth_mechanism_ssl
extraPlugins: rabbitmq_prometheus rabbitmq_auth_backend_ldap rabbitmq_auth_mechanism_ssl rabbitmq_mqtt
persistence:
## @param brokerservice.persistence.enabled If set to true, a PVC will be created.
enabled: false
......@@ -270,6 +270,10 @@ brokerservice:
service:
type: ClusterIP
managerPortEnabled: true
extraPorts:
- name: mqtt
port: 1883
targetPort: 1883
# loadBalancerIP:
## @param brokerservice.replicaCount The number of replicas.
replicaCount: 1
......
......@@ -4,6 +4,8 @@ import sys
import json
import logging
from dbrepo.api.exceptions import AuthenticationError
logging.basicConfig(format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s', level=logging.INFO,
stream=sys.stdout)
......@@ -14,9 +16,9 @@ class AmqpClient:
via environment variables, e.g. set endpoint with DBREPO_ENDPOINT. You can override the constructor parameters \
with the environment variables.
:param broker_host: The AMQP API host. Optional. Default: "broker-service"
:param broker_port: The AMQP API port. Optional. Default: 5672
:param broker_virtual_host: The AMQP API virtual host. Optional. Default: "/"
:param broker_host: The AMQP API host. Optional. Default: "localhost".
:param broker_port: The AMQP API port. Optional. Default: 5672,
:param broker_virtual_host: The AMQP API virtual host. Optional. Default: "dbrepo".
:param username: The AMQP API username. Optional.
:param password: The AMQP API password. Optional.
"""
......@@ -27,9 +29,9 @@ class AmqpClient:
password: str = None
def __init__(self,
broker_host: str = 'broker-service',
broker_host: str = 'localhost',
broker_port: int = 5672,
broker_virtual_host: str = '/',
broker_virtual_host: str = 'dbrepo',
username: str = None,
password: str = None) -> None:
self.broker_host = os.environ.get('AMQP_API_HOST', broker_host)
......@@ -41,14 +43,16 @@ class AmqpClient:
self.username = os.environ.get('AMQP_API_USERNAME', username)
self.password = os.environ.get('AMQP_API_PASSWORD', password)
def publish(self, exchange: str, routing_key: str, data=dict) -> None:
def publish(self, routing_key: str, data=dict, exchange: str = 'dbrepo') -> None:
"""
Publishes data to a given exchange with the given routing key with a blocking connection.
:param exchange: The exchange name.
:param routing_key: The routing key.
:param data: The data.
:param exchange: The exchange name. Default: "dbrepo".
"""
if self.username is None or self.password is None:
raise AuthenticationError(f"Failed to perform request: authentication required")
parameters = pika.ConnectionParameters(host=self.broker_host, port=self.broker_port,
virtual_host=self.broker_virtual_host,
credentials=pika.credentials.PlainCredentials(self.username,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment