Spark: Add an action to remove all referenced files #2415

Conversation
```java
  return new ExpireSnapshotsAction(delegate);
}

public BaseDropTableSparkAction dropTableAction() {
```
Let's not add it to the old Actions API which we are about to deprecate.
I think I know why this is added here. We don't have an implementation of ActionsProvider yet. I am about to add it so we will move this logic there. It is ok to keep it here for now but we will need to wait for a new entry point before merging this.
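For context, the provider-based entry point being referred to might look roughly like this (a sketch only; the real ActionsProvider is added in a follow-up PR and its final shape may differ):

```java
// Hypothetical provider-style entry point replacing the old Actions API.
public interface ActionsProvider {

  default ExpireSnapshots expireSnapshots(Table table) {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement expireSnapshots");
  }
}
```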
We will have to decide what to do with the root table dir and other dirs where our data can be. We have an API for file deletion only(
spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveFilesSparkAction.java
Outdated
Show resolved
Hide resolved
```java
}

private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
  return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
```
Not important here, but let's raise an issue to change this sort of thing to constants. I think this would involve making some things public in the metadata table APIs, so it's probably out of scope here.
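Something along these lines, assuming the metadata-table column names stay stable (illustrative only):

```java
// Hypothetical shared constants for metadata table column names.
private static final String FILE_PATH = "file_path";
private static final String FILE_TYPE = "file_type";
```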
This might be able to live in BaseSparkAction, since it's used in BaseExpireSnapshotsSparkAction too.
The name is also a bit weird, since it not only adds the file type but also projects file_path.
We could call it projectFilePathWithType or something.
Also, I'd use functions.col() instead of new Column for consistency.
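A minimal sketch of the suggested refactor (the name projectFilePathWithType is the reviewer's suggestion, not the final API):

```java
// Projects file_path and tags each row with a file type.
private Dataset<Row> projectFilePathWithType(Dataset<Row> ds, String type) {
  return ds.select(functions.col("file_path"), functions.lit(type).as("file_type"));
}
```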
RussellSpitzer left a comment:
Just a few minor comments. I think the Spark UI job name needs a change, and we should probably only do the shuffle when required rather than preemptively in the base class.
```java
checkDropTableResults(3L, 4L, 3L, results);

Assert.assertTrue(
    String.format("Expected total jobs to be equal to total number of shuffle partitions", SHUFFLE_PARTITIONS, totalJobsRun),
```
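Note that, as quoted, the format string contains no placeholders, so SHUFFLE_PARTITIONS and totalJobsRun are never interpolated. A corrected form (illustrative, including an assumed assertion condition since the diff is truncated) would be:

```java
Assert.assertTrue(
    String.format("Expected total jobs (%d) to equal total shuffle partitions (%d)",
        totalJobsRun, SHUFFLE_PARTITIONS),
    totalJobsRun == SHUFFLE_PARTITIONS);
```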
@aokolnychyi @RussellSpitzer Since we dedup the values as part of the dataset, the number of elements in the iterator will be equal to the number of shuffle partitions.
```java
/**
 * Passes an alternative delete implementation that will be used for manifests and data files.
 * <p>
```
nit: I don't think we need the <p> at the end of the doc.
```java
 * Passes an alternative delete implementation that will be used for manifests and data files.
 * <p>
 *
 * @param deleteFunc a function that will be called to delete manifests and data files
```
nit: could you add more description of what the string passed to the consumer means for the function? It might not be clear to readers.
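One possible wording, assuming the consumer receives the file's location (a sketch, not the merged doc):

```java
/**
 * Passes an alternative delete implementation that will be used for manifests and data files.
 *
 * @param deleteFunc a function called to delete manifests and data files; the string
 *                   passed to the consumer is the fully qualified location of the file
 *                   that should be deleted
 */
```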
```java
long deletedManifestListsCount();

/**
 * Returns the number of files deleted other than data, manifest and manifest list.
```
How are we treating delete files? Why do we not have another long deletedDeleteFilesCount()?
I know we don't have utils like buildValidDeleteFileDF yet, but I think it would be good to start adding those, and this action seems like a good one to add them to; maybe collaborate a bit on the timing with #2518.
I think the doc here should be a bit more descriptive. If I understand correctly, this count will represent things like the version hint and metadata JSON files, but not delete files. The removed delete files count should be a separate top-level method.
Let's refine the doc to reflect the concern @jackye1995 mentioned.
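One way the refined doc could read (the method name and wording are illustrative, not the merged text):

```java
/**
 * Returns the number of deleted metadata files other than data files, delete files,
 * manifests, and manifest lists (for example, version files and metadata JSON files).
 */
long deletedOtherFilesCount();
```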
```java
 * Returns the number of deleted data files.
 */
long deletedDataFilesCount();
```
ContentFiles, since I guess positional/equality delete files will also be deleted?
I'd expose the positional/equality delete counts in a separate method.
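A minimal sketch of the separate counters being proposed (method names are illustrative, not the final API):

```java
/** Returns the number of deleted equality delete files. */
long deletedEqualityDeleteFilesCount();

/** Returns the number of deleted position delete files. */
long deletedPositionDeleteFilesCount();
```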
```java
}

private Result doExecute() {
  boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
```
Sorry, I'm not super familiar with the other actions' code base; how would this be set? It looks like it's used in ExpireSnapshotsAction, but seemingly just to pass a parameter to BaseExpireSnapshotsSparkAction; it's not something the user can control, and here we don't really use that base class?
The user can pass this config as part of the options:

```java
Actions.forTable(table).removeFilesAction().option("stream-results", "true").execute();
```
```java
}
long minTimeStamp = Long.MAX_VALUE;
String minMetadataLocation = null;
TableMetadata metadata = TableMetadataParser.read(io, metadataFileLocation);
```
Since this seems to be running recursively, is it possible that a previous metadata file's previousFiles entries have already been cleaned up and no longer exist, so that reading a non-existent file here will throw an exception?
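One possible guard against that, assuming InputFile.exists() is available on the FileIO in use (a sketch, not the PR's actual code):

```java
// Skip metadata files that an earlier (possibly failed) cleanup already removed.
InputFile metadataFile = io.newInputFile(metadataFileLocation);
if (metadataFile.exists()) {
  TableMetadata metadata = TableMetadataParser.read(io, metadataFileLocation);
  // ... continue walking metadata.previousFiles() as before
}
```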
```diff
 public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
   TestIcebergSourceHiveTables.currentIdentifier = ident;
-  return TestIcebergSourceHiveTables.catalog.createTable(ident, schema, spec);
+  Table table = TestIcebergSourceHiveTables.catalog.createTable(ident, schema, spec);
```
do we need this change?
```java
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;

public BaseRemoveFilesSparkAction(SparkSession spark, Table table) {
```
The main purpose of this action is to remove files once a table is dropped. How do we see that happening? Will we load the table before the drop, then drop the table, then call this action to clean up by passing the table object we loaded before the drop? That way we get the correct FileIO?
cc @RussellSpitzer @rdblue too
We could accept a metadata file location as an alternative:

```java
actions.removeFiles(jsonFileLocation)
    .fileIO(io)
    .execute();
```
I think the second option sends a message that the table has been dropped and we just point to a version file. In addition, this will enable removing files not only during DROP but also later. Users won't need to construct StaticTableOperations at all.
I like passing the JSON file location, but we would probably want to make it clear that it deletes the whole tree referenced by the metadata file. I don't think that's clear from just the code sample above.
@rdblue, any ideas on naming or other ways to convey that?
removeFileTree(jsonFileLocation) or something?
Hi, I was wondering too: if we call this to drop the files without the table itself being dropped, doesn't it leave behind a broken table in the catalog that can no longer be loaded?
Best would be if we could drop the table itself in the same call, but that's probably not possible. Maybe, as another option, this could still be an action on Table that commits back a single empty metadata file, like create-table? That keeps atomicity, and we could also expose this action via a Spark SQL procedure to run before dropping a big table, without worrying about a broken table if the user fails to actually drop it.
The metadata JSON option could be a good indication that the table needs to be dropped first, but there's no opportunity to sanity-check that (since it breaks the table otherwise)? It's also a bit harder to eventually expose this action to Spark or Hive users via procedures, given that the metadata location is harder to get there.
That said, I think this is a nice feature (catalog.dropTable with purge=true can often time out), and dropping a table is rare enough that I might be over-complicating it; I was just considering that Iceberg actions are generally atomic and do not have the potential to break the table.
I consider this action something we would mostly use from Iceberg code. For example, we currently drop the table and then call CatalogUtil.dropTableData, which tries to clean up as much as possible but may also fail at some point. I see this as a replacement for that logic that should scale much better, since we will use metadata tables.
I hope users will not really interact with this action and will just use the regular DROP with or without purge. The only case where someone would need to invoke this manually is if something went wrong during DROP. In that case, it seems reasonable to ask the user to check whether the table was dropped and find a recent metadata file to use for cleaning up the file tree. By calling this RemoveReachableFiles and accepting a metadata location instead of a table, we send a message that it is something invoked after DROP.
Oh I see, yeah, if we can use this code as part of drop, that would be great.
I did hit issues where the drop fails and leaves behind files while the table is already gone. In that case, deleting the metadata JSON last makes good sense for recovery; thanks for the context. I suppose we can add this to the action's documentation if it's not there already.
Force-pushed from 8206e30 to aa2ea82.
aokolnychyi left a comment:
This looks good to me except the public method name. I think it should match the action name.
```java
/**
 * Instantiates an action to remove all the files referenced by given metadata location.
 */
default RemoveReachableFiles removeFiles(String metadataLocation) {
```
I think the name of the method should match the name of the action: removeReachableFiles.
```java
 * Instantiates an action to remove all the files referenced by given metadata location.
 */
default RemoveReachableFiles removeFiles(String metadataLocation) {
  throw new UnsupportedOperationException(this.getClass().getName() + " does not implement removeFiles");
```
Comment should refer to removeReachableFiles as well.
```java
package org.apache.iceberg.actions;

public class TestExpireSnapshotsAction24 extends TestExpireSnapshotsAction {

```
not needed?
```java
 */
default RemoveReachableFiles removeReachableFiles(String metadataLocation) {
  throw new UnsupportedOperationException(this.getClass().getName() + " does not implement " +
      RemoveReachableFiles.class.toString());
```
nit: the others use the method name in the API, not the class name of the API.
+1
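For reference, the suggested form would look like this, matching the other defaults in the interface:

```java
default RemoveReachableFiles removeReachableFiles(String metadataLocation) {
  throw new UnsupportedOperationException(
      this.getClass().getName() + " does not implement removeReachableFiles");
}
```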
```java
/**
 * Passes an alternative executor service that will be used for files removal.
 * <p>
 * If this method is not called, files will still be deleted in the current thread.
```
nit: will still be deleted -> will be deleted
```java
@Override
public Result execute() {
  Preconditions.checkArgument(io != null, "File IO cannot be null");
```
Probably fine here, but we could put the precondition in the io(FileIO fileIO) method for slightly earlier erroring.
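That alternative might look like this (the class name is illustrative, and the return type is assumed to follow the action's fluent style):

```java
// Hypothetical setter that fails fast on a null FileIO.
public BaseRemoveReachableFilesSparkAction io(FileIO fileIO) {
  Preconditions.checkArgument(fileIO != null, "File IO cannot be null");
  this.io = fileIO;
  return this;
}
```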
RussellSpitzer left a comment:
Tiny nits, looks good to me.
```java
private Consumer<String> removeFunc = defaultDelete;
private ExecutorService removeExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private FileIO io = new HadoopFileIO();
```
I think we cannot use this constructor for HadoopFileIO. It will result in an NPE, since the no-arg constructor is only meant for dynamic instantiation. It should use the underlying Hadoop conf from the session:

```java
new HadoopFileIO(spark().sessionState().newHadoopConf())
```
Can we add a test that will use the default IO? I think it would currently fail.
```java
 */
default RemoveReachableFiles removeReachableFiles(String metadataLocation) {
  throw new UnsupportedOperationException(this.getClass().getName() + " does not implement" +
      " removeReachableFiles");
```
Can this be on one line?
Thanks a lot, @karuppayya! This is going to be a valuable contribution. Looking forward to PRs that will use this logic next. Thanks everyone for reviewing!
Merge remote-tracking branch 'upstream/merge-master-20210816' into master

## What does this MR address?
Merges upstream/master to bring in recent bug fixes and optimizations.

## What does this MR change?
Key PRs of interest:
> Predicate pushdown support: https://github.com/apache/iceberg/pull/2358, https://github.com/apache/iceberg/pull/2926, https://github.com/apache/iceberg/pull/2777/files
> Spark: writing an empty dataset raised an error; simply skip it instead, apache#2960
> Flink: add uidPrefix to operators in the Flink UI to make it easier to track multiple Iceberg sink jobs, apache#288
> Spark: fix nested struct pruning, apache#2877
> Allow creating v2 format tables via table properties, apache#2887
> Add the SortRewriteStrategy framework to gradually support different rewrite strategies, apache#2609 (WIP: apache#2829)
> Spark: support configuring Hadoop properties per catalog, apache#2792
> Spark: read/write support for timestamps without timezone, apache#2757
> Spark MicroBatch: support the skip-delete-snapshots configuration property, apache#2752
> Spark: V2 RewriteDatafilesAction support
> Core: add validation for row-level deletes with rewrites, apache#2865
> Schema time travel: add schema-id (Core: add schema id to snapshot)
> Spark extensions: support identifier fields operations, apache#2560
> Parquet: update to 1.12.0, apache#2441
> Hive: vectorized ORC reads for Hive, apache#2613
> Spark: add an action to remove all referenced files, apache#2415

## How was this MR tested?
UT
No description provided.