Commit 8768274c authored by Martin Weise

Merge branch '513-scrape-old-datasets' into 'dev'

Added scheduled scraping that removes stale datasets

See merge request !399
parents d0295afa b5bb8e82
Showing changed files with 109 additions and 10 deletions
@@ -46,12 +46,18 @@ every time e.g. a sensor measurement is inserted. By default, this information is
administrators can disable this behavior by setting `CREDENTIAL_CACHE_TIMEOUT=0` (cache is deleted after 0 seconds).
## Storage
The Data Service is also capable of uploading files to the S3 backend. The default limit
of [`Tomcat`](https://spring.io/guides/gs/uploading-files#_tuning_file_upload_limits) in Spring Boot is configured to be
`2GB`. You can provide your own limit by setting `MAX_UPLOAD_SIZE`.
By default, the Data Service removes datasets older than 24 hours every 60 minutes. You can set the
`MAX_AGE` (in seconds) and `S3_STALE_CRON` to fit your use-case. You can disable this feature by setting `S3_STALE_CRON`
to `-`, but this may eventually lead to storage issues because stale datasets are no longer removed. Note
that [Spring Boot uses its own flavor](https://spring.io/blog/2020/11/10/new-in-spring-5-3-improved-cron-expressions#usage)
of cron syntax.
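For example, a Docker Compose deployment could pass these variables to the Data Service container. The service name
and values below are purely illustrative (keep datasets for six hours, clean up hourly), not defaults:

```yaml
services:
  dbrepo-data-service:              # service name is illustrative
    environment:
      MAX_AGE: "21600"              # datasets older than 6 hours are considered stale
      S3_STALE_CRON: "0 0 * * * *"  # Spring cron: run the cleanup at the top of every hour
```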
## Limitations
* Views in DBRepo can only have 63-character length (it is assumed only internal views have the maximum length of 64
......
No preview for this file type
@@ -3,13 +3,15 @@ package at.tuwien;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@Log4j2
@EnableScheduling
@SpringBootApplication
public class DataServiceApplication {
public static void main(String[] args) {
SpringApplication.run(DataServiceApplication.class, args);
}
}
@@ -64,6 +64,8 @@ dbrepo:
accessKeyId: "${S3_ACCESS_KEY_ID:seaweedfsadmin}"
secretAccessKey: "${S3_SECRET_ACCESS_KEY:seaweedfsadmin}"
bucket: "${S3_BUCKET:dbrepo}"
maxAge: "${S3_MAX_AGE:86400}"
cron: "${S3_STALE_CRON:0 */60 * * * *}"
system:
username: "${SYSTEM_USERNAME:admin}"
password: "${SYSTEM_PASSWORD:admin}"
......
package at.tuwien.service;
import at.ac.tuwien.ifs.dbrepo.core.api.ExportResourceDto;
import at.tuwien.config.S3Config;
import at.ac.tuwien.ifs.dbrepo.core.exception.MalformedException;
import at.ac.tuwien.ifs.dbrepo.core.exception.StorageNotFoundException;
import at.ac.tuwien.ifs.dbrepo.core.exception.StorageUnavailableException;
import at.ac.tuwien.ifs.dbrepo.core.exception.TableMalformedException;
import at.ac.tuwien.ifs.dbrepo.core.test.BaseTest;
import at.tuwien.config.S3Config;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
@@ -31,6 +31,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.io.*;
@@ -232,6 +233,35 @@ public class StorageServiceIntegrationTest extends BaseTest {
assertEquals("", lines.get(0));
}
@Test
public void deleteStaleObjects_none_succeeds() {
/* mock */
s3Client.putObject(PutObjectRequest.builder()
.key("s3key")
.bucket(s3Config.getS3Bucket())
.build(), RequestBody.fromFile(new File("src/test/resources/csv/weather_aus.csv")));
/* test */
storageService.deleteStaleObjects();
assertEquals(1, s3Client.listObjects(ListObjectsRequest.builder().bucket(s3Config.getS3Bucket()).build()).contents().size());
}
@Test
public void deleteStaleObjects_succeeds() throws InterruptedException {
/* mock */
s3Client.putObject(PutObjectRequest.builder()
.key("s3key")
.bucket(s3Config.getS3Bucket())
.build(), RequestBody.fromFile(new File("src/test/resources/csv/weather_aus.csv")));
/* test */
Thread.sleep(4000);
storageService.deleteStaleObjects();
assertEquals(0, s3Client.listObjects(ListObjectsRequest.builder().bucket(s3Config.getS3Bucket()).build()).contents().size());
}
@ParameterizedTest
@Disabled("cannot fix")
@MethodSource("loadDataset_arguments")
......
@@ -33,3 +33,4 @@ spring.rabbitmq.password=guest
# s3
dbrepo.s3.accessKeyId=minioadmin
dbrepo.s3.secretAccessKey=minioadmin
dbrepo.s3.maxAge=3
@@ -30,6 +30,9 @@ public class S3Config {
@Value("${dbrepo.s3.bucket}")
private String s3Bucket;
@Value("${dbrepo.s3.maxAge}")
private Integer maxAge;
@Bean
public S3Client s3client() {
final AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(
......
@@ -9,6 +9,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.io.InputStream;
import java.time.Instant;
import java.util.List;
public interface StorageService {
@@ -47,6 +48,10 @@ public interface StorageService {
*/
byte[] getBytes(String bucket, String key) throws StorageUnavailableException, StorageNotFoundException;
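/**
* Deletes the object with the given key from the given bucket.
*
* @param bucket The bucket name.
* @param key The object key.
*/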
void deleteObject(String bucket, String key);
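/**
* Deletes all objects from the configured S3 bucket that are older than the configured maximum age.
*/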
void deleteStaleObjects();
/**
* Loads an object of the default export bucket from the Storage Service into an export resource.
*
......
package at.tuwien.service.impl;
import at.ac.tuwien.ifs.dbrepo.core.api.ExportResourceDto;
import at.tuwien.config.S3Config;
import at.ac.tuwien.ifs.dbrepo.core.exception.MalformedException;
import at.ac.tuwien.ifs.dbrepo.core.exception.StorageNotFoundException;
import at.ac.tuwien.ifs.dbrepo.core.exception.StorageUnavailableException;
import at.ac.tuwien.ifs.dbrepo.core.exception.TableMalformedException;
import at.tuwien.config.S3Config;
import at.tuwien.service.StorageService;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.RandomStringUtils;
@@ -17,13 +17,12 @@ import org.springframework.core.io.InputStreamResource;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.*;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@@ -91,6 +90,30 @@ public class StorageServiceS3Impl implements StorageService {
}
}
@Override
public void deleteObject(String bucket, String key) {
log.trace("delete object with key {} from bucket: {}", key, bucket);
s3Client.deleteObject(DeleteObjectRequest.builder()
.bucket(bucket)
.key(key)
.build());
}
@Override
public void deleteStaleObjects() {
log.trace("list stale objects in bucket: {}", s3Config.getS3Bucket());
final List<String> keys = s3Client.listObjects(ListObjectsRequest.builder()
.bucket(s3Config.getS3Bucket())
.build())
.contents()
.stream()
.filter(o -> o.lastModified().isBefore(Instant.now().minus(s3Config.getMaxAge(), ChronoUnit.SECONDS)))
.map(S3Object::key)
.toList();
keys.forEach(key -> deleteObject(s3Config.getS3Bucket(), key));
log.info("Deleted {} stale object(s) in bucket: {}", keys.size(), s3Config.getS3Bucket());
}
@Override
public ExportResourceDto getResource(String key) throws StorageNotFoundException, StorageUnavailableException {
return getResource(s3Config.getS3Bucket(), key);
......
package at.tuwien.timer;
import at.tuwien.service.StorageService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class StaleObjectTimer {
private final StorageService storageService;
@Autowired
public StaleObjectTimer(StorageService storageService) {
this.storageService = storageService;
}
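/**
* Runs according to the cron expression configured in dbrepo.s3.cron (environment variable S3_STALE_CRON)
* and removes stale objects from the S3 bucket.
*/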
@Scheduled(cron = "${dbrepo.s3.cron}")
public void deleteStaleObjects() {
storageService.deleteStaleObjects();
}
}
@@ -8,7 +8,9 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.IOException;
import java.io.InputStream;
......
No preview for this file type