Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

Expand Down Expand Up @@ -86,4 +87,7 @@ public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
* @return this for method chaining
*/
RewriteManifests addManifest(ManifestFile manifest);

RewriteManifests validateWith(BiFunction<Long, Long, Void> func);

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ default RewriteManifests rewriteManifests(Table table) {
this.getClass().getName() + " does not implement rewriteManifests");
}

/** Instantiates an action to repair manifests. */
default RepairManifests repairManifests(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement repairManifests");
}

/** Instantiates an action to rewrite data files. */
default RewriteDataFiles rewriteDataFiles(Table table) {
throw new UnsupportedOperationException(
Expand Down
58 changes: 58 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RepairManifests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import org.apache.iceberg.ManifestFile;

public interface RepairManifests extends SnapshotUpdate<RepairManifests, RepairManifests.Result> {

/**
* Passes a location where the staged manifests should be written.
*
* <p>If not set, defaults to the table's metadata location.
*
* @param stagingLocation a staging location
* @return this for method chaining
*/
RepairManifests stagingLocation(String stagingLocation);

/**
* Toggle writing manifests or only seeing potential results
*
* <p>If not set, defaults false
*
* @param value boolean
* @return this for method chaining
*/
RepairManifests dryRun(boolean value);

interface Result {
/** Returns rewritten manifests. */
Iterable<ManifestFile> rewrittenManifests();

/** Returns added manifests. */
Iterable<ManifestFile> addedManifests();

/** Returns count of duplicate files removed */
Long duplicateFilesRemoved();

/** Returns count of missing files removed */
Long missingFilesRemoved();
}
}
25 changes: 20 additions & 5 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -68,6 +69,17 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
private final AtomicLong entryCount = new AtomicLong(0);

private Function<DataFile, Object> clusterByFunc;

private BiFunction<Long, Long, Void> validateFileCountsFunc =
(createdManifestsFilesCount, replacedManifestsFilesCount) -> {
if (!createdManifestsFilesCount.equals(replacedManifestsFilesCount)) {
throw new ValidationException(
"Replaced and created manifests must have the same number of active files: %d (new), %d (old)",
createdManifestsFilesCount, replacedManifestsFilesCount);
}
return null;
};

private Predicate<ManifestFile> predicate;

private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
Expand Down Expand Up @@ -152,6 +164,12 @@ public RewriteManifests addManifest(ManifestFile manifest) {
return this;
}

@Override
public RewriteManifests validateWith(BiFunction<Long, Long, Void> func) {
this.validateFileCountsFunc = func;
return this;
}

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest);
Expand Down Expand Up @@ -295,11 +313,8 @@ private void validateFilesCounts() {
Iterables.concat(rewrittenManifests, deletedManifests);
int replacedManifestsFilesCount = activeFilesCount(replacedManifests);

if (createdManifestsFilesCount != replacedManifestsFilesCount) {
throw new ValidationException(
"Replaced and created manifests must have the same number of active files: %d (new), %d (old)",
createdManifestsFilesCount, replacedManifestsFilesCount);
}
validateFileCountsFunc.apply(
(long) createdManifestsFilesCount, (long) replacedManifestsFilesCount);
}

private int activeFilesCount(Iterable<ManifestFile> manifests) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import org.immutables.value.Value;

@Value.Enclosing
@SuppressWarnings("ImmutablesStyle")
@Value.Style(
typeImmutableEnclosing = "ImmutableRepairManifests",
visibilityString = "PUBLIC",
builderVisibilityString = "PUBLIC")
interface BaseRepairManifests extends RepairManifests {

@Value.Immutable
interface Result extends RepairManifests.Result {}
}
Loading