From 44b253494348b82c5333312b6fa708e39a356b97 Mon Sep 17 00:00:00 2001
From: Martin Weise <martin.weise@tuwien.ac.at>
Date: Thu, 30 Sep 2021 12:09:49 +0200
Subject: [PATCH] documented some more

---
 .../at/tuwien/endpoints/DataEndpoint.java     |   2 +-
 .../at/tuwien/endpoints/TableEndpoint.java    |  10 +-
 .../endpoint/DataEndpointIntegrationTest.java |   6 +-
 .../endpoint/TableEndpointUnitTest.java       |   4 +-
 .../service/DataServiceIntegrationTest.java   |   4 +-
 .../service/TableServiceIntegrationTest.java  |   3 +-
 .../tuwien/service/TableServiceUnitTest.java  |   3 +-
 .../tuwien/service/MessageQueueService.java   |  49 ++++
 .../java/at/tuwien/service/TableService.java  | 228 ++++--------------
 .../service/{ => impl}/JdbcConnector.java     |   3 +-
 .../service/{ => impl}/MariaDataService.java  |   3 +-
 .../RabbitMqService.java}                     |  44 ++--
 .../tuwien/service/impl/TableServiceImpl.java | 194 +++++++++++++++
 13 files changed, 331 insertions(+), 222 deletions(-)
 create mode 100644 fda-table-service/services/src/main/java/at/tuwien/service/MessageQueueService.java
 rename fda-table-service/services/src/main/java/at/tuwien/service/{ => impl}/JdbcConnector.java (98%)
 rename fda-table-service/services/src/main/java/at/tuwien/service/{ => impl}/MariaDataService.java (99%)
 rename fda-table-service/services/src/main/java/at/tuwien/service/{AmqpService.java => impl/RabbitMqService.java} (76%)
 create mode 100644 fda-table-service/services/src/main/java/at/tuwien/service/impl/TableServiceImpl.java

diff --git a/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java b/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java
index 6364a38b04..062cb1cdc8 100644
--- a/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java
+++ b/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java
@@ -5,7 +5,7 @@ import at.tuwien.api.database.table.TableCsvDto;
 import at.tuwien.api.database.table.TableInsertDto;
 import at.tuwien.entities.database.table.Table;
 import at.tuwien.exception.*;
-import at.tuwien.service.MariaDataService;
+import at.tuwien.service.impl.MariaDataService;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
diff --git a/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/TableEndpoint.java b/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/TableEndpoint.java
index 17dd7989ae..81ad864f82 100644
--- a/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/TableEndpoint.java
+++ b/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/TableEndpoint.java
@@ -4,8 +4,8 @@ import at.tuwien.api.database.table.*;
 import at.tuwien.entities.database.table.Table;
 import at.tuwien.exception.*;
 import at.tuwien.mapper.TableMapper;
-import at.tuwien.service.AmqpService;
-import at.tuwien.service.TableService;
+import at.tuwien.service.impl.RabbitMqService;
+import at.tuwien.service.impl.TableServiceImpl;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
@@ -26,12 +26,12 @@ import java.util.stream.Collectors;
 @RequestMapping("/api/database/{id}")
 public class TableEndpoint {
 
-    private final TableService tableService;
-    private final AmqpService amqpService;
+    private final TableServiceImpl tableService;
+    private final RabbitMqService amqpService;
     private final TableMapper tableMapper;
 
     @Autowired
-    public TableEndpoint(TableService tableService, AmqpService amqpService, TableMapper tableMapper) {
+    public TableEndpoint(TableServiceImpl tableService, RabbitMqService amqpService, TableMapper tableMapper) {
         this.tableService = tableService;
         this.amqpService = amqpService;
         this.tableMapper = tableMapper;
diff --git a/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java b/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java
index f0ef7ead8a..97b3a54b51 100644
--- a/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java
+++ b/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java
@@ -8,8 +8,8 @@ import at.tuwien.exception.*;
 import at.tuwien.repository.jpa.DatabaseRepository;
 import at.tuwien.repository.jpa.ImageRepository;
 import at.tuwien.repository.jpa.TableRepository;
-import at.tuwien.service.MariaDataService;
-import at.tuwien.service.TableService;
+import at.tuwien.service.impl.MariaDataService;
+import at.tuwien.service.impl.TableServiceImpl;
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.command.CreateContainerResponse;
 import com.github.dockerjava.api.exception.NotModifiedException;
@@ -62,7 +62,7 @@ public class DataEndpointIntegrationTest extends BaseUnitTest {
     private MariaDataService dataService;
 
     @Autowired
-    private TableService tableService;
+    private TableServiceImpl tableService;
 
     @Autowired
     private TableRepository tableRepository;
diff --git a/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/TableEndpointUnitTest.java b/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/TableEndpointUnitTest.java
index 9ac21bbe82..f2b59ec195 100644
--- a/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/TableEndpointUnitTest.java
+++ b/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/TableEndpointUnitTest.java
@@ -9,7 +9,7 @@ import at.tuwien.endpoints.TableEndpoint;
 import at.tuwien.exception.*;
 import at.tuwien.repository.jpa.DatabaseRepository;
 import at.tuwien.repository.jpa.TableRepository;
-import at.tuwien.service.TableService;
+import at.tuwien.service.impl.TableServiceImpl;
 import com.rabbitmq.client.Channel;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -40,7 +40,7 @@ public class TableEndpointUnitTest extends BaseUnitTest {
     private ReadyConfig readyConfig;
 
     @MockBean
-    private TableService tableService;
+    private TableServiceImpl tableService;
 
     @MockBean
     private TableRepository tableRepository;
diff --git a/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java b/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java
index 4b6f99e125..17181d5ac3 100644
--- a/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java
+++ b/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java
@@ -11,6 +11,8 @@ import at.tuwien.repository.jpa.ContainerRepository;
 import at.tuwien.repository.jpa.DatabaseRepository;
 import at.tuwien.repository.jpa.ImageRepository;
 import at.tuwien.repository.jpa.TableRepository;
+import at.tuwien.service.impl.MariaDataService;
+import at.tuwien.service.impl.TableServiceImpl;
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.command.CreateContainerResponse;
 import com.github.dockerjava.api.exception.NotModifiedException;
@@ -66,7 +68,7 @@ public class DataServiceIntegrationTest extends BaseUnitTest {
     private TableRepository tableRepository;
 
     @Autowired
-    private TableService tableService;
+    private TableServiceImpl tableService;
 
     @Autowired
     private MariaDataService dataService;
diff --git a/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceIntegrationTest.java b/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceIntegrationTest.java
index d3e00b4ef1..942ac6cd53 100644
--- a/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceIntegrationTest.java
+++ b/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceIntegrationTest.java
@@ -9,6 +9,7 @@ import at.tuwien.repository.jpa.ContainerRepository;
 import at.tuwien.repository.jpa.DatabaseRepository;
 import at.tuwien.repository.jpa.ImageRepository;
 import at.tuwien.repository.jpa.TableRepository;
+import at.tuwien.service.impl.TableServiceImpl;
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.command.CreateContainerResponse;
 import com.github.dockerjava.api.exception.NotModifiedException;
@@ -60,7 +61,7 @@ public class TableServiceIntegrationTest extends BaseUnitTest {
     private TableRepository tableRepository;
 
     @Autowired
-    private TableService tableService;
+    private TableServiceImpl tableService;
 
     private CreateContainerResponse request;
 
diff --git a/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceUnitTest.java b/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceUnitTest.java
index f4da420e6b..694559ec9f 100644
--- a/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceUnitTest.java
+++ b/fda-table-service/rest-service/src/test/java/at/tuwien/service/TableServiceUnitTest.java
@@ -8,6 +8,7 @@ import at.tuwien.entities.database.table.Table;
 import at.tuwien.exception.*;
 import at.tuwien.repository.jpa.DatabaseRepository;
 import at.tuwien.repository.jpa.TableRepository;
+import at.tuwien.service.impl.TableServiceImpl;
 import com.opencsv.exceptions.CsvException;
 import com.rabbitmq.client.Channel;
 import org.junit.jupiter.api.BeforeAll;
@@ -41,7 +42,7 @@ public class TableServiceUnitTest extends BaseUnitTest {
     private ReadyConfig readyConfig;
 
     @Autowired
-    private TableService tableService;
+    private TableServiceImpl tableService;
 
     @MockBean
     private DatabaseRepository databaseRepository;
diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/MessageQueueService.java b/fda-table-service/services/src/main/java/at/tuwien/service/MessageQueueService.java
new file mode 100644
index 0000000000..4ccfe3690d
--- /dev/null
+++ b/fda-table-service/services/src/main/java/at/tuwien/service/MessageQueueService.java
@@ -0,0 +1,49 @@
+package at.tuwien.service;
+
+import at.tuwien.entities.database.Database;
+import at.tuwien.entities.database.table.Table;
+import at.tuwien.exception.AmqpException;
+
+import java.io.IOException;
+
+public interface MessageQueueService {
+
+    /**
+     * Declares the exchange, the topics and the consumers
+     *
+     * @throws IOException Error on any of these actions.
+     */
+    void init() throws IOException;
+
+    /**
+     * Creates a queue/topic for a table.
+     *
+     * @param table The table.
+     * @throws AmqpException Creation failed.
+     */
+    void createQueue(Table table) throws AmqpException;
+
+    /**
+     * Creates an exchange for a database.
+     *
+     * @param database The database.
+     * @throws IOException Creation failed.
+     */
+    void create(Database database) throws IOException;
+
+    /**
+     * Creates a queue for a table.
+     *
+     * @param table The table.
+     * @throws IOException Creation failed.
+     */
+    void create(Table table) throws IOException;
+
+    /**
+     * Creates a consumer for a table.
+     *
+     * @param table The table.
+     * @throws IOException Creation failed.
+     */
+    void createUserConsumer(Table table) throws IOException;
+}
diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/TableService.java b/fda-table-service/services/src/main/java/at/tuwien/service/TableService.java
index b56651ccbe..35d213faef 100644
--- a/fda-table-service/services/src/main/java/at/tuwien/service/TableService.java
+++ b/fda-table-service/services/src/main/java/at/tuwien/service/TableService.java
@@ -1,189 +1,65 @@
 package at.tuwien.service;
 
 import at.tuwien.api.database.table.TableCreateDto;
-import at.tuwien.api.database.table.TableCsvDto;
-import at.tuwien.api.database.table.TableInsertDto;
-import at.tuwien.entities.database.Database;
 import at.tuwien.entities.database.table.Table;
-import at.tuwien.entities.database.table.columns.TableColumn;
 import at.tuwien.exception.*;
-import at.tuwien.mapper.AmqpMapper;
-import at.tuwien.mapper.ImageMapper;
-import at.tuwien.mapper.QueryMapper;
-import at.tuwien.mapper.TableMapper;
-import at.tuwien.repository.jpa.DatabaseRepository;
-import at.tuwien.repository.jpa.TableRepository;
-import com.opencsv.CSVParser;
-import com.opencsv.CSVParserBuilder;
-import com.opencsv.CSVReader;
-import com.opencsv.CSVReaderBuilder;
-import com.opencsv.exceptions.CsvException;
-import lombok.extern.log4j.Log4j2;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
 
-import javax.persistence.EntityNotFoundException;
-import java.io.*;
-import java.sql.SQLException;
-import java.util.*;
+import java.util.List;
 
-@Log4j2
-@Service
-public class TableService extends JdbcConnector {
+public interface TableService {
 
-    private final DatabaseRepository databaseRepository;
-    private final TableRepository tableRepository;
-    private final TableMapper tableMapper;
-    private final AmqpMapper amqpMapper;
+    /**
+     * Select all tables from the metadata database.
+     *
+     * @return The list of tables.
+     */
+    List<Table> findAll();
 
-    @Autowired
-    public TableService(TableRepository tableRepository, DatabaseRepository databaseRepository,
-                        ImageMapper imageMapper, TableMapper tableMapper, QueryMapper queryMapper,
-                        AmqpMapper amqpMapper) {
-        super(imageMapper, tableMapper, queryMapper);
-        this.tableRepository = tableRepository;
-        this.databaseRepository = databaseRepository;
-        this.tableMapper = tableMapper;
-        this.amqpMapper = amqpMapper;
-    }
+    /**
+     * Find all tables for a given database id.
+     *
+     * @param databaseId The database id.
+     * @return Return a list of all tables for this database id.
+     * @throws DatabaseNotFoundException The database was not found in the metadata database.
+     */
+    List<Table> findAllForDatabaseId(Long databaseId) throws DatabaseNotFoundException;
 
-    @Transactional
-    public List<Table> findAll() {
-        return tableRepository.findAll();
-    }
-
-    @Transactional
-    public List<Table> findAllForDatabaseId(Long databaseId) throws DatabaseNotFoundException {
-        final Optional<Database> database;
-        try {
-            database = databaseRepository.findById(databaseId);
-        } catch (EntityNotFoundException e) {
-            log.error("Unable to find database {}", databaseId);
-            throw new DatabaseNotFoundException("Unable to find database.");
-        }
-        if (database.isEmpty()) {
-            log.error("Unable to find database {}", databaseId);
-            throw new DatabaseNotFoundException("Unable to find database.");
-        }
-        return tableRepository.findByDatabase(database.get());
-    }
-
-    @Transactional
-    public void deleteTable(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException,
-            ImageNotSupportedException, DataProcessingException {
-        final Table table = findById(databaseId, tableId);
-        try {
-            delete(table);
-        } catch (SQLException e) {
-            log.error("Could not delete database: {}", e.getMessage());
-            throw new DataProcessingException("could not delete table", e);
-        }
-        tableRepository.delete(table);
-        log.info("Deleted table {}", table.getId());
-        log.debug("Deleted table {}", table);
-    }
-
-    @Transactional
-    public Table findById(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException {
-        final Optional<Table> table = tableRepository.findByDatabaseAndId(findDatabase(databaseId), tableId);
-        if (table.isEmpty()) {
-            log.error("Table {} not found in database {}", tableId, databaseId);
-            throw new TableNotFoundException("table not found in database");
-        }
-        return table.get();
-    }
-
-    @Transactional
-    public Table createTable(Long databaseId, TableCreateDto createDto) throws ImageNotSupportedException,
-            DatabaseNotFoundException, DataProcessingException, ArbitraryPrimaryKeysException, TableMalformedException {
-        log.trace("create table in db {} with request {}", databaseId, createDto);
-        final Database database = findDatabase(databaseId);
-        /* create database in container */
-        try {
-            create(database, createDto);
-        } catch (SQLException e) {
-            log.error("Could not create table via JDBC: {}", e.getMessage());
-            throw new DataProcessingException("could not create table", e);
-        }
-        /* save in metadata db */
-        final Table mappedTable = tableMapper.tableCreateDtoToTable(createDto);
-        mappedTable.setDatabase(database);
-        mappedTable.setTdbid(databaseId);
-        mappedTable.setColumns(List.of()); // TODO: our metadata db model (primary keys x3) does not allow this currently
-        mappedTable.setTopic(amqpMapper.queueName(mappedTable));
-        final Table table;
-        try {
-            table = tableRepository.save(mappedTable);
-        } catch (EntityNotFoundException e) {
-            log.error("Could not create table compound key: {}", e.getMessage());
-            throw new DataProcessingException("failed to create table compound key", e);
-        }
-        /* we cannot insert columns at the same time since they depend on the table id */
-        for (int i = 0; i < createDto.getColumns().length; i++) {
-            final TableColumn column = tableMapper.columnCreateDtoToTableColumn(createDto.getColumns()[i]);
-            column.setOrdinalPosition(i);
-            column.setCdbid(databaseId);
-            column.setTid(table.getId());
-            table.getColumns()
-                    .add(column);
-        }
-        /* update table in metadata db */
-        final Table out;
-        try {
-            out = tableRepository.save(table);
-        } catch (EntityNotFoundException e) {
-            log.error("Could not create column compound key: {}", e.getMessage());
-            throw new DataProcessingException("failed to create column compound key", e);
-        }
-        log.info("Created table {}", out.getId());
-        log.debug("created table: {}", out);
-        return out;
-    }
-
-    /* helper functions */
-
-    public Database findDatabase(Long id) throws DatabaseNotFoundException {
-        final Optional<Database> database = databaseRepository.findById(id);
-        if (database.isEmpty()) {
-            log.error("Could not find database with id {} in metadata database", id);
-            throw new DatabaseNotFoundException("database not found in metadata database");
-        }
-        return database.get();
-    }
-
-    public TableCsvDto readCsv(Table table, TableInsertDto data, MultipartFile file) throws IOException, CsvException,
-            ArrayIndexOutOfBoundsException {
-        final CSVParser csvParser = new CSVParserBuilder()
-                .withSeparator(data.getDelimiter())
-                .build();
-        final Reader fileReader = new InputStreamReader(file.getInputStream());
-        final List<List<String>> cells = new LinkedList<>();
-        final CSVReader reader = new CSVReaderBuilder(fileReader)
-                .withCSVParser(csvParser)
-                .withSkipLines(data.getSkipHeader() ? 1 : 0)
-                .build();
-        final List<Map<String, Object>> records = new LinkedList<>();
-        reader.readAll()
-                .forEach(x -> cells.add(Arrays.asList(x)));
-        /* map to the map-list structure */
-        for (List<String> row : cells) {
-            final Map<String, Object> record = new HashMap<>();
-            for (int i = 0; i < table.getColumns().size(); i++) {
-                record.put(table.getColumns().get(i).getInternalName(), row.get(i));
-            }
-            /* when the nullElement itself is null, nothing to do */
-            if (data.getNullElement() != null) {
-                record.replaceAll((key, value) -> value.equals(data.getNullElement()) ? null : value);
-            }
-            log.trace("processed {}", row);
-            records.add(record);
-        }
-        return TableCsvDto.builder()
-                .data(records)
-                .build();
-    }
+    /**
+     * Deletes a table for a fiven database-table id pair.
+     *
+     * @param databaseId The database-table id pair.
+     * @param tableId    The database-table id pair.
+     * @throws TableNotFoundException     The table was not found in the metadata database.
+     * @throws DatabaseNotFoundException  The database was not found in the metadata database.
+     * @throws ImageNotSupportedException The image is not supported.
+     * @throws DataProcessingException    The deletion did not work.
+     */
+    void deleteTable(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException,
+            ImageNotSupportedException, DataProcessingException;
 
+    /**
+     * Find a table by database-table id pair
+     *
+     * @param databaseId The database-table id pair.
+     * @param tableId    The database-table id pair.
+     * @return The table.
+     * @throws TableNotFoundException    The table was not found in the metadata database.
+     * @throws DatabaseNotFoundException The database was not found in the metadata database.
+     */
+    Table findById(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException;
 
+    /**
+     * Creates a table for a database id with given schema as data
+     *
+     * @param databaseId The database id.
+     * @param createDto  The schema (as data)
+     * @return The created table.
+     * @throws ImageNotSupportedException    The image is not supported.
+     * @throws DatabaseNotFoundException     The database was not found in the metadata database.
+     * @throws DataProcessingException       The remote database engine resulted in some error.
+     * @throws ArbitraryPrimaryKeysException The primary keys are configured wrong.
+     * @throws TableMalformedException       The table seems malformed by the mapper.
+     */
+    Table createTable(Long databaseId, TableCreateDto createDto) throws ImageNotSupportedException,
+            DatabaseNotFoundException, DataProcessingException, ArbitraryPrimaryKeysException, TableMalformedException;
 }
diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/JdbcConnector.java b/fda-table-service/services/src/main/java/at/tuwien/service/impl/JdbcConnector.java
similarity index 98%
rename from fda-table-service/services/src/main/java/at/tuwien/service/JdbcConnector.java
rename to fda-table-service/services/src/main/java/at/tuwien/service/impl/JdbcConnector.java
index 28e2deee39..49a1a77a81 100644
--- a/fda-table-service/services/src/main/java/at/tuwien/service/JdbcConnector.java
+++ b/fda-table-service/services/src/main/java/at/tuwien/service/impl/JdbcConnector.java
@@ -1,4 +1,4 @@
-package at.tuwien.service;
+package at.tuwien.service.impl;
 
 import at.tuwien.api.database.query.QueryResultDto;
 import at.tuwien.api.database.table.TableCreateDto;
@@ -11,6 +11,7 @@ import at.tuwien.exception.TableMalformedException;
 import at.tuwien.mapper.ImageMapper;
 import at.tuwien.mapper.QueryMapper;
 import at.tuwien.mapper.TableMapper;
+import at.tuwien.service.DatabaseConnector;
 import lombok.extern.log4j.Log4j2;
 import org.jooq.*;
 import org.jooq.Record;
diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/MariaDataService.java b/fda-table-service/services/src/main/java/at/tuwien/service/impl/MariaDataService.java
similarity index 99%
rename from fda-table-service/services/src/main/java/at/tuwien/service/MariaDataService.java
rename to fda-table-service/services/src/main/java/at/tuwien/service/impl/MariaDataService.java
index 90ffdc1fd8..d4ce949a15 100644
--- a/fda-table-service/services/src/main/java/at/tuwien/service/MariaDataService.java
+++ b/fda-table-service/services/src/main/java/at/tuwien/service/impl/MariaDataService.java
@@ -1,4 +1,4 @@
-package at.tuwien.service;
+package at.tuwien.service.impl;
 
 import at.tuwien.api.database.query.QueryResultDto;
 import at.tuwien.api.database.table.TableCsvDto;
@@ -11,6 +11,7 @@ import at.tuwien.mapper.QueryMapper;
 import at.tuwien.mapper.TableMapper;
 import at.tuwien.repository.jpa.DatabaseRepository;
 import at.tuwien.repository.jpa.TableRepository;
+import at.tuwien.service.DataService;
 import com.opencsv.CSVParser;
 import com.opencsv.CSVParserBuilder;
 import com.opencsv.CSVReader;
diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/AmqpService.java b/fda-table-service/services/src/main/java/at/tuwien/service/impl/RabbitMqService.java
similarity index 76%
rename from fda-table-service/services/src/main/java/at/tuwien/service/AmqpService.java
rename to fda-table-service/services/src/main/java/at/tuwien/service/impl/RabbitMqService.java
index 0fbf1f9737..07bfc00418 100644
--- a/fda-table-service/services/src/main/java/at/tuwien/service/AmqpService.java
+++ b/fda-table-service/services/src/main/java/at/tuwien/service/impl/RabbitMqService.java
@@ -1,4 +1,4 @@
-package at.tuwien.service;
+package at.tuwien.service.impl;
 
 import at.tuwien.api.database.table.TableCsvDto;
 import at.tuwien.entities.database.Database;
@@ -7,6 +7,8 @@ import at.tuwien.exception.AmqpException;
 import at.tuwien.exception.ImageNotSupportedException;
 import at.tuwien.exception.TableMalformedException;
 import at.tuwien.repository.jpa.TableRepository;
+import at.tuwien.service.MessageQueueService;
+import at.tuwien.service.impl.MariaDataService;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.exc.MismatchedInputException;
@@ -27,7 +29,7 @@ import java.util.List;
 
 @Log4j2
 @Service
-public class AmqpService {
+public class RabbitMqService implements MessageQueueService {
 
     private static final String AMQP_EXCHANGE = "fda";
 
@@ -37,8 +39,8 @@ public class AmqpService {
     private final TableRepository tableRepository;
 
     @Autowired
-    public AmqpService(Channel channel, MariaDataService dataService, ObjectMapper objectMapper,
-                       TableRepository tableRepository) {
+    public RabbitMqService(Channel channel, MariaDataService dataService, ObjectMapper objectMapper,
+                           TableRepository tableRepository) {
         this.channel = channel;
         this.dataService = dataService;
         this.objectMapper = objectMapper;
@@ -50,6 +52,7 @@ public class AmqpService {
      *
      * @throws IOException Exchange or queue was not declarable.
      */
+    @Override
     @EventListener(ApplicationReadyEvent.class)
     @Transactional
     public void init() throws IOException {
@@ -61,6 +64,7 @@ public class AmqpService {
         }
     }
 
+    @Override
     public void createQueue(Table table) throws AmqpException {
         try {
             create(table);
@@ -72,47 +76,27 @@ public class AmqpService {
         }
     }
 
-    public void createExchange(Database database) throws AmqpException {
-        try {
-            create(database);
-            log.info("Created exchange {}", database.getExchange());
-        } catch (IOException e) {
-            log.error("Could not create exchange and consumer: {}", e.getMessage());
-            throw new AmqpException("Could not create exchange and consumer", e);
-        }
-    }
-
+    @Override
     @Transactional
-    protected void create(Database database) throws IOException {
+    public void create(Database database) throws IOException {
         channel.exchangeDeclare(database.getExchange(), BuiltinExchangeType.FANOUT, true);
         log.debug("declare fanout exchange {}", database.getExchange());
         channel.exchangeBind(database.getExchange(), AMQP_EXCHANGE, database.getExchange());
         log.debug("bind exchange {} to {}", database.getExchange(), AMQP_EXCHANGE);
     }
 
+    @Override
     @Transactional
-    protected void create(Table table) throws IOException {
+    public void create(Table table) throws IOException {
         channel.queueDeclare(table.getTopic(), true, false, false, null);
         log.debug("declare queue {}", table.getTopic());
         channel.queueBind(table.getTopic(), table.getDatabase().getExchange(), table.getTopic());
         log.debug("bind queue {} to {}", table.getTopic(), table.getDatabase().getExchange());
     }
 
-    private void delete(Database database) throws IOException {
-        channel.exchangeDelete(database.getExchange());
-        log.debug("delete exchange {}", database.getExchange());
-        for (Table table : database.getTables()) {
-            delete(table);
-        }
-    }
-
-    private void delete(Table table) throws IOException {
-        channel.queueDelete(table.getTopic());
-        log.debug("delete queue {}", table.getTopic());
-    }
-
+    @Override
     @Transactional
-    protected void createUserConsumer(Table table) throws IOException {
+    public void createUserConsumer(Table table) throws IOException {
         channel.basicConsume(table.getTopic(), true, (consumerTag, response) -> {
             try {
                 dataService.insertCsv(table, objectMapper.readValue(response.getBody(), TableCsvDto.class));
diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/impl/TableServiceImpl.java b/fda-table-service/services/src/main/java/at/tuwien/service/impl/TableServiceImpl.java
new file mode 100644
index 0000000000..53572592bd
--- /dev/null
+++ b/fda-table-service/services/src/main/java/at/tuwien/service/impl/TableServiceImpl.java
@@ -0,0 +1,194 @@
+package at.tuwien.service.impl;
+
+import at.tuwien.api.database.table.TableCreateDto;
+import at.tuwien.api.database.table.TableCsvDto;
+import at.tuwien.api.database.table.TableInsertDto;
+import at.tuwien.entities.database.Database;
+import at.tuwien.entities.database.table.Table;
+import at.tuwien.entities.database.table.columns.TableColumn;
+import at.tuwien.exception.*;
+import at.tuwien.mapper.AmqpMapper;
+import at.tuwien.mapper.ImageMapper;
+import at.tuwien.mapper.QueryMapper;
+import at.tuwien.mapper.TableMapper;
+import at.tuwien.repository.jpa.DatabaseRepository;
+import at.tuwien.repository.jpa.TableRepository;
+import at.tuwien.service.TableService;
+import at.tuwien.service.impl.JdbcConnector;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import com.opencsv.CSVReader;
+import com.opencsv.CSVReaderBuilder;
+import com.opencsv.exceptions.CsvException;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.multipart.MultipartFile;
+
+import javax.persistence.EntityNotFoundException;
+import java.io.*;
+import java.sql.SQLException;
+import java.util.*;
+
+@Log4j2
+@Service
+public class TableServiceImpl extends JdbcConnector implements TableService {
+
+    private final DatabaseRepository databaseRepository;
+    private final TableRepository tableRepository;
+    private final TableMapper tableMapper;
+    private final AmqpMapper amqpMapper;
+
+    @Autowired
+    public TableServiceImpl(TableRepository tableRepository, DatabaseRepository databaseRepository,
+                            ImageMapper imageMapper, TableMapper tableMapper, QueryMapper queryMapper,
+                            AmqpMapper amqpMapper) {
+        super(imageMapper, tableMapper, queryMapper);
+        this.tableRepository = tableRepository;
+        this.databaseRepository = databaseRepository;
+        this.tableMapper = tableMapper;
+        this.amqpMapper = amqpMapper;
+    }
+
+    @Override
+    @Transactional
+    public List<Table> findAll() {
+        return tableRepository.findAll();
+    }
+
+    @Override
+    @Transactional
+    public List<Table> findAllForDatabaseId(Long databaseId) throws DatabaseNotFoundException {
+        final Optional<Database> database;
+        try {
+            database = databaseRepository.findById(databaseId);
+        } catch (EntityNotFoundException e) {
+            log.error("Unable to find database {}", databaseId);
+            throw new DatabaseNotFoundException("Unable to find database.");
+        }
+        if (database.isEmpty()) {
+            log.error("Unable to find database {}", databaseId);
+            throw new DatabaseNotFoundException("Unable to find database.");
+        }
+        return tableRepository.findByDatabase(database.get());
+    }
+
+    @Override
+    @Transactional
+    public void deleteTable(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException,
+            ImageNotSupportedException, DataProcessingException {
+        final Table table = findById(databaseId, tableId);
+        try {
+            delete(table);
+        } catch (SQLException e) {
+            log.error("Could not delete database: {}", e.getMessage());
+            throw new DataProcessingException("could not delete table", e);
+        }
+        tableRepository.delete(table);
+        log.info("Deleted table {}", table.getId());
+        log.debug("Deleted table {}", table);
+    }
+
+    @Override
+    @Transactional
+    public Table findById(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException {
+        final Optional<Table> table = tableRepository.findByDatabaseAndId(findDatabase(databaseId), tableId);
+        if (table.isEmpty()) {
+            log.error("Table {} not found in database {}", tableId, databaseId);
+            throw new TableNotFoundException("table not found in database");
+        }
+        return table.get();
+    }
+
+    @Override
+    @Transactional
+    public Table createTable(Long databaseId, TableCreateDto createDto) throws ImageNotSupportedException,
+            DatabaseNotFoundException, DataProcessingException, ArbitraryPrimaryKeysException, TableMalformedException {
+        log.trace("create table in db {} with request {}", databaseId, createDto);
+        final Database database = findDatabase(databaseId);
+        /* create database in container */
+        try {
+            create(database, createDto);
+        } catch (SQLException e) {
+            log.error("Could not create table via JDBC: {}", e.getMessage());
+            throw new DataProcessingException("could not create table", e);
+        }
+        /* save in metadata db */
+        final Table mappedTable = tableMapper.tableCreateDtoToTable(createDto);
+        mappedTable.setDatabase(database);
+        mappedTable.setTdbid(databaseId);
+        mappedTable.setColumns(List.of()); // TODO: our metadata db model (primary keys x3) does not allow this currently
+        mappedTable.setTopic(amqpMapper.queueName(mappedTable));
+        final Table table;
+        try {
+            table = tableRepository.save(mappedTable);
+        } catch (EntityNotFoundException e) {
+            log.error("Could not create table compound key: {}", e.getMessage());
+            throw new DataProcessingException("failed to create table compound key", e);
+        }
+        /* we cannot insert columns at the same time since they depend on the table id */
+        for (int i = 0; i < createDto.getColumns().length; i++) {
+            final TableColumn column = tableMapper.columnCreateDtoToTableColumn(createDto.getColumns()[i]);
+            column.setOrdinalPosition(i);
+            column.setCdbid(databaseId);
+            column.setTid(table.getId());
+            table.getColumns()
+                    .add(column);
+        }
+        /* update table in metadata db */
+        final Table out;
+        try {
+            out = tableRepository.save(table);
+        } catch (EntityNotFoundException e) {
+            log.error("Could not create column compound key: {}", e.getMessage());
+            throw new DataProcessingException("failed to create column compound key", e);
+        }
+        log.info("Created table {}", out.getId());
+        log.debug("created table: {}", out);
+        return out;
+    }
+
+    protected Database findDatabase(Long id) throws DatabaseNotFoundException {
+        final Optional<Database> database = databaseRepository.findById(id);
+        if (database.isEmpty()) {
+            log.error("Could not find database with id {} in metadata database", id);
+            throw new DatabaseNotFoundException("database not found in metadata database");
+        }
+        return database.get();
+    }
+
+    public TableCsvDto readCsv(Table table, TableInsertDto data, MultipartFile file) throws IOException, CsvException,
+            ArrayIndexOutOfBoundsException {
+        final CSVParser csvParser = new CSVParserBuilder()
+                .withSeparator(data.getDelimiter())
+                .build();
+        final Reader fileReader = new InputStreamReader(file.getInputStream());
+        final List<List<String>> cells = new LinkedList<>();
+        final CSVReader reader = new CSVReaderBuilder(fileReader)
+                .withCSVParser(csvParser)
+                .withSkipLines(data.getSkipHeader() ? 1 : 0)
+                .build();
+        final List<Map<String, Object>> records = new LinkedList<>();
+        reader.readAll()
+                .forEach(x -> cells.add(Arrays.asList(x)));
+        /* map to the map-list structure */
+        for (List<String> row : cells) {
+            final Map<String, Object> record = new HashMap<>();
+            for (int i = 0; i < table.getColumns().size(); i++) {
+                record.put(table.getColumns().get(i).getInternalName(), row.get(i));
+            }
+            /* when the nullElement itself is null, nothing to do */
+            if (data.getNullElement() != null) {
+                record.replaceAll((key, value) -> value.equals(data.getNullElement()) ? null : value);
+            }
+            log.trace("processed {}", row);
+            records.add(record);
+        }
+        return TableCsvDto.builder()
+                .data(records)
+                .build();
+    }
+
+
+}
-- 
GitLab