Skip to content
Snippets Groups Projects
Verified Commit db374682 authored by Martin Weise's avatar Martin Weise
Browse files

S3 key

parent ac312a5e
No related branches found
No related tags found
No related merge requests found
...@@ -84,7 +84,6 @@ public class StorageServiceIntegrationTest extends BaseTest { ...@@ -84,7 +84,6 @@ public class StorageServiceIntegrationTest extends BaseTest {
@BeforeEach @BeforeEach
public void beforeEach() throws SQLException, InterruptedException { public void beforeEach() throws SQLException, InterruptedException {
/* s3 */ /* s3 */
Thread.sleep(1000) /* wait for test container some more */;
if (s3Client.listBuckets().buckets().stream().noneMatch(b -> b.name().equals(s3Config.getS3Bucket()))) { if (s3Client.listBuckets().buckets().stream().noneMatch(b -> b.name().equals(s3Config.getS3Bucket()))) {
log.warn("Bucket {} not found", s3Config.getS3Bucket()); log.warn("Bucket {} not found", s3Config.getS3Bucket());
s3Client.createBucket(CreateBucketRequest.builder() s3Client.createBucket(CreateBucketRequest.builder()
......
...@@ -35,6 +35,8 @@ public class StorageServiceS3Impl implements StorageService { ...@@ -35,6 +35,8 @@ public class StorageServiceS3Impl implements StorageService {
private final S3Client s3Client; private final S3Client s3Client;
private final SparkSession sparkSession; private final SparkSession sparkSession;
private static final String S3_KEY = "s3_key";
@Autowired @Autowired
public StorageServiceS3Impl(S3Config s3Config, S3Client s3Client, SparkSession sparkSession) { public StorageServiceS3Impl(S3Config s3Config, S3Client s3Client, SparkSession sparkSession) {
this.s3Config = s3Config; this.s3Config = s3Config;
...@@ -98,7 +100,6 @@ public class StorageServiceS3Impl implements StorageService { ...@@ -98,7 +100,6 @@ public class StorageServiceS3Impl implements StorageService {
} }
@Override @Override
// TODO should be export to S3 -> load from S3
public ExportResourceDto transformDataset(Dataset<Row> dataset) throws StorageUnavailableException { public ExportResourceDto transformDataset(Dataset<Row> dataset) throws StorageUnavailableException {
final List<Map<String, String>> inMemory = dataset.collectAsList() final List<Map<String, String>> inMemory = dataset.collectAsList()
.stream() .stream()
...@@ -146,7 +147,7 @@ public class StorageServiceS3Impl implements StorageService { ...@@ -146,7 +147,7 @@ public class StorageServiceS3Impl implements StorageService {
final String path = "s3a://" + s3Config.getS3Bucket() + "/" + key; final String path = "s3a://" + s3Config.getS3Bucket() + "/" + key;
log.atDebug() log.atDebug()
.setMessage("read dataset " + key + " using header: " + withHeader) .setMessage("read dataset " + key + " using header: " + withHeader)
.addKeyValue("s3_key", key) .addKeyValue(S3_KEY, key)
.addKeyValue("s3_bucket", s3Config.getS3Bucket()) .addKeyValue("s3_bucket", s3Config.getS3Bucket())
.addKeyValue("header", withHeader) .addKeyValue("header", withHeader)
.log(); .log();
...@@ -163,7 +164,7 @@ public class StorageServiceS3Impl implements StorageService { ...@@ -163,7 +164,7 @@ public class StorageServiceS3Impl implements StorageService {
if (exception.getSimpleMessage().contains("PATH_NOT_FOUND")) { if (exception.getSimpleMessage().contains("PATH_NOT_FOUND")) {
log.atError() log.atError()
.setMessage("Failed to find dataset " + key + " in storage service") .setMessage("Failed to find dataset " + key + " in storage service")
.addKeyValue("s3_key", key) .addKeyValue(S3_KEY, key)
.setCause(e) .setCause(e)
.log(); .log();
throw new StorageNotFoundException("Failed to find dataset in storage service: " + e.getMessage()); throw new StorageNotFoundException("Failed to find dataset in storage service: " + e.getMessage());
...@@ -171,7 +172,7 @@ public class StorageServiceS3Impl implements StorageService { ...@@ -171,7 +172,7 @@ public class StorageServiceS3Impl implements StorageService {
if (exception.getSimpleMessage().contains("UNRESOLVED_COLUMN")) { if (exception.getSimpleMessage().contains("UNRESOLVED_COLUMN")) {
log.atError() log.atError()
.setMessage("Failed to resolve column from dataset in database") .setMessage("Failed to resolve column from dataset in database")
.addKeyValue("s3_key", key) .addKeyValue(S3_KEY, key)
.setCause(e) .setCause(e)
.log(); .log();
throw new TableMalformedException("Failed to resolve column from dataset in database: " + e.getMessage()); throw new TableMalformedException("Failed to resolve column from dataset in database: " + e.getMessage());
...@@ -179,14 +180,14 @@ public class StorageServiceS3Impl implements StorageService { ...@@ -179,14 +180,14 @@ public class StorageServiceS3Impl implements StorageService {
} else if (e instanceof IllegalArgumentException) { } else if (e instanceof IllegalArgumentException) {
log.atError() log.atError()
.setMessage("Failed to map columns: " + e.getMessage()) .setMessage("Failed to map columns: " + e.getMessage())
.addKeyValue("s3_key", key) .addKeyValue(S3_KEY, key)
.setCause(e) .setCause(e)
.log(); .log();
throw new MalformedException("Failed to map columns: " + e.getMessage()); throw new MalformedException("Failed to map columns: " + e.getMessage());
} }
log.atError() log.atError()
.setMessage("Failed to connect to storage service") .setMessage("Failed to connect to storage service")
.addKeyValue("s3_key", key) .addKeyValue(S3_KEY, key)
.setCause(e) .setCause(e)
.log(); .log();
throw new StorageUnavailableException("Failed to connect to storage service: " + e.getMessage()); throw new StorageUnavailableException("Failed to connect to storage service: " + e.getMessage());
...@@ -220,14 +221,14 @@ public class StorageServiceS3Impl implements StorageService { ...@@ -220,14 +221,14 @@ public class StorageServiceS3Impl implements StorageService {
if (e instanceof ExtendedAnalysisException exception) { if (e instanceof ExtendedAnalysisException exception) {
log.atError() log.atError()
.setMessage("Failed to resolve column from dataset in database") .setMessage("Failed to resolve column from dataset in database")
.addKeyValue("s3_key", key) .addKeyValue(S3_KEY, key)
.setCause(e) .setCause(e)
.log(); .log();
throw new TableMalformedException("Failed to resolve column from dataset in database: " + exception.getSimpleMessage()); throw new TableMalformedException("Failed to resolve column from dataset in database: " + exception.getSimpleMessage());
} }
log.atError() log.atError()
.setMessage("Failed to select columns from dataset") .setMessage("Failed to select columns from dataset")
.addKeyValue("s3_key", key) .addKeyValue(S3_KEY, key)
.setCause(e) .setCause(e)
.log(); .log();
throw new MalformedException("Failed to select columns from dataset: " + e.getMessage()); throw new MalformedException("Failed to select columns from dataset: " + e.getMessage());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment