diff --git a/fda-metadata-db/querystore/src/main/java/at/tuwien/querystore/Query.java b/fda-metadata-db/querystore/src/main/java/at/tuwien/querystore/Query.java index 8df1ced8691c05bb429a5aa56c0d2553c5b843af..f589913ec36b76fdc99fe90dff41b3ecae5d060f 100644 --- a/fda-metadata-db/querystore/src/main/java/at/tuwien/querystore/Query.java +++ b/fda-metadata-db/querystore/src/main/java/at/tuwien/querystore/Query.java @@ -62,6 +62,9 @@ public class Query implements Serializable { @CreatedDate private Instant created; + @javax.persistence.Column(nullable = false, updatable = false) + private Instant executed; + @javax.persistence.Column(nullable = false) private String createdBy; diff --git a/fda-query-service/rest-service/src/test/java/at/tuwien/BaseUnitTest.java b/fda-query-service/rest-service/src/test/java/at/tuwien/BaseUnitTest.java index b369dfe7c4a12c8092a4c747a1620ea7d29dc646..8fb923dff3c2f606bee4ea6ba5a8be1167520e4c 100644 --- a/fda-query-service/rest-service/src/test/java/at/tuwien/BaseUnitTest.java +++ b/fda-query-service/rest-service/src/test/java/at/tuwien/BaseUnitTest.java @@ -1031,7 +1031,7 @@ public abstract class BaseUnitTest { public final static Long QUERY_1_DATABASE_ID = DATABASE_1_ID; public final static String QUERY_1_QUERY_HASH = "a3b8ac39e38167d14cf3a9c20a69e4b6954d049525390b973a2c23064953a992"; public final static String QUERY_1_RESULT_HASH = "8358c8ade4849d2094ab5bb29127afdae57e6bb5acb1db7af603813d406c467a"; - public final static Instant QUERY_1_CREATED = Instant.now(); + public final static Instant QUERY_1_CREATED = Instant.ofEpochSecond(1677648377); public final static Instant QUERY_1_EXECUTION = Instant.now(); public final static Boolean QUERY_1_PERSISTED = false; @@ -1043,6 +1043,8 @@ public abstract class BaseUnitTest { .created(QUERY_1_CREATED) .createdBy(USER_1_USERNAME) .isPersisted(QUERY_1_PERSISTED) + .executed(QUERY_1_EXECUTION) + .created(QUERY_1_CREATED) .build(); public final static QueryDto QUERY_1_DTO = QueryDto.builder() diff --git a/fda-query-service/rest-service/src/test/java/at/tuwien/config/MariaDbConfig.java b/fda-query-service/rest-service/src/test/java/at/tuwien/config/MariaDbConfig.java index 076c1b05ccacc15ff2a66d188f118663c5207e54..17a8cbf8e594bfb2337c6dd75f327c49c11582e9 100644 --- a/fda-query-service/rest-service/src/test/java/at/tuwien/config/MariaDbConfig.java +++ b/fda-query-service/rest-service/src/test/java/at/tuwien/config/MariaDbConfig.java @@ -24,6 +24,29 @@ public class MariaDbConfig { connection.close(); } + public static List<Map<String, Object>> listQueryStore(String hostname, String database) throws SQLException { + final String jdbc = "jdbc:mariadb://" + hostname + "/" + database; + log.trace("connect to database {}", jdbc); + final Connection connection = DriverManager.getConnection(jdbc, "root", "mariadb"); + final Statement statement = connection.createStatement(); + final ResultSet result = statement.executeQuery("SELECT created_by, query, query_normalized, is_persisted, query_hash, result_hash, result_number, created, executed FROM qs_queries"); + final List<Map<String, Object>> rows = new LinkedList<>(); + while (result.next()) { + rows.add(new HashMap<>() {{ + put("created_by", result.getString(1)); + put("query", result.getString(2)); + put("query_normalized", result.getString(3)); + put("is_persisted", result.getBoolean(4)); + put("query_hash", result.getString(5)); + put("result_hash", result.getString(6)); + put("result_number", result.getLong(7)); + put("created", result.getTimestamp(8)); + put("executed", result.getTimestamp(9)); + }}); + } + return rows; + } + public static List<Map<String, String>> selectQuery(String hostname, String database, String query, String... columns) throws SQLException { final String jdbc = "jdbc:mariadb://" + hostname + "/" + database; diff --git a/fda-query-service/rest-service/src/test/java/at/tuwien/listener/RabbitMqListenerIntegrationTest.java b/fda-query-service/rest-service/src/test/java/at/tuwien/listener/RabbitMqListenerIntegrationTest.java deleted file mode 100644 index 84b278ad273abe5180df23c724829b0da2da14c3..0000000000000000000000000000000000000000 --- a/fda-query-service/rest-service/src/test/java/at/tuwien/listener/RabbitMqListenerIntegrationTest.java +++ /dev/null @@ -1,133 +0,0 @@ -package at.tuwien.listener; - -import at.tuwien.BaseUnitTest; -import at.tuwien.api.amqp.ConsumerDto; -import at.tuwien.config.*; -import at.tuwien.repository.jpa.*; -import com.rabbitmq.client.BuiltinExchangeType; -import com.rabbitmq.client.Channel; -import lombok.extern.log4j.Log4j2; -import org.junit.Rule; -import org.junit.jupiter.api.*; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.rules.Timeout; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit.jupiter.SpringExtension; - -import java.io.IOException; -import java.util.*; -import java.util.stream.Collectors; - -import static org.junit.jupiter.api.Assertions.assertEquals; - - -@Log4j2 -@ActiveProfiles(profiles = "junit") -@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) -@SpringBootTest -@ExtendWith(SpringExtension.class) -public class RabbitMqListenerIntegrationTest extends BaseUnitTest { - - @MockBean - private ReadyConfig readyConfig; - - @MockBean - private IndexConfig indexConfig; - - @Autowired - private Channel channel; - - @Autowired - private ImageRepository imageRepository; - - @Autowired - private ContainerRepository containerRepository; - - @Autowired - private DatabaseRepository databaseRepository; - - @Autowired - private TableRepository tableRepository; - - @Autowired - private H2Utils h2Utils; - - @Autowired - private RabbitMqConfig rabbitMqConfig; - - @Autowired - private AmqpConfig amqpConfig; - - @Rule - public Timeout globalTimeout = Timeout.seconds(300); - - @BeforeAll - public static void beforeAll() throws InterruptedException { - afterAll(); - /* create networks */ - DockerConfig.createAllNetworks(); - /* create containers */ - DockerConfig.createContainer(null, CONTAINER_BROKER, 15672, CONTAINER_BROKER_ENV); - DockerConfig.startContainer(CONTAINER_BROKER); - } - - @AfterAll - public static void afterAll() { - DockerConfig.removeAllContainers(); - DockerConfig.removeAllNetworks(); - } - - @BeforeEach - public void beforeEach() { - /* metadata database */ - h2Utils.runScript("schema.sql"); - imageRepository.save(IMAGE_1); - containerRepository.save(CONTAINER_1); - DATABASE_1.setTables(List.of()); - databaseRepository.save(DATABASE_1); - DATABASE_1.setTables(List.of(TABLE_1, TABLE_2, TABLE_3)); - tableRepository.save(TABLE_1_NOCOLS); - tableRepository.save(TABLE_2_NOCOLS); - tableRepository.save(TABLE_3_NOCOLS); - } - - @Test - @Disabled("Not testable") - public void updateConsumers_succeeds() throws IOException, InterruptedException { - - /* pre-condition */ - assertEquals(0, getConsumers().size()); - assertEquals(2, amqpConfig.getAmqpConsumers()); - - /* mock */ - channel.exchangeDeclare(DATABASE_1_EXCHANGE, BuiltinExchangeType.FANOUT); - channel.queueDeclare(TABLE_1_QUEUE_NAME, true, false, false, null); - channel.queueBind(TABLE_1_QUEUE_NAME, DATABASE_1_EXCHANGE, TABLE_1_ROUTING_KEY); - channel.queueDeclare(TABLE_2_QUEUE_NAME, true, false, false, null); - channel.queueBind(TABLE_2_QUEUE_NAME, DATABASE_1_EXCHANGE, TABLE_2_ROUTING_KEY); - channel.queueDeclare(TABLE_3_QUEUE_NAME, true, false, false, null); - channel.queueBind(TABLE_3_QUEUE_NAME, DATABASE_1_EXCHANGE, TABLE_3_ROUTING_KEY); - - /* test */ - Thread.sleep(30 * 1000) /* wait for scheduled insert */; - final List<ConsumerDto> response = getConsumers(); - final List<ConsumerDto> consumers1 = response.stream().filter(c -> c.getQueue().getName().equals(TABLE_1_QUEUE_NAME)).collect(Collectors.toList()); - assertEquals(2, consumers1.size()); - final List<ConsumerDto> consumers2 = response.stream().filter(c -> c.getQueue().getName().equals(TABLE_2_QUEUE_NAME)).collect(Collectors.toList()); - assertEquals(2, consumers2.size()); - final List<ConsumerDto> consumers3 = response.stream().filter(c -> c.getQueue().getName().equals(TABLE_3_QUEUE_NAME)).collect(Collectors.toList()); - assertEquals(2, consumers3.size()); - } - - private List<ConsumerDto> getConsumers() throws IOException { - return rabbitMqConfig.findAllConsumers() - .stream() - .filter(c -> List.of(TABLE_1_QUEUE_NAME, TABLE_2_QUEUE_NAME, TABLE_3_QUEUE_NAME).contains(c.getQueue().getName())) - .collect(Collectors.toList()); - } - -} diff --git a/fda-query-service/rest-service/src/test/java/at/tuwien/service/QueueServiceIntegrationTest.java b/fda-query-service/rest-service/src/test/java/at/tuwien/service/QueueServiceIntegrationTest.java index 49a47646b20d3fe4dc98c7dcf95df339c3beaafb..acfe952c722a1a36fd6fe50338cf49335e4c6699 100644 --- a/fda-query-service/rest-service/src/test/java/at/tuwien/service/QueueServiceIntegrationTest.java +++ b/fda-query-service/rest-service/src/test/java/at/tuwien/service/QueueServiceIntegrationTest.java @@ -2,11 +2,9 @@ package at.tuwien.service; import at.tuwien.BaseUnitTest; import at.tuwien.amqp.RabbitMqConsumer; +import at.tuwien.api.amqp.ConsumerDto; import at.tuwien.api.database.table.TableCsvDto; -import at.tuwien.config.AmqpConfig; -import at.tuwien.config.DockerConfig; -import at.tuwien.config.IndexConfig; -import at.tuwien.config.ReadyConfig; +import at.tuwien.config.*; import at.tuwien.exception.AmqpException; import at.tuwien.gateway.BrokerServiceGateway; import at.tuwien.listener.MessageQueueListener; @@ -29,9 +27,12 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; import java.io.File; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Optional; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -69,6 +70,9 @@ public class QueueServiceIntegrationTest extends BaseUnitTest { @Autowired private AmqpConfig amqpConfig; + @Autowired + private RabbitMqConfig rabbitMqConfig; + @Autowired private Channel channel; @@ -86,7 +90,7 @@ public class QueueServiceIntegrationTest extends BaseUnitTest { DockerConfig.createAllNetworks(); DockerConfig.createContainer(BIND, CONTAINER_1, CONTAINER_1_ENV); DockerConfig.startContainer(CONTAINER_1); - DockerConfig.createContainer(null, CONTAINER_BROKER, CONTAINER_BROKER_ENV); + DockerConfig.createContainer(null, CONTAINER_BROKER, 15672, CONTAINER_BROKER_ENV); DockerConfig.startContainer(CONTAINER_BROKER); } @@ -194,4 +198,18 @@ public class QueueServiceIntegrationTest extends BaseUnitTest { channel.basicPublish(DATABASE_1_EXCHANGE, TABLE_1_ROUTING_KEY, basicProperties, objectMapper.writeValueAsBytes(TABLE_1_CSV_DTO)); } + @Test + public void restore_succeeds() throws AmqpException, IOException, InterruptedException { + + /* mock */ + when(tableRepository.findAll()) + .thenReturn(List.of(TABLE_1)); + + /* test */ + messageQueueService.restore(); + Thread.sleep(5 * 1000); + final List<ConsumerDto> response = rabbitMqConfig.findAllConsumers(); + assertEquals(amqpConfig.getAmqpConsumers(), (int) response.stream().filter(c -> c.getQueue().getName().equals(TABLE_1_QUEUE_NAME)).count()); + } + } diff --git a/fda-query-service/rest-service/src/test/java/at/tuwien/service/StoreServiceIntegrationTest.java b/fda-query-service/rest-service/src/test/java/at/tuwien/service/StoreServiceIntegrationTest.java index 3307d1bf8187628cf95afa287bf2fc231ad27cf7..82879e674f7a107d1513dfde582f93b13a75063f 100644 --- a/fda-query-service/rest-service/src/test/java/at/tuwien/service/StoreServiceIntegrationTest.java +++ b/fda-query-service/rest-service/src/test/java/at/tuwien/service/StoreServiceIntegrationTest.java @@ -31,6 +31,7 @@ import java.sql.SQLException; import java.util.List; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Map; import java.util.Optional; import static java.time.temporal.ChronoUnit.HOURS; @@ -100,9 +101,11 @@ public class StoreServiceIntegrationTest extends BaseUnitTest { } @BeforeEach - public void beforeEach() throws InterruptedException, SQLException { + public void beforeEach() throws InterruptedException { afterEach(); + /* create networks */ DockerConfig.createAllNetworks(); + /* create containers */ DockerConfig.createContainer(BIND, CONTAINER_1, CONTAINER_1_ENV); DockerConfig.startContainer(CONTAINER_1); /* metadata database */ @@ -386,4 +389,17 @@ public class StoreServiceIntegrationTest extends BaseUnitTest { }); } + @Test + public void deleteStaleQueries_succeeds() throws QueryStoreException, ImageNotSupportedException, SQLException { + + /* mock */ + when(databaseRepository.findAll()) + .thenReturn(List.of(DATABASE_1)); + + /* test */ + storeService.deleteStaleQueries(); + final List<Map<String, Object>> response = MariaDbConfig.listQueryStore(CONTAINER_1_INTERNALNAME, DATABASE_1_INTERNALNAME); + assertEquals(2, response.size()); + } + } diff --git a/fda-query-service/rest-service/src/test/resources/weather/3_queries.sql b/fda-query-service/rest-service/src/test/resources/weather/3_queries.sql new file mode 100644 index 0000000000000000000000000000000000000000..f50144176f689398b21971e58778d4cd688383b7 --- /dev/null +++ b/fda-query-service/rest-service/src/test/resources/weather/3_queries.sql @@ -0,0 +1,11 @@ +INSERT INTO `qs_queries` (`created`, `executed`, `created_by`, `query`, `query_normalized`, `is_persisted`, + `query_hash`, `result_hash`, `result_number`) +VALUES ('2022-12-24 18:00:00', '2022-12-24 18:00:00', 'sclause', 'SELECT `present` FROM `bag`', + 'SELECT `present` FROM `bag`', false, 'e8aff3ca4caeb228b314e88f00be767407bc45656a96da208a4cea00b75cc8d8', + '5a9977bb0b8653f18a6542f098b72e696a3584433db156ceb26047ee4f6f7e2b', 3), + ('2022-12-24 18:00:01', '2022-12-24 18:00:01', 'sclause', 'SELECT `type`, `present` FROM `bag`', + 'SELECT `type`, `present` FROM `bag`', true, 'e8aff3ca4caeb228b314e88f00be767407bc45656a96da208a4cea00b75cc8d7', + '5a9977bb0b8653f18a6542f098b72e696a3584433db156ceb26047ee4f6f7e2a', 3), + (NOW(), NOW(), 'sclause', 'SELECT `id`, `present` FROM `bag`', + 'SELECT `id`, `present` FROM `bag`', false, 'e8aff3ca4caeb228b314e88f00be767407bc45656a96da208a4cea00b75cc8d9', + '5a9977bb0b8653f18a6542f098b72e696a3584433db156ceb26047ee4f6f7e2c', 3); \ No newline at end of file diff --git a/fda-query-service/services/src/main/java/at/tuwien/listener/DatabaseListener.java b/fda-query-service/services/src/main/java/at/tuwien/listener/DatabaseListener.java new file mode 100644 index 0000000000000000000000000000000000000000..10adef889b3ef66eb2f7c37b1a806d2d7f1ed9bb --- /dev/null +++ b/fda-query-service/services/src/main/java/at/tuwien/listener/DatabaseListener.java @@ -0,0 +1,17 @@ +package at.tuwien.listener; + +import at.tuwien.exception.ImageNotSupportedException; +import at.tuwien.exception.QueryStoreException; +import org.springframework.scheduling.annotation.Scheduled; + +public interface DatabaseListener { + + /** + * Deletes stale queries that have not been persisted within 24 hours. + * + * @throws QueryStoreException The query store raised some exception. + * @throws ImageNotSupportedException The image is not supported by the service. + */ + @Scheduled + void deleteStaleQueries() throws QueryStoreException, ImageNotSupportedException; +} diff --git a/fda-query-service/services/src/main/java/at/tuwien/listener/MessageQueueListener.java b/fda-query-service/services/src/main/java/at/tuwien/listener/MessageQueueListener.java index afac64a7a0b323bb9745a47365e1a8fd8e20f4ae..4b8b1872272eb7375d408b1029fbf18c0a9be93d 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/listener/MessageQueueListener.java +++ b/fda-query-service/services/src/main/java/at/tuwien/listener/MessageQueueListener.java @@ -5,6 +5,11 @@ import org.springframework.scheduling.annotation.Scheduled; public interface MessageQueueListener { + /** + * Restores the consumers up to the configured limit. + * + * @throws AmqpException The consumer could not be created. + */ @Scheduled(fixedDelay = 5000) void updateConsumers() throws AmqpException; } diff --git a/fda-query-service/services/src/main/java/at/tuwien/listener/impl/MariadbListenerImpl.java b/fda-query-service/services/src/main/java/at/tuwien/listener/impl/MariadbListenerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..ebc76975ea876211aa75b2aeace17568d783f735 --- /dev/null +++ b/fda-query-service/services/src/main/java/at/tuwien/listener/impl/MariadbListenerImpl.java @@ -0,0 +1,29 @@ +package at.tuwien.listener.impl; + +import at.tuwien.exception.ImageNotSupportedException; +import at.tuwien.exception.QueryStoreException; +import at.tuwien.listener.DatabaseListener; +import at.tuwien.service.StoreService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +@Component +public class MariadbListenerImpl implements DatabaseListener { + + private final StoreService storeService; + + @Autowired + public MariadbListenerImpl(StoreService storeService) { + this.storeService = storeService; + } + + @Override + @Scheduled(cron = "0 2 * * *" /* at 2am */) + @Transactional(readOnly = true) + public void deleteStaleQueries() throws QueryStoreException, ImageNotSupportedException { + storeService.deleteStaleQueries(); + } + +} diff --git a/fda-query-service/services/src/main/java/at/tuwien/listener/impl/RabbitMqListenerImpl.java b/fda-query-service/services/src/main/java/at/tuwien/listener/impl/RabbitMqListenerImpl.java index bc45a7888800822c806a6ded90cb043670e14fc8..6e3670a70a74c12db7f2844968ad10272c682859 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/listener/impl/RabbitMqListenerImpl.java +++ b/fda-query-service/services/src/main/java/at/tuwien/listener/impl/RabbitMqListenerImpl.java @@ -1,57 +1,28 @@ package at.tuwien.listener.impl; -import at.tuwien.api.amqp.ConsumerDto; -import at.tuwien.config.AmqpConfig; -import at.tuwien.entities.database.table.Table; import at.tuwien.exception.AmqpException; -import at.tuwien.gateway.BrokerServiceGateway; import at.tuwien.listener.MessageQueueListener; import at.tuwien.service.MessageQueueService; -import at.tuwien.service.TableService; import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.List; - @Log4j2 @Service public class RabbitMqListenerImpl implements MessageQueueListener { - private final AmqpConfig amqpConfig; - private final TableService tableService; private final MessageQueueService messageQueueService; - private final BrokerServiceGateway brokerServiceGateway; - @Autowired - public RabbitMqListenerImpl(AmqpConfig amqpConfig, TableService tableService, - MessageQueueService messageQueueService, BrokerServiceGateway brokerServiceGateway) { - this.amqpConfig = amqpConfig; - this.tableService = tableService; + public RabbitMqListenerImpl(MessageQueueService messageQueueService) { this.messageQueueService = messageQueueService; - this.brokerServiceGateway = brokerServiceGateway; } @Override @Scheduled(fixedDelay = 5000) @Transactional(readOnly = true) public void updateConsumers() throws AmqpException { - final List<Table> tables = tableService.findAll(); - final List<ConsumerDto> consumers = brokerServiceGateway.findAllConsumers(); - for (Table table : tables) { - final long consumerCount = consumers.stream().filter(c -> c.getQueue().getName().equals(table.getQueueName())).count(); - if (consumerCount >= amqpConfig.getAmqpConsumers()) { - log.trace("listener table with name {} already has {} consumers (max. {})", table.getName(), - consumerCount, amqpConfig.getAmqpConsumers()); - continue; - } - log.debug("table with id {} has {} consumers, but needs {} in total", table.getId(), consumerCount, - amqpConfig.getAmqpConsumers()); - messageQueueService.createConsumer(table.getQueueName(), table.getDatabase().getContainer().getId(), - table.getDatabase().getId(), table.getId()); - } + messageQueueService.restore(); } } diff --git a/fda-query-service/services/src/main/java/at/tuwien/mapper/StoreMapper.java b/fda-query-service/services/src/main/java/at/tuwien/mapper/StoreMapper.java index c594988657b4924af6581570e63cce73add71d20..9302a0ca0aac8a652ef8833657abff90c02e880a 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/mapper/StoreMapper.java +++ b/fda-query-service/services/src/main/java/at/tuwien/mapper/StoreMapper.java @@ -62,6 +62,17 @@ public interface StoreMapper { } } + default PreparedStatement queryStoreRawDeleteStaleQueries(Connection connection) throws QueryStoreException { + final String statement = "DELETE FROM `qs_queries` WHERE `is_persisted` = false AND ABS(DATEDIFF(`created`, NOW())) >= 1"; + try { + log.trace("mapped select all query '{}' to prepared statement", statement); + return connection.prepareStatement(statement); + } catch (SQLException e) { + log.error("Failed to prepare statement {}, reason: {}", statement, e.getMessage()); + throw new QueryStoreException("Failed to prepare statement", e); + } + } + default PreparedStatement queryStoreRawSelectOneQuery(Connection connection, Long queryId) throws QueryStoreException { final String statement = "SELECT `id`, `created`, `created_by`, `query`, `query_hash`, `result_hash`, `result_number`, `is_persisted` FROM `qs_queries` q WHERE q.`id` = ?"; try { diff --git a/fda-query-service/services/src/main/java/at/tuwien/service/DatabaseService.java b/fda-query-service/services/src/main/java/at/tuwien/service/DatabaseService.java index 2e27dc0b4024fe4d03687eb77bde0df89dc3c374..982871215c63bb4953fb3b68012e83c8667ce305 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/service/DatabaseService.java +++ b/fda-query-service/services/src/main/java/at/tuwien/service/DatabaseService.java @@ -2,11 +2,14 @@ package at.tuwien.service; import at.tuwien.entities.database.Database; import at.tuwien.exception.DatabaseNotFoundException; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; public interface DatabaseService { /** - * Finds a database by given id in the remote database service. + * Finds a database by given id in the metadata database. * * @param containerId The container id. * @param databaseId The database id. @@ -14,4 +17,11 @@ public interface DatabaseService { * @throws DatabaseNotFoundException The database was not found. */ Database find(Long containerId, Long databaseId) throws DatabaseNotFoundException; + + /** + * Finds all databases in the metadata database. + * + * @return List of databases. + */ + List<Database> findAll(); } diff --git a/fda-query-service/services/src/main/java/at/tuwien/service/MessageQueueService.java b/fda-query-service/services/src/main/java/at/tuwien/service/MessageQueueService.java index 4bd2fd9bebdb5fa3fe2f67fcc3d75af4bba75b87..ed4342ddd40cfda326958f0b23fab34b30fc8583 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/service/MessageQueueService.java +++ b/fda-query-service/services/src/main/java/at/tuwien/service/MessageQueueService.java @@ -15,4 +15,11 @@ public interface MessageQueueService { * @throws AmqpException The consumer could not be created. */ void createConsumer(String queueName, Long containerId, Long databaseId, Long tableId) throws AmqpException; + + /** + * Restores missing consumers at the Broker Service. + * + * @throws AmqpException The consumer could not be created. + */ + void restore() throws AmqpException; } diff --git a/fda-query-service/services/src/main/java/at/tuwien/service/StoreService.java b/fda-query-service/services/src/main/java/at/tuwien/service/StoreService.java index f2e206abb0ac473227a1b7cb6300ddf782faa1ba..baebb84c4a04ad3abf8dadef00706c9ae7bc9e20 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/service/StoreService.java +++ b/fda-query-service/services/src/main/java/at/tuwien/service/StoreService.java @@ -74,11 +74,18 @@ public interface StoreService { * @param principal The user principal. * @return The stored query on success. * @throws DatabaseNotFoundException The database id was not found in the metadata database - * @throws ImageNotSupportedException The image is not supported + * @throws ImageNotSupportedException The image is not supported. * @throws DatabaseConnectionException The database connection to the remote container failed. - * @throws QueryStoreException The query store raised some error + * @throws QueryStoreException The query store raised some error. */ Query persist(Long containerId, Long databaseId, Long queryId, Principal principal) throws DatabaseNotFoundException, ImageNotSupportedException, DatabaseConnectionException, QueryStoreException, UserNotFoundException; + /** + * Deletes the stale queries that have not been persisted within 24 hozrs. + * + * @throws ImageNotSupportedException The image is not supported. + * @throws QueryStoreException The query store raised some error. + */ + void deleteStaleQueries() throws ImageNotSupportedException, QueryStoreException; } diff --git a/fda-query-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceImpl.java b/fda-query-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceImpl.java index c5c6a30008b9b28435f45cf3fbd1216989d148f6..79910ab9bf4aaec93670d22d6678eaac437ef59c 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceImpl.java +++ b/fda-query-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceImpl.java @@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.List; import java.util.Optional; @Log4j2 @@ -32,4 +33,10 @@ public class DatabaseServiceImpl implements DatabaseService { } return database.get(); } + + @Override + @Transactional(readOnly = true) + public List<Database> findAll() { + return databaseRepository.findAll(); + } } diff --git a/fda-query-service/services/src/main/java/at/tuwien/service/impl/RabbitMqServiceImpl.java b/fda-query-service/services/src/main/java/at/tuwien/service/impl/RabbitMqServiceImpl.java index 00f16ff96f763f335a24177d1c66cfd4185f3438..89bcb5425ec77f72cf971366b3bd70ec2e80c7dd 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/service/impl/RabbitMqServiceImpl.java +++ b/fda-query-service/services/src/main/java/at/tuwien/service/impl/RabbitMqServiceImpl.java @@ -1,19 +1,22 @@ package at.tuwien.service.impl; import at.tuwien.amqp.RabbitMqConsumer; +import at.tuwien.api.amqp.ConsumerDto; import at.tuwien.config.AmqpConfig; import at.tuwien.entities.database.table.Table; import at.tuwien.exception.*; +import at.tuwien.gateway.BrokerServiceGateway; import at.tuwien.service.MessageQueueService; import at.tuwien.service.QueryService; +import at.tuwien.service.TableService; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.*; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.io.IOException; +import java.util.List; @Log4j2 @Service @@ -23,14 +26,19 @@ public class RabbitMqServiceImpl implements MessageQueueService { private final AmqpConfig amqpConfig; private final ObjectMapper objectMapper; private final QueryService queryService; + private final TableService tableService; + private final BrokerServiceGateway brokerServiceGateway; @Autowired public RabbitMqServiceImpl(Channel channel, AmqpConfig amqpConfig, ObjectMapper objectMapper, - QueryService queryService) { + QueryService queryService, TableService tableService, + BrokerServiceGateway brokerServiceGateway) { this.channel = channel; this.amqpConfig = amqpConfig; this.objectMapper = objectMapper; this.queryService = queryService; + this.tableService = tableService; + this.brokerServiceGateway = brokerServiceGateway; } @Override @@ -54,4 +62,25 @@ public class RabbitMqServiceImpl implements MessageQueueService { } } + @Override + public void restore() throws AmqpException { + final List<Table> tables = tableService.findAll(); + final List<ConsumerDto> consumers = brokerServiceGateway.findAllConsumers(); + for (Table table : tables) { + final long consumerCount = consumers.stream().filter(c -> c.getQueue().getName().equals(table.getQueueName())).count(); + if (consumerCount >= amqpConfig.getAmqpConsumers()) { + log.trace("listener table with name {} already has {} consumers (max. {})", table.getName(), + consumerCount, amqpConfig.getAmqpConsumers()); + continue; + } + log.debug("table with id {} has {} consumers, but needs {} in total", table.getId(), consumerCount, + amqpConfig.getAmqpConsumers()); + for (long i = consumerCount; i < amqpConfig.getAmqpConsumers(); i++) { + createConsumer(table.getQueueName(), table.getDatabase().getContainer().getId(), + table.getDatabase().getId(), table.getId()); + log.trace("creating consumer #{}", i); + } + } + } + } diff --git a/fda-query-service/services/src/main/java/at/tuwien/service/impl/StoreServiceImpl.java b/fda-query-service/services/src/main/java/at/tuwien/service/impl/StoreServiceImpl.java index 52dba2e3368ea79eaa13b6b4026bcfcf1904ab61..b03514297f2fab2544257505a51d1272d73dfabe 100644 --- a/fda-query-service/services/src/main/java/at/tuwien/service/impl/StoreServiceImpl.java +++ b/fda-query-service/services/src/main/java/at/tuwien/service/impl/StoreServiceImpl.java @@ -181,6 +181,34 @@ public class StoreServiceImpl extends HibernateConnector implements StoreService return out; } + @Override + public void deleteStaleQueries() throws ImageNotSupportedException, QueryStoreException { + /* find */ + final List<Database> databases = databaseService.findAll(); + for (Database database : databases) { + if (!database.getContainer().getImage().getRepository().equals("mariadb")) { + log.error("Currently only MariaDB is supported"); + throw new ImageNotSupportedException("Currently only MariaDB is supported"); + } + final User root = databaseMapper.containerToPrivilegedUser(database.getContainer()); + /* run query */ + final ComboPooledDataSource dataSource = getDataSource(database.getContainer().getImage(), + database.getContainer(), database, root); + /* delete stale queries older than 24hrs */ + try { + final Connection connection = dataSource.getConnection(); + final PreparedStatement preparedStatement = storeMapper.queryStoreRawDeleteStaleQueries(connection); + final int affected = preparedStatement.executeUpdate(); + log.debug("delete stale queries affected {} rows", affected); + } catch (SQLException e) { + log.error("Failed to delete stale queries in database with id {}, reason: {}", database.getId(), e.getMessage()); + throw new QueryStoreException("Failed to delete stale queries in database with id " + database.getId() + ": " + e.getMessage()); + } finally { + dataSource.close(); + } + } + } + protected List<Query> resultSetToQueryList(ResultSet resultSet) throws SQLException { final List<Query> queries = new LinkedList<>(); while (resultSet.next()) {