AWS: Add S3 delete tagging #4342
(force-pushed 510dbc4 to b074d95)
singhpk234 left a comment:
Thanks for the change, @rajarshisarkar!
(force-pushed b074d95 to 334775c)
Tasks.foreach(paths)
    .noRetry()
    .suppressFailureWhenFinished()
    .onFailure((path, exc) -> LOG.warn("Failed to add delete tags: {}", path, exc))
    .run(this::doSoftDelete);
Should we have a configurable threadpool here, so we can get better throughput on fetching and updating tags? I've been thinking S3FileIO should implicitly have a threadpool, but maybe it does not fit in the FileIO model.
Tasks.foreach(paths)
.noRetry()
.executeWith(s3FileIoThreadPool)
.suppressFailureWhenFinished()
.onFailure((path, exc) -> LOG.warn("Failed to add delete tags: {}", path, exc))
.run(this::doSoftDelete);
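The `executeWith` pattern suggested here fans the per-object tag updates out over a pool while suppressing individual failures, which is what `suppressFailureWhenFinished()`/`onFailure()` give you. A rough JDK-only sketch of that semantics (`tagObject` is a hypothetical stand-in for the real S3 tagging call, not code from this PR):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ParallelTagSketch {
  // Hypothetical stand-in for the per-object GetObjectTagging/PutObjectTagging call.
  static void tagObject(String path) {
    if (path.contains("bad")) {
      throw new RuntimeException("simulated tagging failure");
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> paths = List.of("s3://b/a", "s3://b/bad", "s3://b/c");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger failures = new AtomicInteger();

    // Mirror suppressFailureWhenFinished()/onFailure(): record the failure and keep going.
    List<Future<?>> futures = paths.stream()
        .map(p -> pool.submit(() -> {
          try {
            tagObject(p);
          } catch (RuntimeException e) {
            failures.incrementAndGet(); // onFailure: warn and continue with other paths
          }
        }))
        .collect(Collectors.toList());
    for (Future<?> f : futures) {
      f.get(); // wait for all paths, failures already swallowed per task
    }
    pool.shutdown();
    System.out.println("failures=" + failures.get());
  }
}
```

One failing path does not prevent the other objects from being tagged, which matches the Tasks semantics quoted above.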
good point, we can follow what we did in https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java#L110-L122 for this
Thanks, I have introduced the executor service configurable via s3.delete.tags.num-threads.
    .getObjectTagging(getObjectTaggingRequest);
// Get existing tags, if any, and then add the delete tags
Set<Tag> tags = Sets.newHashSet();
if (getObjectTaggingResponse != null && getObjectTaggingResponse.hasTagSet()) {
I don't think getObjectTaggingResponse would ever be null, we can at least trust AWS client with that :)
if (getObjectTaggingResponse != null && getObjectTaggingResponse.hasTagSet()) {
  tags.addAll(getObjectTaggingResponse.tagSet());
}
tags.addAll(awsProperties.s3DeleteTags());
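The quoted logic amounts to a set union: read whatever tags the object already carries, then overlay the configured delete tags. A minimal sketch, with plain strings standing in for the SDK `Tag` type:

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TagMergeSketch {
  public static void main(String[] args) {
    // Tags already on the object (as returned by GetObjectTagging).
    Set<String> existing = new LinkedHashSet<>(Set.of("my_key=my_val", "my_key2=my_val2"));
    // Configured delete tags from catalog properties (hypothetical key/value).
    Set<String> deleteTags = Set.of("iceberg-delete=true");

    // PutObjectTagging replaces the whole tag set, so the request must carry the union.
    Set<String> merged = new LinkedHashSet<>(existing);
    merged.addAll(deleteTags);

    System.out.println("count=" + merged.size());
  }
}
```

The union matters because S3's PutObjectTagging overwrites the object's entire tag set; sending only the delete tags would drop the pre-existing ones.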
nit: newline after if block
I'm not sure I really follow why we would do this. It seems like we're trying to overlap use cases by hijacking delete behavior. This also requires that the configuration of the bucket aligns with the tags to actually achieve the desired behavior. Overall, it feels like this is a bit of a reach in terms of functionality.
Thanks for the feedback, Daniel! I don't think this is trying to hijack the delete behavior. Based on what we've heard from some key customers, tagging an object is now the preferred S3 delete implementation in many companies. Data loss is typically a sev0 incident in a corporation, so the data platform team prefers to gradually retire the deleted files to different tiers and let S3 handle the lifecycle transition. Note that this is not a discussion of table deletion/snapshot expiration with purge or not. For platforms that allow people to do table deletion with purge, I would imagine this feature to be preferred, since users could accidentally run a DROP TABLE. And even for platforms that do deletion without purge, eventually there is a team who does the "purge" part, and based on the customer discussions we have had, this is typically done by tagging files and then defining a lifecycle policy so S3 takes care of the transition of files.
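For reference, the soft-delete flow described here relies on a bucket lifecycle rule keyed off the delete tag; the tag key/value and expiry below are illustrative, not part of this PR:

```json
{
  "Rules": [
    {
      "ID": "expire-soft-deleted-objects",
      "Status": "Enabled",
      "Filter": {
        "Tag": { "Key": "iceberg-delete", "Value": "true" }
      },
      "Expiration": { "Days": 30 }
    }
  ]
}
```

With a rule like this in place, objects tagged by S3FileIO are removed (or transitioned, if a `Transitions` block is used instead) by S3 itself, without any further client-side delete.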
This can be explained in 3 parts:
Let me know what you think about this area, and whether you have any better solutions than the proposed one. I think this is a small feature that could enable a lot of valuable use cases. Similar to the access point feature we are adding in S3FileIO, we consider that an intermediate solution which could also be solved at the Iceberg level with a larger spec change to support relative file paths. I think there is value in offering this feature for people who know how to use it, while we think of anything better that could solve such retention at the Iceberg spec level.
@danielcweeks, @jackye1995, I think there's definitely value in being able to add tags to let lifecycle policies handle this. But I find it really confusing that adding a delete tag prevents the object from being deleted and instead just tags it. I think that tagging an object before deleting it, and skipping the delete because the tag takes care of it, should be separate configurations. It's reasonable for someone to replace deletes with tags, or to tag an object and still delete it. I can also imagine a situation where client-side deletes aren't allowed and the warehouse relies on an orphan files service. That means all of the combinations of "skip delete" and "tag before delete" are valid use cases, so we should separate those. How about implementing this with a
Yes, totally agree. However, should it be I thought about the 4 possible combinations, and I did not ask for an additional
@jackye1995, for case 1, I thought tags would remain on objects in versioned buckets that are deleted with a delete marker. So you could use a tag to control when the old version gets cleaned up, but logically delete it immediately. Isn't that a valid case? For the question about First, if the intent is to replace the S3 delete with a tag operation instead, then it seems like you wouldn't want to disable deletes across all storage for a table. If you have files stored across multiple schemes, then it doesn't make sense to disable deletes for other schemes. Second,
I have implemented the
(force-pushed f0c8f91 to fc553b4)
if (executorService == null) {
  synchronized (S3OutputStream.class) {
    if (executorService == null) {
      executorService = MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) Executors.newFixedThreadPool(
              awsProperties.s3FileIoDeleteTagThreads(),
              new ThreadFactoryBuilder()
                  .setDaemon(true)
                  .setNameFormat("iceberg-s3fileio-deletetags-%d")
                  .build()));
    }
  }
}
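The double-checked locking quoted above can be reproduced with JDK primitives alone; the key details are the `volatile` field, the second check under the lock, and the daemon thread factory so the pool never blocks JVM exit. A self-contained sketch (class and names are hypothetical, not the PR's code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyPoolSketch {
  // volatile so the fully-constructed pool is visible to all threads.
  private static volatile ExecutorService executorService;
  private static final AtomicInteger COUNTER = new AtomicInteger();

  static ExecutorService executorService() {
    if (executorService == null) {              // first check, no lock
      synchronized (LazyPoolSketch.class) {     // lock the owning class
        if (executorService == null) {          // second check under the lock
          executorService = Executors.newFixedThreadPool(2, runnable -> {
            Thread t = new Thread(
                runnable, "iceberg-s3fileio-deletetags-" + COUNTER.getAndIncrement());
            t.setDaemon(true);                  // daemon: do not keep the JVM alive
            return t;
          });
        }
      }
    }
    return executorService;
  }

  public static void main(String[] args) {
    // The pool is created once and shared by every caller.
    System.out.println("same=" + (executorService() == executorService()));
  }
}
```

Without the `volatile` modifier the pattern is broken under the Java memory model, since another thread could observe a partially constructed executor.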
Can we shift this to a private function?
I think there's a utility for this that we can leverage. ThreadPools.newWorkerPool(String namePrefix, int poolSize)
Thanks for letting me know about the utility method.
    .suppressFailureWhenFinished()
    .onFailure((path, exc) -> LOG.warn("Failed to add delete tags: {}", path, exc))
    .run(this::doSoftDelete);
return;
we should not return here, as the flow will never reach the code below if delete tagging is enabled
private transient S3Client client;
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
private static volatile ExecutorService executorService;
[question] Any reason the executor service needs to be static?
nit: static variable should be placed before instance variables
/**
 * Used by {@link S3FileIO} to tag objects when deleting. When this config is set, objects are
 * tagged with the configured key-value pairs instead of being hard-deleted. This is considered a
based on the new behavior, the line here should say "objects are tagged with the configured key-value pairs before deletion"
public static final String S3FILEIO_DELETE_TAGS_THREADS = "s3.delete.tags.num-threads";
/**
 * Used by {@link S3FileIO} to delete files.
The description here is not accurate, should say something like "Determines if S3FileIO delete the object when io.delete() is called, default to true. Once disabled, users are expected to set tags through S3_DELETE_TAGS_PREFIX and manage deleted files through S3 lifecycle policy"
  }
}

private void doSoftDelete(String path) throws S3Exception {
the method should now be named "tagFileToDelete"?
Or a generic (still helper) tagFile(String path, List deleteTags). The caller, in this case deleteFiles, would be responsible for passing in the deletion tags.
I have made the changes. Named it as tagFileToDelete(String path, Set<Tag> deleteTags).
this.s3 = s3;
this.awsProperties = awsProperties;
if (executorService == null) {
  synchronized (S3OutputStream.class) {
should be S3FileIO.class
(force-pushed fc553b4 to 60c32d5)
@jackye1995 I think we need to really clarify the use cases and make sure we're coming up with a solution that addresses them. This may be a good topic for the next sync as well, since it's getting a little involved in terms of object lifecycle management in S3. I would say the typical use cases would be:

- Versioned bucket: lifecycle policy for non-current versions and delete markers, with direct delete, as there's no requirement for tags.
- Non-versioned bucket: lifecycle policy for tagged objects and no direct delete (deleting after tagging doesn't really make sense).

I'm not sure if there's a use case for both a versioned bucket and tagging for delete. I would also question the use of lifecycle rules for transitioning tagged objects, since there's a cost associated with the tagging, the transition and, depending on the tier, a minimum charge. It feels like the use case where it becomes more cost effective narrows pretty quickly (as compared to just a versioned bucket). Given that there are these different strategies, we might actually want to separate this behavior into an interface for
I talked with Dan about this and he made me question my take on
(force-pushed 60c32d5 to 11d7023)
Yes, after doing some more research I think this is the right conclusion. I thought there was such a use case based on what @rdblue described, but I've never heard of any. Regarding having a I don't know what this interface would look like though; do you have anything in mind? Because unlike a bulk operation, this is conceptually just a "delete" in the user's mind, which should ideally reuse the existing delete interface. So the strategy feels more like a config to me, where we can have
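The config-driven behavior discussed in this thread boils down to a two-flag branch inside the delete path; a sketch of the four combinations (the helper and its string results are hypothetical labels, not the PR's code):

```java
public class DeleteBranchSketch {
  // Describes what a delete call would do for a given configuration:
  // "deleteEnabled" models s3.delete-enabled, "hasDeleteTags" models
  // whether any s3.delete.tags.* properties are set.
  static String deleteBehavior(boolean deleteEnabled, boolean hasDeleteTags) {
    if (hasDeleteTags && deleteEnabled) {
      return "tag-then-delete"; // e.g. versioned bucket: tag, then add a delete marker
    } else if (hasDeleteTags) {
      return "tag-only";        // soft delete: an S3 lifecycle policy does the cleanup
    } else if (deleteEnabled) {
      return "delete";          // plain hard delete, the default
    } else {
      return "no-op";           // deletes disabled, e.g. an orphan-file service handles it
    }
  }

  public static void main(String[] args) {
    System.out.println(deleteBehavior(true, false));
    System.out.println(deleteBehavior(false, true));
  }
}
```

Separating the two flags is what makes all four combinations expressible, which is the point Ryan raises above about "skip delete" and "tag before delete" being independent choices.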
/**
 * Determines if S3FileIO delete the object when io.delete() is called, default to true. Once
 * disabled, users are expected to set tags through S3_DELETE_TAGS_PREFIX and manage deleted files
nit: {@link #S3_DELETE_TAGS_PREFIX}
I have made the changes.
@danielcweeks do you have any additional suggestions? If not, I think the approach that Ryan suggested and the current code path look good to me.
public static final String S3FILEIO_DELETE_THREADS = "s3.delete.num-threads";
/**
 * Determines if S3FileIO delete the object when io.delete() is called, default to true. Once
Nit:
- * Determines if S3FileIO delete the object when io.delete() is called, default to true. Once
+ * Determines if S3FileIO deletes the object when io.delete() is called, default to true. Once
I have made the changes.
try {
  tagFileToDelete(path, awsProperties.s3DeleteTags());
} catch (S3Exception e) {
  LOG.warn("Failed to add delete tags: {}", path, e);
[minor] should we log both the delete tags as well as the path:
- LOG.warn("Failed to add delete tags: {}", path, e);
+ LOG.warn("Failed to add delete tags: {} to {}", awsProperties.s3DeleteTags().toString(), path, e);
Made the changes.
  }
}

private void tagFileToDelete(String path, Set<Tag> deleteTags) throws S3Exception {
  S3URI location = new S3URI(path);
Can we wire in s3BucketToAccessPointMapping, to use access points in tag manipulation? Adding tags via an access point is an allowed operation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-points-usage-examples.html#add-tag-set-ap i.e. both GetObjectTagging and PutObjectTagging operations are allowed.
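Wiring the access-point mapping into the tagging path just means substituting the mapped access point ARN for the raw bucket name before building the Get/PutObjectTagging requests. A JDK-only sketch of that lookup (the map contents are illustrative, not real configuration):

```java
import java.util.Map;

public class AccessPointSketch {
  // Hypothetical bucket -> access point mapping, as would be configured on the FileIO.
  static final Map<String, String> BUCKET_TO_ACCESS_POINT =
      Map.of("my-bucket", "arn:aws:s3:us-east-1:123456789012:accesspoint/my-ap");

  // Use the access point when one is configured, otherwise fall back to the plain bucket.
  static String resolveBucket(String bucket) {
    return BUCKET_TO_ACCESS_POINT.getOrDefault(bucket, bucket);
  }

  public static void main(String[] args) {
    System.out.println(resolveBucket("my-bucket"));
    System.out.println(resolveBucket("other-bucket"));
  }
}
```

The resolved value is then used as the `bucket(...)` argument on the tagging request builders, the same substitution the delete path already performs.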
Nice catch, missed on adding this after merging master.
  return client;
}

private ExecutorService executorService() {
We should probably shutdown the ExecutorService on close of the FileIO
Thanks, I have made the changes.
I think we cannot close it, because it's a static variable and shared across FileIOs? Which is similar to the one in S3OutputStream. We either not close it, or make it an instance variable instead. I think not closing it is fine given we are already doing that in S3OutputStream, any thoughts?
I feel not closing is fine. We can keep it as static considering parity with S3OutputStream.
@rajarshisarkar and @jackye1995 One small comment about proper shutdown above, but other than that I'm generally OK with this. I think we can tackle refactoring some of this logic later (S3FileIO is now mostly bulk/delete/tagging, so I think there's an opportunity to clean this up). It is important to note that for the bulk use cases, tagging will be quite a bit slower since we have to tag each object individually. That may not scale particularly well, but at least the option to tag is there.
This looks good to me; the only point of concern now is the executor service. I think we can merge this and address the issue if there is a better way from @danielcweeks; otherwise the current implementation is consistent with what we had in
Thanks for getting this in, @amogh-jahagirdar and @jackye1995!
This change adds S3 tags to objects while deleting them through S3FileIO. Users can pass their custom tags as part of the catalog properties. This allows users to manage their storage lifecycle.

Spark SQL launch command:
Tags in S3 after delete:
Note: my_key=my_val and my_key2=my_val2 are the tags from when the object was written.

cc: @jackye1995 @arminnajafi @singhpk234 @amogh-jahagirdar @xiaoxuandev @yyanyy