Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,32 @@ public class AwsProperties implements Serializable {
*/
public static final String S3_WRITE_TAGS_PREFIX = "s3.write.tags.";

/**
 * Used by {@link S3FileIO} to tag objects when deleting. When this config is set, objects are
 * tagged with the configured key-value pairs before deletion. This is considered a soft-delete,
 * because users are able to configure a tag-based object lifecycle policy at bucket level to
 * transition objects to different tiers.
 * <p>
 * For more details, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html
 * <p>
 * Example: s3.delete.tags.my_key=my_val
 */
public static final String S3_DELETE_TAGS_PREFIX = "s3.delete.tags.";

/**
 * Number of threads to use for adding delete tags to S3 objects; defaults to
 * {@link Runtime#availableProcessors()}.
 */
public static final String S3FILEIO_DELETE_THREADS = "s3.delete.num-threads";

/**
 * Determines whether {@link S3FileIO} actually deletes the object when a delete is requested;
 * defaults to true. When disabled, users are expected to set tags through
 * {@link #S3_DELETE_TAGS_PREFIX} and manage deleted files through an S3 lifecycle policy.
 */
public static final String S3_DELETE_ENABLED = "s3.delete-enabled";
public static final boolean S3_DELETE_ENABLED_DEFAULT = true;

/**
* Used by {@link S3FileIO}, prefix used for bucket access point configuration.
* To set, we can pass a catalog property.
Expand Down Expand Up @@ -342,6 +368,9 @@ public class AwsProperties implements Serializable {
private ObjectCannedACL s3FileIoAcl;
private boolean isS3ChecksumEnabled;
private final Set<Tag> s3WriteTags;
private final Set<Tag> s3DeleteTags;
private int s3FileIoDeleteThreads;
private boolean isS3DeleteEnabled;
private final Map<String, String> s3BucketToAccessPointMapping;

private String glueCatalogId;
Expand All @@ -363,6 +392,9 @@ public AwsProperties() {
this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");
this.isS3ChecksumEnabled = S3_CHECKSUM_ENABLED_DEFAULT;
this.s3WriteTags = Sets.newHashSet();
this.s3DeleteTags = Sets.newHashSet();
this.s3FileIoDeleteThreads = Runtime.getRuntime().availableProcessors();
this.isS3DeleteEnabled = S3_DELETE_ENABLED_DEFAULT;
this.s3BucketToAccessPointMapping = ImmutableMap.of();

this.glueCatalogId = null;
Expand Down Expand Up @@ -427,6 +459,10 @@ public AwsProperties(Map<String, String> properties) {
String.format("Deletion batch size must be between 1 and %s", S3FILEIO_DELETE_BATCH_SIZE_MAX));

this.s3WriteTags = toTags(properties, S3_WRITE_TAGS_PREFIX);
this.s3DeleteTags = toTags(properties, S3_DELETE_TAGS_PREFIX);
this.s3FileIoDeleteThreads = PropertyUtil.propertyAsInt(properties, S3FILEIO_DELETE_THREADS,
Runtime.getRuntime().availableProcessors());
this.isS3DeleteEnabled = PropertyUtil.propertyAsBoolean(properties, S3_DELETE_ENABLED, S3_DELETE_ENABLED_DEFAULT);
this.s3BucketToAccessPointMapping = PropertyUtil.propertiesWithPrefix(properties, S3_ACCESS_POINTS_PREFIX);

this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
Expand Down Expand Up @@ -549,6 +585,26 @@ public Set<Tag> s3WriteTags() {
return s3WriteTags;
}

/**
 * Returns the tags applied to objects before deletion, parsed from properties with the
 * {@link #S3_DELETE_TAGS_PREFIX} prefix. Empty when no delete tags were configured.
 */
public Set<Tag> s3DeleteTags() {
return s3DeleteTags;
}

/**
 * Returns the number of threads used for adding delete tags to S3 objects, configured via
 * {@link #S3FILEIO_DELETE_THREADS}; defaults to the number of available processors.
 */
public int s3FileIoDeleteThreads() {
return s3FileIoDeleteThreads;
}

/**
 * Overrides the number of threads used for adding delete tags to S3 objects.
 *
 * @param threads the new thread count
 */
public void setS3FileIoDeleteThreads(int threads) {
this.s3FileIoDeleteThreads = threads;
}

/**
 * Returns whether objects are actually deleted by {@link S3FileIO}, configured via
 * {@link #S3_DELETE_ENABLED}; defaults to {@link #S3_DELETE_ENABLED_DEFAULT}.
 */
public boolean isS3DeleteEnabled() {
return isS3DeleteEnabled;
}

/**
 * Overrides whether objects are actually deleted. When set to false, callers are expected to
 * rely on delete tags ({@link #S3_DELETE_TAGS_PREFIX}) plus an S3 lifecycle policy instead.
 *
 * @param s3DeleteEnabled true to physically delete objects, false to soft-delete via tags only
 */
public void setS3DeleteEnabled(boolean s3DeleteEnabled) {
this.isS3DeleteEnabled = s3DeleteEnabled;
}

private Set<Tag> toTags(Map<String, String> properties, String prefix) {
return PropertyUtil.propertiesWithPrefix(properties, prefix)
.entrySet().stream()
Expand Down
74 changes: 74 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.iceberg.aws.AwsClientFactories;
Expand All @@ -40,14 +41,22 @@
import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;

/**
* FileIO implementation backed by S3.
Expand All @@ -59,6 +68,7 @@
public class S3FileIO implements FileIO, SupportsBulkOperations {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext";
private static volatile ExecutorService executorService;

private SerializableSupplier<S3Client> s3;
private AwsProperties awsProperties;
Expand Down Expand Up @@ -108,6 +118,18 @@ public OutputFile newOutputFile(String path) {

@Override
public void deleteFile(String path) {
if (awsProperties.s3DeleteTags() != null && !awsProperties.s3DeleteTags().isEmpty()) {
try {
tagFileToDelete(path, awsProperties.s3DeleteTags());
} catch (S3Exception e) {
LOG.warn("Failed to add delete tags: {} to {}", awsProperties.s3DeleteTags(), path, e);
}
}

if (!awsProperties.isS3DeleteEnabled()) {
return;
}

S3URI location = new S3URI(path, awsProperties.s3BucketToAccessPointMapping());
DeleteObjectRequest deleteRequest =
DeleteObjectRequest.builder().bucket(location.bucket()).key(location.key()).build();
Expand All @@ -125,6 +147,20 @@ public void deleteFile(String path) {
*/
@Override
public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException {
if (awsProperties.s3DeleteTags() != null && !awsProperties.s3DeleteTags().isEmpty()) {
Tasks.foreach(paths)
.noRetry()
.executeWith(executorService())
.suppressFailureWhenFinished()
.onFailure((path, exc) -> LOG.warn("Failed to add delete tags: {} to {}",
awsProperties.s3DeleteTags(), path, exc))
.run(path -> tagFileToDelete(path, awsProperties.s3DeleteTags()));
}

if (!awsProperties.isS3DeleteEnabled()) {
return;
}

SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
int numberOfFailedDeletions = 0;
for (String path : paths) {
Expand Down Expand Up @@ -155,6 +191,31 @@ public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureExcept
}
}

/**
 * Applies the configured delete tags to the object at the given path, preserving any tags
 * already present on the object.
 *
 * @param path the S3 location of the object to tag
 * @param deleteTags tags to add before deletion (soft-delete markers)
 * @throws S3Exception if fetching or updating the object's tagging fails
 */
private void tagFileToDelete(String path, Set<Tag> deleteTags) throws S3Exception {
  S3URI location = new S3URI(path, awsProperties.s3BucketToAccessPointMapping());

  // Read the object's current tag set so delete tags are merged rather than replacing it.
  GetObjectTaggingResponse existingTagging = client().getObjectTagging(
      GetObjectTaggingRequest.builder()
          .bucket(location.bucket())
          .key(location.key())
          .build());

  Set<Tag> mergedTags = Sets.newHashSet(deleteTags);
  if (existingTagging.hasTagSet()) {
    mergedTags.addAll(existingTagging.tagSet());
  }

  // Write back the union of existing tags and delete tags.
  client().putObjectTagging(
      PutObjectTaggingRequest.builder()
          .bucket(location.bucket())
          .key(location.key())
          .tagging(Tagging.builder().tagSet(mergedTags).build())
          .build());
}

private List<String> deleteObjectsInBucket(String bucket, Collection<String> objects) {
if (!objects.isEmpty()) {
List<ObjectIdentifier> objectIds = objects
Expand Down Expand Up @@ -184,6 +245,19 @@ private S3Client client() {
return client;
}

private ExecutorService executorService() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably shutdown the ExecutorService on close of the FileIO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have made the changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we cannot close it, because it's a static variable and shared across FileIOs? Which is similar to the one in S3OutputStream. We either not close it, or make it an instance variable instead. I think not closing it is fine given we are already doing that in S3OutputStream, any thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel not closing is fine. We can keep it as static considering parity with S3OutputStream.

if (executorService == null) {
synchronized (S3FileIO.class) {
if (executorService == null) {
executorService = ThreadPools.newWorkerPool(
"iceberg-s3fileio-delete", awsProperties.s3FileIoDeleteThreads());
}
}
}

return executorService;
}

@Override
public void initialize(Map<String, String> properties) {
this.awsProperties = new AwsProperties(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public class TestS3OutputStream {
AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
AwsProperties.S3FILEIO_STAGING_DIRECTORY, tmpDir.toString(),
"s3.write.tags.abc", "123",
"s3.write.tags.def", "789"));
"s3.write.tags.def", "789",
"s3.delete.tags.xyz", "456"));

// No-op constructor; all setup happens in field initializers. The IOException throws clause is
// declared even though this empty body cannot throw it — presumably kept for signature
// compatibility with test tooling; TODO confirm it is still needed.
public TestS3OutputStream() throws IOException {
}
Expand Down