Skip to content
Snippets Groups Projects
Verified Commit cfb21e9d authored by Martin Weise's avatar Martin Weise
Browse files

Fixed the consumer issue

parent 321368d3
No related branches found
No related tags found
No related merge requests found
Showing
with 461 additions and 81 deletions
......@@ -15,6 +15,7 @@
from __future__ import absolute_import
# import apis into sdk package
from api_query.api.consumer_endpoint_api import ConsumerEndpointApi
from api_query.api.export_endpoint_api import ExportEndpointApi
from api_query.api.query_endpoint_api import QueryEndpointApi
from api_query.api.store_endpoint_api import StoreEndpointApi
......
......@@ -3,6 +3,7 @@ from __future__ import absolute_import
# flake8: noqa
# import apis into api package
from api_query.api.consumer_endpoint_api import ConsumerEndpointApi
from api_query.api.export_endpoint_api import ExportEndpointApi
from api_query.api.query_endpoint_api import QueryEndpointApi
from api_query.api.store_endpoint_api import StoreEndpointApi
......
This diff is collapsed.
......@@ -38,6 +38,7 @@ public class GatewayConfig {
"/api/container/**/database/**/table/**/data/**",
"/api/container/**/database/**/table/**/query/**",
"/api/container/**/database/**/table/**/export/**",
"/api/container/**/database/**/table/**/consumer",
"/api/container/**/database/**/version/**")
.and()
.method("POST", "GET", "PUT", "DELETE")
......
package at.tuwien.endpoint;
import at.tuwien.entities.database.Database;
import at.tuwien.entities.database.table.Table;
import at.tuwien.entities.identifier.Identifier;
import at.tuwien.exception.DatabaseNotFoundException;
import at.tuwien.exception.IdentifierNotFoundException;
......@@ -61,6 +62,35 @@ public abstract class AbstractEndpoint {
return false;
}
protected Boolean hasQueuePermission(Long containerId, Long databaseId, Long tableId, String permissionCode,
Principal principal) {
final Database database;
try {
database = databaseService.find(containerId, databaseId);
} catch (DatabaseNotFoundException e) {
log.debug("failed to find database with id {}", databaseId);
return false;
}
if (principal == null) {
log.debug("failed to grant permission {} because principal is null", permissionCode);
return false;
}
/* modification operations are limited to the creator */
if (database.getCreator().getUsername().equals(principal.getName())) {
log.debug("grant permission {} because user {} is creator {}", permissionCode, principal.getName(),
database.getCreator().getUsername());
return true;
}
final Authentication authentication = (Authentication) principal /* with pre-authorization this always holds */;
if (authentication.getAuthorities().stream().noneMatch(a -> a.getAuthority().equals("ROLE_RESEARCHER"))) {
log.debug("failed to grant permission {} because current user misses authority 'ROLE_RESEARCHER'",
permissionCode);
return false;
}
log.debug("failed to grant permission {} because database is not owner by the current user", permissionCode);
return false;
}
protected Boolean hasQueryPermission(Long containerId, Long databaseId, Long queryId, String permissionCode, Principal principal) {
final Database database;
try {
......
package at.tuwien.endpoint;
import at.tuwien.entities.database.table.Table;
import at.tuwien.exception.*;
import at.tuwien.service.*;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import javax.validation.constraints.NotNull;
import java.security.Principal;
@Log4j2
@CrossOrigin(origins = "*")
@RestController
@RequestMapping("/api/container/{id}/database/{databaseId}/table/{tableId}/consumer")
public class ConsumerEndpoint extends AbstractEndpoint {
private final TableService tableService;
private final MessageQueueService messageQueueService;
@Autowired
public ConsumerEndpoint(DatabaseService databaseService, IdentifierService identifierService,
TableService tableService, MessageQueueService messageQueueService) {
super(databaseService, identifierService);
this.tableService = tableService;
this.messageQueueService = messageQueueService;
}
@PostMapping
@Transactional
@Operation(summary = "Declare consumer", security = @SecurityRequirement(name = "bearerAuth"))
public ResponseEntity<Void> declare(@NotNull @PathVariable("id") Long containerId,
@NotNull @PathVariable("databaseId") Long databaseId,
@NotNull @PathVariable("tableId") Long tableId,
@NotNull Principal principal)
throws TableNotFoundException, DatabaseNotFoundException, AmqpException, NotAllowedException {
if (!hasDatabasePermission(containerId, databaseId, "QUEUE_CREATE_CONSUMER", principal)) {
log.error("Missing data export permission");
throw new NotAllowedException("Missing data export permission");
}
final Table table = tableService.find(containerId, databaseId, tableId);
messageQueueService.createConsumer(table.getTopic(), containerId, databaseId, tableId);
return ResponseEntity.accepted()
.build();
}
}
......@@ -103,7 +103,8 @@ public class TableDataEndpoint extends AbstractEndpoint {
@NotNull @Valid @RequestBody ImportDto data,
@NotNull Principal principal)
throws TableNotFoundException, DatabaseNotFoundException, TableMalformedException,
ImageNotSupportedException, ContainerNotFoundException, NotAllowedException, DatabaseConnectionException, QueryMalformedException {
ImageNotSupportedException, ContainerNotFoundException, NotAllowedException, DatabaseConnectionException,
QueryMalformedException {
if (!hasDatabasePermission(containerId, databaseId, "DATA_INSERT", principal)) {
log.error("Missing data insert permission");
throw new NotAllowedException("Missing data insert permission");
......
......@@ -62,7 +62,7 @@ public class RabbitMqServiceImpl implements MessageQueueService {
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
final TypeReference<HashMap<String, Object>> payloadReference = new TypeReference<>() {
};
try {
......
......@@ -15,12 +15,10 @@ import java.util.List;
@Slf4j
public abstract class AbstractEndpoint {
private final TableService tableService;
private final DatabaseService databaseService;
@Autowired
protected AbstractEndpoint(TableService tableService, DatabaseService databaseService) {
this.tableService = tableService;
protected AbstractEndpoint(DatabaseService databaseService) {
this.databaseService = databaseService;
}
......
......@@ -13,7 +13,6 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
......@@ -30,17 +29,17 @@ import java.util.stream.Collectors;
@RequestMapping("/api/container/{id}/database/{databaseId}/table")
public class TableEndpoint extends AbstractEndpoint {
private final TableMapper tableMapper;
private final TableService tableService;
private final MessageQueueService amqpService;
private final TableMapper tableMapper;
@Autowired
public TableEndpoint(TableService tableService, DatabaseService databaseService, MessageQueueService amqpService,
TableMapper tableMapper) {
super(tableService, databaseService);
this.tableService = tableService;
this.amqpService = amqpService;
public TableEndpoint(TableMapper tableMapper, TableService tableService, MessageQueueService amqpService,
DatabaseService databaseService) {
super(databaseService);
this.tableMapper = tableMapper;
this.amqpService = amqpService;
this.tableService = tableService;
}
@GetMapping
......@@ -66,7 +65,7 @@ public class TableEndpoint extends AbstractEndpoint {
public ResponseEntity<TableBriefDto> create(@NotNull @PathVariable("id") Long containerId,
@NotNull @PathVariable("databaseId") Long databaseId,
@NotNull @Valid @RequestBody TableCreateDto createDto,
Principal principal)
@NotNull Principal principal)
throws ImageNotSupportedException, DatabaseNotFoundException, TableMalformedException, AmqpException,
TableNameExistsException, ContainerNotFoundException, UserNotFoundException, QueryMalformedException {
if (!hasDatabasePermission(containerId, databaseId, "TABLE_CREATE", principal)) {
......
package at.tuwien.gateway;
import at.tuwien.api.database.table.TableCsvDto;
import at.tuwien.exception.AmqpException;
public interface QueryServiceGateway {
......@@ -10,9 +10,8 @@ public interface QueryServiceGateway {
* @param containerId The container id.
* @param databaseId The database id.
* @param tableId The table id.
* @param data The data.
* @return The number of inserted tuples.
* @param authorization The authentication token.
*/
Integer publish(Long containerId, Long databaseId, Long tableId, TableCsvDto data);
void declareConsumer(Long containerId, Long databaseId, Long tableId, String authorization) throws AmqpException;
}
package at.tuwien.gateway.impl;
import at.tuwien.api.database.table.TableCsvDto;
import at.tuwien.exception.AmqpException;
import at.tuwien.gateway.QueryServiceGateway;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
......@@ -22,11 +20,18 @@ public class QueryServiceGatewayImpl implements QueryServiceGateway {
}
@Override
public Integer publish(Long containerId, Long databaseId, Long tableId, TableCsvDto data) {
final String url = "/api/container/" + containerId + "/database/" + databaseId + "/table/" + tableId + "/data";
final ResponseEntity<Integer> response = restTemplate.exchange(url, HttpMethod.POST,
new HttpEntity<>(data), Integer.class);
return response.getBody();
public void declareConsumer(Long containerId, Long databaseId, Long tableId, String authorization) throws AmqpException {
final String url = "/api/container/" + containerId + "/database/" + databaseId + "/table/" + tableId + "/consumer";
final HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", authorization);
final ResponseEntity<Void> response = restTemplate.exchange(url, HttpMethod.POST,
new HttpEntity<>(null, headers), Void.class);
if (!response.getStatusCode().equals(HttpStatus.ACCEPTED)) {
log.error("Failed to declare consumer for table with id {}", tableId);
log.debug("failed to declare consumer for container with id {} database with id {} table with id {}",
containerId, databaseId, tableId);
throw new AmqpException("Failed to declare consumer");
}
}
}
......@@ -25,6 +25,8 @@ import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
import static org.springframework.transaction.annotation.Propagation.REQUIRES_NEW;
@Log4j2
@Service
public class TableServiceImpl extends HibernateConnector implements TableService {
......@@ -162,7 +164,7 @@ public class TableServiceImpl extends HibernateConnector implements TableService
dataSource1.close();
}
/* save in metadata database */
final Table table = tableRepository.save(entity);
final Table table = tableRepository.saveAndFlush(entity);
log.info("Created table with id {}", table.getId());
log.debug("created table {}", table);
/* save in elastic search */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment