Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ public interface Snapshot extends Serializable {
*/
Iterable<DataFile> deletedFiles();

/**
 * Return all {@link DeleteFile delete files} added to the table in this snapshot.
 * <p>
 * The files returned include the following columns: file_path, file_format, partition,
 * record_count, and file_size_in_bytes. Other columns will be null.
 *
 * @return all delete files added to the table in this snapshot
 */
Iterable<DeleteFile> addedDeleteFiles();

/**
 * Return all {@link DeleteFile delete files} removed from the table in this snapshot.
 * <p>
 * The files returned include the following columns: file_path, file_format, partition,
 * record_count, and file_size_in_bytes. Other columns will be null.
 *
 * @return all delete files removed from the table in this snapshot
 */
Iterable<DeleteFile> deletedDeleteFiles();

/**
* Return the location of this snapshot's manifest list, or null if it is not separate.
*
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public interface TableScan {
*/
TableScan useSnapshot(long snapshotId);


/**
 * Create a new {@link TableScan} from this scan's configuration that will scan only the given
 * manifests.
 *
 * @param manifests the manifests to scan
 * @return a new scan based on this with the given manifests
 * @throws UnsupportedOperationException if this scan does not support scanning specific manifests
 * @throws IllegalStateException if this scan is already configured to use a snapshot
 */
TableScan useManifests(Iterable<ManifestFile> manifests);

/**
* Create a new {@link TableScan} from this scan's configuration that will use the most recent
* snapshot as of the given time in milliseconds.
Expand Down
76 changes: 55 additions & 21 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableCollection;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -49,8 +51,10 @@ class BaseSnapshot implements Snapshot {
private transient List<ManifestFile> allManifests = null;
private transient List<ManifestFile> dataManifests = null;
private transient List<ManifestFile> deleteManifests = null;
private transient List<DataFile> cachedAdds = null;
private transient List<DataFile> cachedDeletes = null;
private transient List<DataFile> cachedAddedDataFiles = null;
private transient List<DataFile> cachedDeletedDataFiles = null;
private transient List<DeleteFile> cachedAddedDeleteFiles = null;
private transient List<DeleteFile> cachedDeletedDeleteFiles = null;

/**
* For testing only.
Expand Down Expand Up @@ -175,40 +179,73 @@ public List<ManifestFile> deleteManifests() {

@Override
public List<DataFile> addedFiles() {
if (cachedAdds == null) {
cacheChanges();
if (cachedAddedDataFiles == null) {
cacheDataFileChanges();
}
return cachedAdds;
return cachedAddedDataFiles;
}

@Override
public List<DataFile> deletedFiles() {
if (cachedDeletes == null) {
cacheChanges();
if (cachedDeletedDataFiles == null) {
cacheDataFileChanges();
}
return cachedDeletes;
return cachedDeletedDataFiles;
}

@Override
public String manifestListLocation() {
return manifestListLocation;
public Iterable<DeleteFile> addedDeleteFiles() {
if (cachedAddedDeleteFiles == null) {
cacheDeleteFileChanges();
}
return cachedAddedDeleteFiles;
}

private void cacheChanges() {
if (io == null) {
throw new IllegalStateException("Cannot cache changes: FileIO is null");
/**
 * Returns the delete files removed by this snapshot, loading and caching them on first access.
 */
@Override
public Iterable<DeleteFile> deletedDeleteFiles() {
  boolean notYetCached = cachedDeletedDeleteFiles == null;
  if (notYetCached) {
    // fills both the added and the deleted delete-file caches in one pass
    cacheDeleteFileChanges();
  }
  return cachedDeletedDeleteFiles;
}

/**
 * Returns the manifest list location stored on this snapshot.
 */
@Override
public String manifestListLocation() {
return manifestListLocation;
}

/**
 * Collects the data files added and deleted by this snapshot from its data manifests and stores
 * them in the cached added/deleted data file lists.
 *
 * @throws IllegalStateException if this snapshot has no {@code FileIO} to read manifests with
 */
private void cacheDataFileChanges() {
  // fail fast with a clear message rather than failing later inside the lazy manifest reader
  if (io == null) {
    throw new IllegalStateException("Cannot cache changes: FileIO is null");
  }

  ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
  ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();
  collectFileChanges(adds, deletes, dataManifests(),
      manifest -> ManifestFiles.read(manifest, io, null).entries());
  this.cachedAddedDataFiles = adds.build();
  this.cachedDeletedDataFiles = deletes.build();
}

/**
 * Collects the delete files added and deleted by this snapshot from its delete manifests and
 * stores them in the cached added/deleted delete file lists.
 *
 * @throws IllegalStateException if this snapshot has no {@code FileIO} to read manifests with
 */
private void cacheDeleteFileChanges() {
  // fail fast with a clear message rather than failing later inside the lazy manifest reader
  if (io == null) {
    throw new IllegalStateException("Cannot cache changes: FileIO is null");
  }

  ImmutableList.Builder<DeleteFile> adds = ImmutableList.builder();
  ImmutableList.Builder<DeleteFile> deletes = ImmutableList.builder();
  collectFileChanges(adds, deletes, deleteManifests(),
      manifest -> ManifestFiles.readDeleteManifest(manifest, io, null).entries());
  this.cachedAddedDeleteFiles = adds.build();
  this.cachedDeletedDeleteFiles = deletes.build();
}

private <F extends ContentFile<F>> void collectFileChanges(
ImmutableCollection.Builder<F> adds, ImmutableCollection.Builder<F> deletes, Iterable<ManifestFile> manifests,
Function<ManifestFile, CloseableIterable<ManifestEntry<F>>> manifestReader) {
// read only manifests that were created by this snapshot
Iterable<ManifestFile> changedManifests = Iterables.filter(dataManifests(),
Iterable<ManifestFile> changedManifests = Iterables.filter(manifests,
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, changedManifests)
.ignoreExisting()
.entries()) {
for (ManifestEntry<DataFile> entry : entries) {

try (CloseableIterable<ManifestEntry<F>> entries = CloseableIterable.filter(
CloseableIterable.concat(Iterables.transform(changedManifests, manifestReader::apply)),
entry -> entry.status() != ManifestEntry.Status.EXISTING
)) {
for (ManifestEntry<F> entry : entries) {
switch (entry.status()) {
case ADDED:
adds.add(entry.file().copy());
Expand All @@ -224,9 +261,6 @@ private void cacheChanges() {
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close entries while caching changes");
}

this.cachedAdds = adds.build();
this.cachedDeletes = deletes.build();
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public TableScan appendsAfter(long fromSnapshotId) {
throw new UnsupportedOperationException("Incremental scan is not supported");
}

/**
 * Rejects the request: this scan does not support being restricted to specific manifests.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public TableScan useManifests(Iterable<ManifestFile> manifests) {
throw new UnsupportedOperationException("Specific manifests scan is not supported");
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
Preconditions.checkArgument(context.snapshotId() == null,
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ public TableScan appendsAfter(long fromSnapshotId) {
return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
}

/**
 * Returns a scan that reads only the given manifests.
 * <p>
 * The request is rejected when a snapshot id has already been set on this scan.
 */
@Override
public TableScan useManifests(Iterable<ManifestFile> manifests) {
  Long pinnedSnapshotId = snapshotId();
  boolean snapshotUnset = pinnedSnapshotId == null;
  Preconditions.checkState(snapshotUnset,
      "Cannot enable specific manifests scan, scan-snapshot set to id=%s", pinnedSnapshotId);

  TableScanContext manifestContext = context().useManifests(manifests);
  return new ManifestsDataTableScan(tableOps(), table(), schema(), manifestContext);
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
// call method in superclass just for the side effect of argument validation;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea
Iterable<ManifestFile> matchingManifests = evalCache == null ? deleteManifests :
Iterables.filter(deleteManifests, manifest ->
manifest.content() == ManifestContent.DELETES &&
(manifest.hasAddedFiles() || manifest.hasExistingFiles()) &&
(manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.hasDeletedFiles()) &&
evalCache.get(manifest.partitionSpecId()).eval(manifest));

return Iterables.transform(
Expand Down
92 changes: 92 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestsDataTableScan.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.ThreadPools;

public class ManifestsDataTableScan extends DataTableScan {

ManifestsDataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
super(ops, table, schema, context);
Preconditions.checkState(context.manifests() != null && !Iterables.isEmpty(context.manifests()),
"Scanned manifests cannot be null or empty");
}

@Override
public TableScan asOfTime(long timestampMillis) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table as of time %s: configured for specific manifests %s",
timestampMillis, context().manifests()));
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table using scan snapshot id %s: configured for specific manifests %s",
scanSnapshotId, context().manifests()));
}

@Override
public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table in snapshots (%s, %s]: configured for specific manifests %s",
fromSnapshotId, toSnapshotId, context().manifests()));
}

@Override
public TableScan appendsAfter(long fromSnapshotId) {
throw new UnsupportedOperationException(String.format(
"Cannot scan appends after %s: configured for specific manifests %s",
fromSnapshotId, context().manifests()));
}

@Override
public TableScan useManifests(Iterable<ManifestFile> manifests) {
return new ManifestsDataTableScan(tableOps(), table(), schema(), context().useManifests(manifests));
}

@Override
protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
return new ManifestsDataTableScan(ops, table, schema, context);
}

@Override
public CloseableIterable<FileScanTask> planFiles() {
ManifestGroup manifestGroup = new ManifestGroup(tableOps().io(), context().manifests())
.caseSensitive(isCaseSensitive())
.select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
.filterData(filter())
.specsById(tableOps().current().specsById())
.ignoreDeleted();

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
}

if (PLAN_SCANS_WITH_WORKER_POOL && Iterables.size(context().manifests()) > 1) {
manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
}

return manifestGroup.planFiles();
}
}
Loading