diff --git a/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java b/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java index 738943aee0ae88c32eac9955e9a5c5d601801330..6364a38b04a1d333feb2fc09547ef0a7c0dc6c9e 100644 --- a/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java +++ b/fda-table-service/rest-service/src/main/java/at/tuwien/endpoints/DataEndpoint.java @@ -1,13 +1,11 @@ package at.tuwien.endpoints; -import at.tuwien.api.amqp.TupleDto; import at.tuwien.api.database.query.QueryResultDto; import at.tuwien.api.database.table.TableCsvDto; import at.tuwien.api.database.table.TableInsertDto; -import at.tuwien.entities.database.Database; import at.tuwien.entities.database.table.Table; import at.tuwien.exception.*; -import at.tuwien.service.DataService; +import at.tuwien.service.MariaDataService; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; @@ -26,10 +24,10 @@ import java.sql.Timestamp; @RequestMapping("/api/database/{id}/table/{tableId}/data") public class DataEndpoint { - private final DataService dataService; + private final MariaDataService dataService; @Autowired - public DataEndpoint(DataService dataService) { + public DataEndpoint(MariaDataService dataService) { this.dataService = dataService; } diff --git a/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java b/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java index 168c3d57751c9dc0f634bec0ffc126a80e561ef6..f0ef7ead8a90e3ed92065e0b87f8f1edb6c4527e 100644 --- a/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java +++ b/fda-table-service/rest-service/src/test/java/at/tuwien/endpoint/DataEndpointIntegrationTest.java @@ -4,13 +4,11 @@ import at.tuwien.BaseUnitTest; import at.tuwien.api.database.table.*; import at.tuwien.config.ReadyConfig; import at.tuwien.endpoints.DataEndpoint; -import at.tuwien.endpoints.TableEndpoint; import at.tuwien.exception.*; import at.tuwien.repository.jpa.DatabaseRepository; import at.tuwien.repository.jpa.ImageRepository; import at.tuwien.repository.jpa.TableRepository; -import at.tuwien.service.DataService; -import at.tuwien.service.JdbcConnector; +import at.tuwien.service.MariaDataService; import at.tuwien.service.TableService; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerResponse; @@ -21,7 +19,6 @@ import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; @@ -33,14 +30,12 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringExtension; import javax.transaction.Transactional; -import javax.ws.rs.core.Response; import java.time.Instant; import java.util.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; @Log4j2 @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) @@ -64,7 +59,7 @@ public class DataEndpointIntegrationTest extends BaseUnitTest { private ImageRepository imageRepository; @Autowired - private DataService dataService; + private MariaDataService dataService; @Autowired private TableService tableService; @@ -137,16 +132,22 @@ public class DataEndpointIntegrationTest extends BaseUnitTest { } @Test - public void insertFromTuple_succeeds() { + public void insertFromTuple_succeeds() throws TableNotFoundException, TableMalformedException, + DatabaseNotFoundException, ImageNotSupportedException { + final Map<String, Object> map = new LinkedHashMap<>() {{ + put(COLUMN_1_NAME, 1L); + put(COLUMN_2_NAME, Instant.now()); + put(COLUMN_3_NAME, 35.2); + put(COLUMN_4_NAME, "Sydney"); + put(COLUMN_5_NAME, 10.2); + }}; final TableCsvDto request = TableCsvDto.builder() - .data(List.of(Map.of(COLUMN_1_NAME, 1L, COLUMN_2_NAME, Instant.now(), COLUMN_3_NAME, 35.2, - COLUMN_4_NAME, "Sydney", COLUMN_5_NAME, 10.2))) + .data(List.of(map)) .build(); /* test */ - assertThrows(TableMalformedException.class, () -> { - dataEndpoint.insertFromTuple(DATABASE_1_ID, TABLE_1_ID, request); - }); + final ResponseEntity<?> response = dataEndpoint.insertFromTuple(DATABASE_1_ID, TABLE_1_ID, request); + assertEquals(HttpStatus.OK, response.getStatusCode()); } @Test diff --git a/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java b/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java index a4df04118f07c8fc7989eaf8df0d7a1506b0e178..4b6f99e1254764e7a602d8e5eba24f1ba4f3af80 100644 --- a/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java +++ b/fda-table-service/rest-service/src/test/java/at/tuwien/service/DataServiceIntegrationTest.java @@ -69,7 +69,7 @@ public class DataServiceIntegrationTest extends BaseUnitTest { private TableService tableService; @Autowired - private DataService dataService; + private MariaDataService dataService; private CreateContainerResponse request1, request2; diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/AmqpService.java b/fda-table-service/services/src/main/java/at/tuwien/service/AmqpService.java index 911b7611dd74526c4669a59a84d67f537eb9c9d4..0fbf1f9737f03e80ef87d11d80f9e670a046dfe7 100644 --- a/fda-table-service/services/src/main/java/at/tuwien/service/AmqpService.java +++ b/fda-table-service/services/src/main/java/at/tuwien/service/AmqpService.java @@ -32,12 +32,12 @@ public class AmqpService { private static final String AMQP_EXCHANGE = "fda"; private final Channel channel; - private final DataService dataService; + private final MariaDataService dataService; private final ObjectMapper objectMapper; private final TableRepository tableRepository; @Autowired - public AmqpService(Channel channel, DataService dataService, ObjectMapper objectMapper, + public AmqpService(Channel channel, MariaDataService dataService, ObjectMapper objectMapper, TableRepository tableRepository) { this.channel = channel; this.dataService = dataService; diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/DataService.java b/fda-table-service/services/src/main/java/at/tuwien/service/DataService.java index d376e0931364af55fdfeaa7abf6f99b5e36275db..a02f086703aadda06059b7697756b1f8008fac55 100644 --- a/fda-table-service/services/src/main/java/at/tuwien/service/DataService.java +++ b/fda-table-service/services/src/main/java/at/tuwien/service/DataService.java @@ -3,63 +3,18 @@ package at.tuwien.service; import at.tuwien.api.database.query.QueryResultDto; import at.tuwien.api.database.table.TableCsvDto; import at.tuwien.api.database.table.TableInsertDto; -import at.tuwien.entities.database.Database; import at.tuwien.entities.database.table.Table; import at.tuwien.exception.*; -import at.tuwien.mapper.ImageMapper; -import at.tuwien.mapper.QueryMapper; -import at.tuwien.mapper.TableMapper; -import at.tuwien.repository.jpa.DatabaseRepository; -import at.tuwien.repository.jpa.TableRepository; -import com.opencsv.CSVParser; -import com.opencsv.CSVParserBuilder; -import com.opencsv.CSVReader; -import com.opencsv.CSVReaderBuilder; import com.opencsv.exceptions.CsvException; -import lombok.extern.log4j.Log4j2; -import org.jooq.exception.DataAccessException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.mock.web.MockMultipartFile; -import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.multipart.MultipartFile; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.sql.SQLException; import java.sql.Timestamp; -import java.util.*; -@Log4j2 -@Service -public class DataService extends JdbcConnector { - - private final DatabaseRepository databaseRepository; - private final TableRepository tableRepository; - - @Autowired - public DataService(DatabaseRepository databaseRepository, TableRepository tableRepository, - ImageMapper imageMapper, TableMapper tableMapper, QueryMapper queryMapper) { - super(imageMapper, tableMapper, queryMapper); - this.tableRepository = tableRepository; - this.databaseRepository = databaseRepository; - } - - @Transactional - public Table findById(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException { - final Optional<Table> table = tableRepository.findByDatabaseAndId(findDatabase(databaseId), tableId); - if (table.isEmpty()) { - log.error("table {} not found in database {}", tableId, databaseId); - throw new TableNotFoundException("table not found in database"); - } - return table.get(); - } +public interface DataService { /** - * Insert data from a file into a table of a database with possible null values (denoted by a null element). + * Insert data from a file into a table of a database * * @param databaseId The database. * @param tableId The table. @@ -69,118 +24,34 @@ public class DataService extends JdbcConnector { * @throws DatabaseNotFoundException The database does not exist in the metdata database. * @throws FileStorageException The CSV could not be parsed. */ - @Transactional - public void insertCsv(Long databaseId, Long tableId, TableInsertDto data) + void insertCsv(Long databaseId, Long tableId, TableInsertDto data) throws TableNotFoundException, ImageNotSupportedException, DatabaseNotFoundException, FileStorageException, - TableMalformedException { - final Table table = findById(databaseId, tableId); - final TableCsvDto values; - try { - values = readCsv(table, data); - log.debug("read {} rows from csv", values.getData().size()); - } catch (IOException | CsvException | ArrayIndexOutOfBoundsException e) { - log.error("failed to parse csv {}", e.getMessage()); - throw new FileStorageException("failed to parse csv", e); - } - try { - insertCsv(table, values); - } catch (SQLException | DataAccessException e) { - log.error("could not insert data {}", e.getMessage()); - throw new TableMalformedException("could not insert data", e); - } - log.info("Inserted {} csv records into table id {}", values.getData().size(), tableId); - } - - /* helper functions */ - - @Transactional - public Database findDatabase(Long id) throws DatabaseNotFoundException { - final Optional<Database> database = databaseRepository.findById(id); - if (database.isEmpty()) { - log.error("no database with this id found in metadata database"); - throw new DatabaseNotFoundException("database not found in metadata database"); - } - return database.get(); - } - - public TableCsvDto readCsv(Table table, TableInsertDto data) throws IOException, CsvException, - ArrayIndexOutOfBoundsException { - log.debug("insert into table {} with params {}", table, data); - if (data.getDelimiter() == null) { - data.setDelimiter(','); - } - if (!data.getCsvLocation().startsWith("test:")) { // todo: improve this? - data.setCsvLocation("/tmp/" + data.getCsvLocation()); - } else { - data.setCsvLocation(data.getCsvLocation().substring(5)); - } - final CSVParser csvParser = new CSVParserBuilder() - .withSeparator(data.getDelimiter()) - .build(); - final MultipartFile multipartFile = new MockMultipartFile(data.getCsvLocation(), Files.readAllBytes(Paths.get(data.getCsvLocation()))); - final Reader fileReader = new InputStreamReader(multipartFile.getInputStream()); - final List<List<String>> cells = new LinkedList<>(); - final CSVReader reader = new CSVReaderBuilder(fileReader) - .withCSVParser(csvParser) - .build(); - final List<Map<String, Object>> records = new LinkedList<>(); - List<String> headers = null; - reader.readAll() - .forEach(x -> cells.add(Arrays.asList(x))); - log.trace("csv raw row size {}, cells raw size {}", reader.readAll().size(), cells.size()); - /* get header */ - if (data.getSkipHeader()) { - headers = cells.get(0); - log.debug("got headers {}", headers); - } - /* map to the map-list structure */ - for (int i = (data.getSkipHeader() ? 1 : 0); i < cells.size(); i++) { - final Map<String, Object> record = new HashMap<>(); - final List<String> row = cells.get(i); - for (int j = 0; j < table.getColumns().size(); j++) { - /* detect if order is correct, we depend on the CsvParser library */ - if (headers != null && table.getColumns().get(j).getInternalName().equals(headers.get(j))) { - log.error("header out of sync, actual: {} but expected: {}", headers.get(j), table.getColumns().get(j).getInternalName()); - } - record.put(table.getColumns().get(j).getInternalName(), row.get(j)); - } - /* when the nullElement itself is null, nothing to do */ - if (data.getNullElement() != null) { - record.replaceAll((key, value) -> value.equals(data.getNullElement()) ? null : value); - } - records.add(record); - } - if (headers == null || headers.size() == 0) { - log.warn("No header check possible, possibly csv without header line or skipHeader=false provided"); - } - log.debug("first row is {}", records.size() > 0 ? records.get(0) : null); - return TableCsvDto.builder() - .data(records) - .build(); - } - - public void insert(Table table, TableCsvDto data) throws ImageNotSupportedException, TableMalformedException { - try { - insertCsv(table, data); - } catch (SQLException e) { - log.error("could not insert data {}", e.getMessage()); - throw new TableMalformedException("could not insert data", e); - } - } - - @Transactional - public QueryResultDto selectAll(Long databaseId, Long tableId, Timestamp timestamp, Integer page, Integer size) throws TableNotFoundException, - DatabaseNotFoundException, ImageNotSupportedException, DatabaseConnectionException { - final QueryResultDto queryResult; - try { - queryResult = selectAll(findById(databaseId, tableId), timestamp, page, size); - } catch (SQLException e) { - log.error("Could not find data: {}", e.getMessage()); - throw new DatabaseConnectionException(e); - } - log.trace("found data {}", queryResult); - return queryResult; - } + TableMalformedException; + /** + * Insert data from AMQP client into a table of a database + * + * @param table The table. + * @param data The data. + * @throws ImageNotSupportedException The image is not supported. + * @throws TableMalformedException The table does not exist in the metadata database. + */ + void insert(Table table, TableCsvDto data) throws ImageNotSupportedException, TableMalformedException; + /** + * Select all data known in the database-table id tuple at a given time and return a page of specific size + * + * @param databaseId The database-table id tuple. + * @param tableId The database-table id tuple. + * @param timestamp The given time. + * @param page The page. + * @param size The page size. + * @return The select all data result + * @throws TableNotFoundException The table was not found in the metadata database. + * @throws DatabaseNotFoundException The database was not found in the remote database. + * @throws ImageNotSupportedException The image is not supported. + * @throws DatabaseConnectionException The connection to the remote database was unsuccessful. + */ + QueryResultDto selectAll(Long databaseId, Long tableId, Timestamp timestamp, Integer page, Integer size) throws TableNotFoundException, + DatabaseNotFoundException, ImageNotSupportedException, DatabaseConnectionException; } diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/DatabaseConnector.java b/fda-table-service/services/src/main/java/at/tuwien/service/DatabaseConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..ff3e2da84cb607a40a87dfce34160c26f92cdb90 --- /dev/null +++ b/fda-table-service/services/src/main/java/at/tuwien/service/DatabaseConnector.java @@ -0,0 +1,60 @@ +package at.tuwien.service; + +import at.tuwien.api.database.query.QueryResultDto; +import at.tuwien.api.database.table.TableCreateDto; +import at.tuwien.api.database.table.TableCsvDto; +import at.tuwien.entities.database.Database; +import at.tuwien.entities.database.table.Table; +import at.tuwien.exception.ArbitraryPrimaryKeysException; +import at.tuwien.exception.ImageNotSupportedException; +import at.tuwien.exception.TableMalformedException; +import org.jooq.DSLContext; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.SQLException; +import java.sql.Timestamp; + +public interface DatabaseConnector { + /** + * Open a new database connection for a database + * + * @param database The database + * @return The database connection + * @throws SQLException Error in the Syntax + * @throws ImageNotSupportedException The credentials for the image seems not as expected (username, password) + */ + DSLContext open(Database database) throws SQLException, ImageNotSupportedException; + + /** + * Create a new database from a entity and create information + * + * @param database The entity + * @param createDto The create information + * @throws SQLException Error in the Syntax + * @throws ArbitraryPrimaryKeysException The primary keys provided are not supported. + * @throws ImageNotSupportedException The image is not supported. + * @throws TableMalformedException The resulting table by the mapper is malformed. + */ + void create(Database database, TableCreateDto createDto) throws SQLException, + ArbitraryPrimaryKeysException, ImageNotSupportedException, TableMalformedException; + + /** + * Insert data inside a csv document into a table + * + * @param table The table + * @param data The csv document + * @throws SQLException Error in the syntax. + * @throws ImageNotSupportedException The image is not supported. + * @throws TableMalformedException The resulting table by the mapper is malformed. + */ + void insertCsv(Table table, TableCsvDto data) throws SQLException, ImageNotSupportedException, TableMalformedException; + + /** + * Delete a table + * + * @param table The table. + * @throws SQLException Error in the syntax. + * @throws ImageNotSupportedException The image is not supported. + */ + void delete(Table table) throws SQLException, ImageNotSupportedException; +} diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/JdbcConnector.java b/fda-table-service/services/src/main/java/at/tuwien/service/JdbcConnector.java index 592a3d201547f323d5e732a1cc85eed52877873f..28e2deee391ce824cf1b174c71e269853549a902 100644 --- a/fda-table-service/services/src/main/java/at/tuwien/service/JdbcConnector.java +++ b/fda-table-service/services/src/main/java/at/tuwien/service/JdbcConnector.java @@ -1,6 +1,5 @@ package at.tuwien.service; -import at.tuwien.api.amqp.TupleDto; import at.tuwien.api.database.query.QueryResultDto; import at.tuwien.api.database.table.TableCreateDto; import at.tuwien.api.database.table.TableCsvDto; @@ -15,11 +14,11 @@ import at.tuwien.mapper.TableMapper; import lombok.extern.log4j.Log4j2; import org.jooq.*; import org.jooq.Record; +import org.jooq.exception.DataAccessException; import org.jooq.impl.DSL; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; -import java.math.BigInteger; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -30,7 +29,7 @@ import java.util.stream.Collectors; import static org.jooq.impl.DSL.*; @Log4j2 -public abstract class JdbcConnector { +public abstract class JdbcConnector implements DatabaseConnector { private final ImageMapper imageMapper; private final TableMapper tableMapper; @@ -43,31 +42,34 @@ public abstract class JdbcConnector { this.queryMapper = queryMapper; } - protected DSLContext open(Database database) throws SQLException, ImageNotSupportedException { + @Override + public DSLContext open(Database database) throws SQLException, ImageNotSupportedException { final String url = "jdbc:" + database.getContainer().getImage().getJdbcMethod() + "://" + database.getContainer().getInternalName() + "/" + database.getInternalName(); log.info("Attempt to connect to '{}'", url); final Connection connection = DriverManager.getConnection(url, imageMapper.containerImageToProperties(database.getContainer().getImage())); return DSL.using(connection, SQLDialect.valueOf(database.getContainer().getImage().getDialect())); } - protected void create(Database database, TableCreateDto createDto) throws SQLException, + @Override + public void create(Database database, TableCreateDto createDto) throws SQLException, ArbitraryPrimaryKeysException, ImageNotSupportedException, TableMalformedException { final DSLContext context = open(database); CreateTableColumnStep createTableColumnStep = tableMapper.tableCreateDtoToCreateTableColumnStep(context, createDto); log.debug("Before insertion: {} ", createTableColumnStep.getSQL()); /* add versioning for mariadb databases */ - if(database.getContainer().getImage().getDialect().equals("MARIADB")) { + if (database.getContainer().getImage().getDialect().equals("MARIADB")) { String sql = createTableColumnStep.getSQL(); sql = sql + "WITH SYSTEM VERSIONING;"; - log.debug("With versioning {} ",sql); + log.debug("With versioning {} ", sql); context.fetch(sql); } else { createTableColumnStep.execute(); } } + @Override @Transactional - protected void insertCsv(Table table, TableCsvDto data) throws SQLException, ImageNotSupportedException, TableMalformedException { + public void insertCsv(Table table, TableCsvDto data) throws SQLException, ImageNotSupportedException, TableMalformedException { if (data.getData().size() == 0 || (data.getData().size() == 1 && data.getData().get(0).size() == 0)) { log.warn("No data provided."); throw new TableMalformedException("No data provided"); @@ -77,54 +79,26 @@ public abstract class JdbcConnector { throw new TableMalformedException("Provided columns differ from table columns found in metadata db."); } final List<Field<?>> headers = tableMapper.tableToFieldList(table); - log.trace("first row received {}", data.getData().size() > 0 ? data.getData().get(0) : null); + log.trace("-> headers received {}", headers.stream().map(Field::getName).collect(Collectors.toList())); + log.trace("-> first row received {}", data.getData().size() > 0 ? data.getData().get(0) : null); final DSLContext context = open(table.getDatabase()); final List<InsertValuesStepN<Record>> statements = new LinkedList<>(); for (List<Object> row : tableMapper.tableCsvDtoToObjectListList(data)) { statements.add(context.insertInto(table(table.getInternalName()), headers) .values(row)); } - context.batch(statements) - .execute(); + try { + context.batch(statements) + .execute(); + } catch (DataAccessException e) { + throw new TableMalformedException("Columns seem to differ or other problem with jOOQ mapper", e); + } } - protected void delete(Table table) throws SQLException, ImageNotSupportedException { + @Override + public void delete(Table table) throws SQLException, ImageNotSupportedException { final DSLContext context = open(table.getDatabase()); context.dropTable(table.getName()); } - protected QueryResultDto selectAll(Table table, Timestamp timestamp, Integer page, Integer size) throws SQLException, ImageNotSupportedException { - if (table == null || table.getInternalName() == null) { - log.error("Could not obtain the table internal name"); - throw new SQLException("Could not obtain the table internal name"); - } - final DSLContext context = open(table.getDatabase()); - /* For versioning, but with jooq implementation better */ - if(table.getDatabase().getContainer().getImage().getDialect().equals("MARIADB")) { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("SELECT * FROM "); - stringBuilder.append( table.getInternalName()); - if(timestamp != null) { - stringBuilder.append(" FOR SYSTEM_TIME AS OF TIMESTAMP'"); - stringBuilder.append(timestamp.toLocalDateTime()); - stringBuilder.append("'"); - } - if(page != null && size != null) { - page = Math.abs(page); - size = Math.abs(size); - stringBuilder.append(" LIMIT "); - stringBuilder.append(size); - stringBuilder.append(" OFFSET "); - stringBuilder.append(page * size); - } - stringBuilder.append(";"); - return queryMapper.recordListToQueryResultDto(context.fetch(stringBuilder.toString())); - } else { - return queryMapper.recordListToQueryResultDto(context - .selectFrom(table.getInternalName()) - .fetch()); - } - - } - } diff --git a/fda-table-service/services/src/main/java/at/tuwien/service/MariaDataService.java b/fda-table-service/services/src/main/java/at/tuwien/service/MariaDataService.java new file mode 100644 index 0000000000000000000000000000000000000000..90ffdc1fd8abe943032f59aa07943484b4746d08 --- /dev/null +++ b/fda-table-service/services/src/main/java/at/tuwien/service/MariaDataService.java @@ -0,0 +1,206 @@ +package at.tuwien.service; + +import at.tuwien.api.database.query.QueryResultDto; +import at.tuwien.api.database.table.TableCsvDto; +import at.tuwien.api.database.table.TableInsertDto; +import at.tuwien.entities.database.Database; +import at.tuwien.entities.database.table.Table; +import at.tuwien.exception.*; +import at.tuwien.mapper.ImageMapper; +import at.tuwien.mapper.QueryMapper; +import at.tuwien.mapper.TableMapper; +import at.tuwien.repository.jpa.DatabaseRepository; +import at.tuwien.repository.jpa.TableRepository; +import com.opencsv.CSVParser; +import com.opencsv.CSVParserBuilder; +import com.opencsv.CSVReader; +import com.opencsv.CSVReaderBuilder; +import com.opencsv.exceptions.CsvException; +import lombok.extern.log4j.Log4j2; +import org.jooq.DSLContext; +import org.jooq.exception.DataAccessException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.mock.web.MockMultipartFile; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.multipart.MultipartFile; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.*; + +@Log4j2 +@Service +public class MariaDataService extends JdbcConnector implements DataService { + + private final DatabaseRepository databaseRepository; + private final TableRepository tableRepository; + private final QueryMapper queryMapper; + + @Autowired + public MariaDataService(DatabaseRepository databaseRepository, TableRepository tableRepository, + ImageMapper imageMapper, TableMapper tableMapper, QueryMapper queryMapper) { + super(imageMapper, tableMapper, queryMapper); + this.queryMapper = queryMapper; + this.tableRepository = tableRepository; + this.databaseRepository = databaseRepository; + } + + @Transactional + public Table findById(Long databaseId, Long tableId) throws TableNotFoundException, DatabaseNotFoundException { + final Optional<Table> table = tableRepository.findByDatabaseAndId(findDatabase(databaseId), tableId); + if (table.isEmpty()) { + log.error("table {} not found in database {}", tableId, databaseId); + throw new TableNotFoundException("table not found in database"); + } + return table.get(); + } + + @Override + @Transactional + public void insertCsv(Long databaseId, Long tableId, TableInsertDto data) + throws TableNotFoundException, ImageNotSupportedException, DatabaseNotFoundException, FileStorageException, + TableMalformedException { + final Table table = findById(databaseId, tableId); + final TableCsvDto values; + try { + values = readCsv(table, data); + log.debug("read {} rows from csv", values.getData().size()); + } catch (IOException | CsvException | ArrayIndexOutOfBoundsException e) { + log.error("failed to parse csv {}", e.getMessage()); + throw new FileStorageException("failed to parse csv", e); + } + try { + insertCsv(table, values); + } catch (SQLException | DataAccessException e) { + log.error("could not insert data {}", e.getMessage()); + throw new TableMalformedException("could not insert data", e); + } + log.info("Inserted {} csv records into table id {}", values.getData().size(), tableId); + } + + @Transactional + protected Database findDatabase(Long id) throws DatabaseNotFoundException { + final Optional<Database> database = databaseRepository.findById(id); + if (database.isEmpty()) { + log.error("no database with this id found in metadata database"); + throw new DatabaseNotFoundException("database not found in metadata database"); + } + return database.get(); + } + + protected TableCsvDto readCsv(Table table, TableInsertDto data) throws IOException, CsvException, + ArrayIndexOutOfBoundsException { + log.debug("insert into table {} with params {}", table, data); + if (data.getDelimiter() == null) { + data.setDelimiter(','); + } + if (!data.getCsvLocation().startsWith("test:")) { // todo: improve this? + data.setCsvLocation("/tmp/" + data.getCsvLocation()); + } else { + data.setCsvLocation(data.getCsvLocation().substring(5)); + } + final CSVParser csvParser = new CSVParserBuilder() + .withSeparator(data.getDelimiter()) + .build(); + final MultipartFile multipartFile = new MockMultipartFile(data.getCsvLocation(), Files.readAllBytes(Paths.get(data.getCsvLocation()))); + final Reader fileReader = new InputStreamReader(multipartFile.getInputStream()); + final List<List<String>> cells = new LinkedList<>(); + final CSVReader reader = new CSVReaderBuilder(fileReader) + .withCSVParser(csvParser) + .build(); + final List<Map<String, Object>> records = new LinkedList<>(); + List<String> headers = null; + reader.readAll() + .forEach(x -> cells.add(Arrays.asList(x))); + log.trace("csv raw row size {}, cells raw size {}", reader.readAll().size(), cells.size()); + /* get header */ + if (data.getSkipHeader()) { + headers = cells.get(0); + log.debug("got headers {}", headers); + } + /* map to the map-list structure */ + for (int i = (data.getSkipHeader() ? 1 : 0); i < cells.size(); i++) { + final Map<String, Object> record = new HashMap<>(); + final List<String> row = cells.get(i); + for (int j = 0; j < table.getColumns().size(); j++) { + /* detect if order is correct, we depend on the CsvParser library */ + if (headers != null && table.getColumns().get(j).getInternalName().equals(headers.get(j))) { + log.error("header out of sync, actual: {} but expected: {}", headers.get(j), table.getColumns().get(j).getInternalName()); + } + record.put(table.getColumns().get(j).getInternalName(), row.get(j)); + } + /* when the nullElement itself is null, nothing to do */ + if (data.getNullElement() != null) { + record.replaceAll((key, value) -> value.equals(data.getNullElement()) ? null : value); + } + records.add(record); + } + if (headers == null || headers.size() == 0) { + log.warn("No header check possible, possibly csv without header line or skipHeader=false provided"); + } + log.debug("first row is {}", records.size() > 0 ? records.get(0) : null); + return TableCsvDto.builder() + .data(records) + .build(); + } + + @Override + public void insert(Table table, TableCsvDto data) throws ImageNotSupportedException, TableMalformedException { + try { + insertCsv(table, data); + } catch (SQLException e) { + log.error("could not insert data {}", e.getMessage()); + throw new TableMalformedException("could not insert data", e); + } + } + + @Override + @Transactional + public QueryResultDto selectAll(Long databaseId, Long tableId, Timestamp timestamp, Integer page, Integer size) throws TableNotFoundException, + DatabaseNotFoundException, ImageNotSupportedException, DatabaseConnectionException { + final Table table = findById(databaseId, tableId); + try { + if (table == null || table.getInternalName() == null) { + log.error("Could not obtain the table internal name"); + throw new SQLException("Could not obtain the table internal name"); + } + final DSLContext context = open(table.getDatabase()); + /* For versioning, but with jooq implementation better */ + if (table.getDatabase().getContainer().getImage().getDialect().equals("MARIADB")) { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("SELECT * FROM "); + stringBuilder.append(table.getInternalName()); + if (timestamp != null) { + stringBuilder.append(" FOR SYSTEM_TIME AS OF TIMESTAMP'"); + stringBuilder.append(timestamp.toLocalDateTime()); + stringBuilder.append("'"); + } + if (page != null && size != null) { + page = Math.abs(page); + size = Math.abs(size); + stringBuilder.append(" LIMIT "); + stringBuilder.append(size); + stringBuilder.append(" OFFSET "); + stringBuilder.append(page * size); + } + stringBuilder.append(";"); + return queryMapper.recordListToQueryResultDto(context.fetch(stringBuilder.toString())); + } else { + return queryMapper.recordListToQueryResultDto(context + .selectFrom(table.getInternalName()) + .fetch()); + } + } catch (SQLException e) { + log.error("Could not find data: {}", e.getMessage()); + throw new DatabaseConnectionException(e); + } + } + + +}