-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark 3.3: Add RemoveDanglingDeletes action #6581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.actions; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.iceberg.DeleteFile; | ||
|
|
||
| /** | ||
| * An action that removes dangling delete files from the current snapshot. A delete file is dangling | ||
| * if its deletes no longer applies to any data file. | ||
| * | ||
| * <p>The following dangling delete files are removed: | ||
| * | ||
| * <ul> | ||
| * <li>Position delete files with a sequence number less than that of any data file in the same | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is more of a technical detail right? Not sure we need it in the java doc |
||
| * partition | ||
| * <li>Equality delete files with a sequence number less than or equal to that of any data file in | ||
| * the same partition | ||
| * </ul> | ||
| */ | ||
| public interface RemoveDanglingDeleteFiles | ||
| extends Action<RemoveDanglingDeleteFiles, RemoveDanglingDeleteFiles.Result> { | ||
|
|
||
| /** The action result that contains a summary of the execution. */ | ||
| interface Result { | ||
| /** Removes representation of removed delete files. */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this comment just be "List of removed delete files"? |
||
| List<DeleteFile> removedDeleteFiles(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.actions; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.iceberg.DeleteFile; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
|
|
||
| public class RemoveDanglingDeleteFilesActionResult implements RemoveDanglingDeleteFiles.Result { | ||
|
|
||
| private static final RemoveDanglingDeleteFilesActionResult EMPTY = | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need this? Seems like in the code we can just add new RemoveDanglingDeleteFileActionResult(Collections.emptyList()) Doesn't seem like we really are making that many of these objects |
||
| new RemoveDanglingDeleteFilesActionResult(ImmutableList.of()); | ||
|
|
||
| private List<DeleteFile> removedDeleteFiles; | ||
|
|
||
| public RemoveDanglingDeleteFilesActionResult(List<DeleteFile> removeDeleteFiles) { | ||
| this.removedDeleteFiles = removeDeleteFiles; | ||
| } | ||
|
|
||
| public static RemoveDanglingDeleteFilesActionResult empty() { | ||
| return EMPTY; | ||
| } | ||
|
|
||
| @Override | ||
| public List<DeleteFile> removedDeleteFiles() { | ||
| return removedDeleteFiles; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.function.Consumer; | ||
| import java.util.function.Function; | ||
| import java.util.function.Supplier; | ||
| import org.apache.iceberg.AllManifestsTable; | ||
| import org.apache.iceberg.BaseTable; | ||
|
|
@@ -58,20 +59,26 @@ | |
| import org.apache.iceberg.spark.JobGroupUtils; | ||
| import org.apache.iceberg.spark.SparkTableUtil; | ||
| import org.apache.iceberg.spark.source.SerializableTableWithSize; | ||
| import org.apache.iceberg.util.PropertyUtil; | ||
| import org.apache.iceberg.util.Tasks; | ||
| import org.apache.spark.SparkContext; | ||
| import org.apache.spark.api.java.JavaSparkContext; | ||
| import org.apache.spark.api.java.function.FlatMapFunction; | ||
| import org.apache.spark.api.java.function.MapFunction; | ||
| import org.apache.spark.broadcast.Broadcast; | ||
| import org.apache.spark.sql.Column; | ||
| import org.apache.spark.sql.Dataset; | ||
| import org.apache.spark.sql.Row; | ||
| import org.apache.spark.sql.SparkSession; | ||
| import org.apache.spark.sql.internal.SQLConf; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| abstract class BaseSparkAction<ThisT> { | ||
|
|
||
| public static final String USE_CACHING = "use-caching"; | ||
| public static final boolean USE_CACHING_DEFAULT = true; | ||
|
|
||
| protected static final String MANIFEST = "Manifest"; | ||
| protected static final String MANIFEST_LIST = "Manifest List"; | ||
| protected static final String OTHERS = "Others"; | ||
|
|
@@ -361,4 +368,25 @@ static FileInfo toFileInfo(ContentFile<?> file) { | |
| return new FileInfo(file.path().toString(), file.content().toString()); | ||
| } | ||
| } | ||
|
|
||
| protected <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) { | ||
| Dataset<T> reusableDS; | ||
| boolean useCaching = | ||
| PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT); | ||
| if (useCaching) { | ||
| reusableDS = ds.cache(); | ||
| } else { | ||
| int parallelism = SQLConf.get().numShufflePartitions(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit internal to spark, probably fine but i'm not a fan of touching this over spark.conf() |
||
| reusableDS = | ||
| ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we repartition here? (and encode) |
||
| } | ||
|
|
||
| try { | ||
| return func.apply(reusableDS); | ||
| } finally { | ||
| if (useCaching) { | ||
| reusableDS.unpersist(false); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a note that this removes delete files only if they don't apply to any non-expired datafile. Just to make it clear that this isn't just about "live" delete files