Skip to content
Snippets Groups Projects
Verified Commit e630a3d4 authored by Martin Weise's avatar Martin Weise
Browse files

Fixed the test

parent 64c88d4a
No related branches found
No related tags found
4 merge requests!231CI: Remove build for log-service,!228Better error message handling in the frontend,!223Release of version 1.4.0,!213Resolve "Some bugs regarding data"
......@@ -49,7 +49,7 @@ public class ImageServiceIntegrationTest extends BaseUnitTest {
public void create_succeeds() throws ImageAlreadyExistsException {
final ImageCreateDto request = ImageCreateDto.builder()
.name(IMAGE_1_NAME)
.version(IMAGE_1_VERSION)
.version("11.1.3")
.jdbcMethod(IMAGE_1_JDBC)
.dialect(IMAGE_1_DIALECT)
.driverClass(IMAGE_1_DRIVER)
......
......@@ -15,6 +15,7 @@ prometheus-flask-exporter = "~=0.22"
python-dotenv = "~=1.0"
sqlalchemy-utils = "*"
testcontainers-opensearch = "*"
pytest = "*"
[dev-packages]
......
This diff is collapsed.
import os
import pytest
import logging
from testcontainers.opensearch import OpenSearchContainer
@pytest.fixture(scope="session")
def session(request):
"""
Create one OpenSearch container per test run only (admin:admin)
:param request: /
:return: The OpenSearch container
"""
logging.debug("[fixture] creating opensearch container")
container = OpenSearchContainer()
logging.debug("[fixture] starting opensearch container")
container.start()
# set the environment for the client
os.environ['SEARCH_HOST'] = container.get_container_host_ip()
os.environ['SEARCH_PORT'] = container.get_exposed_port(9200)
client = container.get_client()
# destructor
def stop_opensearch():
container.stop()
request.addfinalizer(stop_opensearch)
return container
# @pytest.fixture(scope="function", autouse=True)
# def cleanup(request, session):
# """
# Clean up after each test by removing the buckets and re-adding them (=so they are empty again)
# :param request: /
# :param session: /
# :return:
# """
# logging.info("[fixture] truncate buckets")
# for bucket in ["dbrepo-upload", "dbrepo-download"]:
# objects = []
# for obj in session.get_client().list_objects(bucket):
# objects.append(DeleteObject(obj.object_name))
# logging.info(f'request to remove objects {objects}')
# errors = session.get_client().remove_objects(bucket, objects)
# for error in errors:
# raise ConnectionError(f'Failed to delete object with key {error.object_name} of bucket {bucket}')
......@@ -8,37 +8,43 @@ run the tests via 'pytest' or 'pipenv run pytest'
* enter the port number manually (you prolly have to do that twice if you start it for the first time)
* run the tests via 'pytest' or 'pipenv run pytest'
"""
import requests
def send_request(path, data):
url = f"http://localhost:4000/api/search{path}"
response = requests.post(url, json=data)
if response.status_code == 200:
return response.json()
else:
raise Exception(response.json())
import unittest
from requests import post
def test_textsearch():
class DetermineDatatypesTest(unittest.TestCase):
# @Test
def test_textsearch(self):
print("search for entries that contain the word 'measurement data'")
data = {"search_term": "measurement data"}
result = send_request("", data)
docIDs = [hit["_source"]["docID"] for hit in result["hits"]["hits"]]
response = post(f"http://localhost:4000/api/search", json={
"search_term": "measurement data"
})
if response.status_code != 200:
self.fail("Invalid response code")
docIDs = [hit["_source"]["docID"] for hit in response.json()["hits"]["hits"]]
assert docIDs == [2]
def test_timerange():
# @Test
def test_timerange(self):
print("search for entries that have been created between January and September of 2023")
data = {"t1":"2023-01-01",
"t2":"2023-09-09"}
result = send_request("", data)
docIDs = [hit["_source"]["docID"] for hit in result["hits"]["hits"]]
response = post(f"http://localhost:4000/api/search", json={
"t1": "2023-01-01",
"t2": "2023-09-09"
})
if response.status_code != 200:
self.fail("Invalid response code")
docIDs = [hit["_source"]["docID"] for hit in response.json()["hits"]["hits"]]
assert docIDs == [1, 2]
def test_keywords():
# @Test
def test_keywords(self):
print("Search for entries form the user 'max")
data = {"field": "author", "value": "max"}
result = send_request("", data)
docIDs = [hit["_source"]["docID"] for hit in result["hits"]["hits"]]
response = post(f"http://localhost:4000/api/search", json={
"field": "author",
"value": "max"
})
if response.status_code != 200:
self.fail("Invalid response code")
docIDs = [hit["_source"]["docID"] for hit in response.json()["hits"]["hits"]]
assert docIDs == [2]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment