Skip to content
Snippets Groups Projects
Select Git revision
  • 593a3cf74d2f68bc7882c9dd8942788fc92063e7
  • master default protected
  • dev protected
  • 551-init-broker-service-permissions
  • release-1.10 protected
  • 549-test-oai-pmh
  • 545-saving-multiple-times-breaks-pid-metadata
  • release-1.9 protected
  • 499-standalone-compute-service-2
  • 539-load-tests
  • hotfix/helm-chart
  • luca_ba_new_interface
  • 534-bug-when-adding-access-to-user-that-is-not-registered-at-dashboard-service
  • release-1.8 protected
  • 533-integrate-semantic-recommendation
  • feature/openshift
  • 518-spark-doesn-t-map-the-headers-correct
  • 485-fixity-checks
  • 530-various-schema-problems-with-subsets
  • release-1.7 protected
  • fix/auth-service
  • v1.10.1 protected
  • v1.10.0-rc13 protected
  • v1.10.0-rc12 protected
  • v1.10.0-rc11 protected
  • v1.10.0-rc10 protected
  • v1.10.0-rc9 protected
  • v1.10.0-rc8 protected
  • v1.10.0-rc7 protected
  • v1.10.0-rc6 protected
  • v1.10.0-rc5 protected
  • v1.10.0-rc4 protected
  • v1.10.0-rc3 protected
  • v1.10.0-rc2 protected
  • v1.10.0rc1 protected
  • v1.10.0rc0 protected
  • v1.10.0 protected
  • v1.9.3 protected
  • v1.9.2 protected
  • v1.9.2-rc0 protected
  • v1.9.1 protected
41 results

conftest.py

Blame
  • conftest.py 1.78 KiB
    import os
    
    import pytest
    import logging
    
    from minio.deleteobjects import DeleteObject
    from testcontainers.minio import MinioContainer
    
    
    @pytest.fixture(scope="session")
    def session(request):
        """
        Create one minIO container per test run only.

        Starts the container, exports its endpoint through the S3_STORAGE_ENDPOINT environment variable and
        creates the buckets dbrepo-upload and dbrepo-download.

        :param request: The pytest fixture request, used to register the teardown that stops the container.
        :return: The minIO container
        """
        logging.debug("[fixture] creating container")
        container = MinioContainer(access_key="seaweedfsadmin", secret_key="seaweedfsadmin")
        logging.debug("[fixture] starting container")
        container.start()
        # destructor: registered directly after the start so the container is also stopped when one of the
        # following setup steps (endpoint lookup, bucket creation) raises
        request.addfinalizer(container.stop)
        # set the environment for the client (f-string, so the exposed port may be returned as str or int)
        endpoint = f'http://{container.get_container_host_ip()}:{container.get_exposed_port(9000)}'
        os.environ['S3_STORAGE_ENDPOINT'] = endpoint
        client = container.get_client()
        # create buckets
        logging.debug('[fixture] make buckets dbrepo-upload, dbrepo-download')
        for bucket in ('dbrepo-upload', 'dbrepo-download'):
            client.make_bucket(bucket)
        return container
    
    
    @pytest.fixture(scope="function", autouse=True)
    def cleanup(request, session):
        """
        Truncate the buckets dbrepo-upload and dbrepo-download so each test starts with empty buckets.

        Runs during the setup of every test (autouse). The buckets themselves are kept, only the objects
        in them are removed.

        :param request: /
        :param session: The minIO container provided by the session fixture
        :return:
        :raises ConnectionError: If the storage reports that an object could not be deleted
        """
        logging.info("[fixture] truncate buckets")
        client = session.get_client()
        for bucket in ["dbrepo-upload", "dbrepo-download"]:
            # list recursively, otherwise objects below a prefix are only reported as one "folder" entry
            # and would not be removed
            objects = [DeleteObject(obj.object_name) for obj in client.list_objects(bucket, recursive=True)]
            logging.info(f'request to remove objects {objects}')
            # the returned errors must be iterated: the original code relied on this loop as well
            for error in client.remove_objects(bucket, objects):
                # the key attribute is called "name" in minio 7.x and "object_name" in older releases
                key = getattr(error, 'name', None) or getattr(error, 'object_name', None)
                raise ConnectionError(f'Failed to delete object with key {key} of bucket {bucket}')