Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method java.lang.String org.apache.iceberg.view.ViewVersion::operation()"
justification: "Add operation API to view version"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult>\
\ org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::rewriteResults()"
justification: "New method added to un-implemented interface"
- code: "java.method.removed"
old: "method java.lang.Integer org.apache.iceberg.view.ImmutableSQLViewRepresentation::schemaId()"
justification: "Moving SchemaID to ViewVersion. View Spec implementation has\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ default DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteReachableFiles");
}

/** Instantiates an action to rewrite position delete files */
default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewritePositionDeletes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,68 @@
*/
package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.immutables.value.Value;

/**
* An action for rewriting position delete files.
*
* <p>Generally used for optimizing the size and layout of position delete files within a table.
*/
@Value.Enclosing
public interface RewritePositionDeleteFiles
extends SnapshotUpdate<RewritePositionDeleteFiles, RewritePositionDeleteFiles.Result> {

/**
* Enable committing groups of files (see max-file-group-size-bytes) prior to the entire rewrite
* completing. This will produce additional commits but allow for progress even if some groups
* fail to commit. This setting will not change the correctness of the rewrite operation as file
* groups can be compacted independently.
*
* <p>The default is false, which produces a single commit when the entire job has completed.
*/
String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";

boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;

/**
* The maximum amount of Iceberg commits that this rewrite is allowed to produce if partial
* progress is enabled. This setting has no effect if partial progress is disabled.
*/
String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";

int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;

/**
* The max number of file groups to be simultaneously rewritten by the rewrite strategy. The
* structure and contents of the group is determined by the rewrite strategy. Each file group will
* be rewritten independently and asynchronously.
*/
String MAX_CONCURRENT_FILE_GROUP_REWRITES = "max-concurrent-file-group-rewrites";

int MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 5;

/**
* Forces the rewrite job order based on the value.
*
* <ul>
* <li>If rewrite-job-order=bytes-asc, then rewrite the smallest job groups first.
* <li>If rewrite-job-order=bytes-desc, then rewrite the largest job groups first.
* <li>If rewrite-job-order=files-asc, then rewrite the job groups with the least files first.
* <li>If rewrite-job-order=files-desc, then rewrite the job groups with the most files first.
* <li>If rewrite-job-order=none, then rewrite job groups in the order they were planned (no
* specific ordering).
* </ul>
*
* <p>Defaults to none.
*/
String REWRITE_JOB_ORDER = "rewrite-job-order";

String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();

/**
* A filter for finding deletes to rewrite.
*
Expand All @@ -41,11 +93,80 @@ public interface RewritePositionDeleteFiles
RewritePositionDeleteFiles filter(Expression expression);

/** The action result that contains a summary of the execution. */
@Value.Immutable
interface Result {
/** Returns the count of the position deletes that been rewritten. */
List<FileGroupRewriteResult> rewriteResults();

/** Returns the count of the position delete files that have been rewritten. */
default int rewrittenDeleteFilesCount() {
return rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
.sum();
}

/** Returns the count of the added position delete files. */
default int addedDeleteFilesCount() {
return rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::addedDeleteFilesCount)
.sum();
}

/** Returns the number of bytes of position delete files that have been rewritten */
default long rewrittenBytesCount() {
return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum();
}

/** Returns the number of bytes of newly added position delete files */
default long addedBytesCount() {
return rewriteResults().stream().mapToLong(FileGroupRewriteResult::addedBytesCount).sum();
}
}

/**
* For a particular position delete file group, the number of position delete files which are
* newly created and the number of files which were formerly part of the table but have been
* rewritten.
*/
@Value.Immutable
interface FileGroupRewriteResult {
/** Description of this position delete file group. */
FileGroupInfo info();

/** Returns the count of the position delete files that been rewritten in this group. */
int rewrittenDeleteFilesCount();

/** Returns the count of the added delete files. */
/** Returns the count of the added position delete files in this group. */
int addedDeleteFilesCount();

/** Returns the number of bytes of rewritten position delete files in this group. */
long rewrittenBytesCount();

/** Returns the number of bytes of newly added position delete files in this group. */
long addedBytesCount();
}

/**
* A description of a position delete file group, when it was processed, and within which
* partition. For use tracking rewrite operations and for returning results.
*/
@Value.Immutable
interface FileGroupInfo {
/**
* Returns which position delete file group this is out of the total set of file groups for this
* rewrite
*/
int globalIndex();

/**
* Returns which position delete file group this is out of the set of file groups for this
* partition
*/
int partitionIndex();

/**
* Returns which partition this position delete file group contains files from. This will be of
* the type of the table's unified partition type considering all specs in a table.
*/
StructLike partition();
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
* @param type type of files being deleted
* @param concurrent controls concurrent deletion. Only applicable for non-bulk FileIO
*/
private static void deleteFiles(
public static void deleteFiles(
FileIO io, Iterable<String> files, String type, boolean concurrent) {
if (io instanceof SupportsBulkOperations) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

/** A strategy for an action to rewrite position delete files. */
/**
* A strategy for an action to rewrite position delete files.
*
* @deprecated since 1.3.0, will be removed in 1.4.0; Use {@link SizeBasedFileRewriter} instead
*/
@Deprecated
public interface RewritePositionDeleteStrategy {

/** Returns the name of this rewrite deletes strategy */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.Set;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Functionality used by {@link RewritePositionDeleteFiles} from different platforms to handle
* commits.
*/
public class RewritePositionDeletesCommitManager {
private static final Logger LOG =
LoggerFactory.getLogger(RewritePositionDeletesCommitManager.class);

private final Table table;
private final long startingSnapshotId;

public RewritePositionDeletesCommitManager(Table table) {
this.table = table;
this.startingSnapshotId = table.currentSnapshot().snapshotId();
}

/**
* Perform a commit operation on the table adding and removing files as required for this set of
* file groups.
*
* @param fileGroups file groups to commit
*/
public void commit(Set<RewritePositionDeletesGroup> fileGroups) {
Set<DeleteFile> rewrittenDeleteFiles = Sets.newHashSet();
Set<DeleteFile> addedDeleteFiles = Sets.newHashSet();
for (RewritePositionDeletesGroup group : fileGroups) {
rewrittenDeleteFiles.addAll(group.rewrittenDeleteFiles());
addedDeleteFiles.addAll(group.addedDeleteFiles());
}

table
.newRewrite()
.validateFromSnapshot(startingSnapshotId)
.rewriteFiles(ImmutableSet.of(), rewrittenDeleteFiles, ImmutableSet.of(), addedDeleteFiles)
.commit();
}

/**
* Clean up a specified file set by removing any files created for that operation, should not
* throw any exceptions.
*
* @param fileGroup group of files which has already been rewritten
*/
public void abort(RewritePositionDeletesGroup fileGroup) {
Preconditions.checkState(
fileGroup.addedDeleteFiles() != null, "Cannot abort a fileGroup that was not rewritten");

Iterable<String> filePaths =
Iterables.transform(fileGroup.addedDeleteFiles(), f -> f.path().toString());
CatalogUtil.deleteFiles(table.io(), filePaths, "position delete", true);
}

public void commitOrClean(Set<RewritePositionDeletesGroup> rewriteGroups) {
try {
commit(rewriteGroups);
} catch (CommitStateUnknownException e) {
LOG.error(
"Commit state unknown for {}, cannot clean up files because they may have been committed successfully.",
rewriteGroups,
e);
throw e;
} catch (Exception e) {
LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abort);
throw e;
}
}

/**
* An async service which allows for committing multiple file groups as their rewrites complete.
* The service also allows for partial-progress since commits can fail. Once the service has been
* closed no new file groups should not be offered.
*
* @param rewritesPerCommit number of file groups to include in a commit
* @return the service for handling commits
*/
public CommitService service(int rewritesPerCommit) {
return new CommitService(rewritesPerCommit);
}

public class CommitService extends BaseCommitService<RewritePositionDeletesGroup> {

CommitService(int rewritesPerCommit) {
super(table, rewritesPerCommit);
}

@Override
protected void commitOrClean(Set<RewritePositionDeletesGroup> batch) {
RewritePositionDeletesCommitManager.this.commitOrClean(batch);
}

@Override
protected void abortFileGroup(RewritePositionDeletesGroup group) {
RewritePositionDeletesCommitManager.this.abort(group);
}
}
}
Loading