Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
24d8691
Flink: init streaming rewrite.
Reo-LEI Oct 19, 2021
4d94772
Merge remote-tracking branch 'community/master' into flink-streaming-…
Reo-LEI Oct 19, 2021
c9c125b
refactor rewrite files committer.
Reo-LEI Oct 19, 2021
1df1814
Add rewrite parallelism config.
Reo-LEI Oct 20, 2021
d96704f
Add unittest for stream rewriter.
Reo-LEI Oct 20, 2021
98e2832
Add unittest for rewrite rile committer.
Reo-LEI Oct 21, 2021
0ff66bf
Add unittest for file committer.
Reo-LEI Oct 21, 2021
69689e5
Merge remote-tracking branch 'community/master' into flink-streaming-…
Reo-LEI Oct 21, 2021
c5ca44b
Fix unittest.
Reo-LEI Oct 21, 2021
47f4765
Fix unittest.
Reo-LEI Oct 21, 2021
d8a7f69
Fix unittest.
Reo-LEI Oct 21, 2021
6439bbb
Fix rewrite commit error when no rewrite result.
Reo-LEI Oct 23, 2021
5a1739d
Merge remote-tracking branch 'community/master' into flink-streaming-…
Reo-LEI Nov 5, 2021
8fadd91
Merge remote-tracking branch 'community/master' into flink-streaming-…
Reo-LEI Nov 17, 2021
971d1c7
Move to flink v1.13.
Reo-LEI Nov 17, 2021
682a2b2
Wrap PartitionData by StructLikeWrapper.
Reo-LEI Nov 18, 2021
11814e9
Merge branch 'apache:master' into flink-streaming-rewrite
Reo-LEI Nov 18, 2021
25a2a90
support to emit all needed files for restore and concurrent writer.
Reo-LEI Dec 25, 2021
2a52cd5
Fix unittest.
Reo-LEI Dec 26, 2021
189c946
Merge remote-tracking branch 'origin/master' into flink-streaming-rew…
Reo-LEI Dec 26, 2021
268392a
fix class not found.
Reo-LEI Dec 26, 2021
25a93e0
not swallow exception when rewrite fail.
Reo-LEI Dec 26, 2021
328c876
port to flink 0.14
Reo-LEI Dec 26, 2021
1e5c7a5
Flink: fix streaming rewrite kryo serialization error for partition b…
Reo-LEI Jan 3, 2022
7d131f3
Flink: refactor streaming rewrite.
Reo-LEI Jan 27, 2022
521d62f
Merge branch 'master' into flink-streaming-rewrite
Reo-LEI Jan 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ public interface Snapshot extends Serializable {
*/
Iterable<DataFile> deletedFiles();

/**
* Return all delete files added to the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @return all delete files added to the table in this snapshot.
*/
Iterable<DeleteFile> addedDeleteFiles();

/**
* Return all delete files deleted from the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @return all delete files deleted from the table in this snapshot.
*/
Iterable<DeleteFile> deletedDeleteFiles();

/**
* Return the location of this snapshot's manifest list, or null if it is not separate.
*
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public interface TableScan {
*/
TableScan useSnapshot(long snapshotId);


/**
* Create a new {@link TableScan} from this scan's configuration that will scan the manifests
* specified by the parameters.
*
* @param manifests specified manifest group which will be scanned
* @return a new scan based on this with the given manifest group
* @throws IllegalArgumentException if the snapshot cannot be found
*/
TableScan useManifests(Iterable<ManifestFile> manifests);

/**
* Create a new {@link TableScan} from this scan's configuration that will use the most recent
* snapshot as of the given time in milliseconds.
Expand Down
78 changes: 60 additions & 18 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableCollection;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -49,8 +51,10 @@ class BaseSnapshot implements Snapshot {
private transient List<ManifestFile> allManifests = null;
private transient List<ManifestFile> dataManifests = null;
private transient List<ManifestFile> deleteManifests = null;
private transient List<DataFile> cachedAdds = null;
private transient List<DataFile> cachedDeletes = null;
private transient List<DataFile> cachedAddedDataFiles = null;
private transient List<DataFile> cachedDeletedDataFiles = null;
private transient List<DeleteFile> cachedAddedDeleteFiles = null;
private transient List<DeleteFile> cachedDeletedDeleteFiles = null;

/**
* For testing only.
Expand Down Expand Up @@ -175,40 +179,81 @@ public List<ManifestFile> deleteManifests() {

@Override
public List<DataFile> addedFiles() {
if (cachedAdds == null) {
cacheChanges();
if (cachedAddedDataFiles == null) {
cacheDataFileChanges();
}
return cachedAdds;
return cachedAddedDataFiles;
}

@Override
public List<DataFile> deletedFiles() {
if (cachedDeletes == null) {
cacheChanges();
if (cachedDeletedDataFiles == null) {
cacheDataFileChanges();
}
return cachedDeletes;
return cachedDeletedDataFiles;
}

@Override
public Iterable<DeleteFile> addedDeleteFiles() {
if (cachedAddedDeleteFiles == null) {
cacheDeleteFileChanges();
}
return cachedAddedDeleteFiles;
}

@Override
public Iterable<DeleteFile> deletedDeleteFiles() {
if (cachedDeletedDeleteFiles == null) {
cacheDeleteFileChanges();
}
return cachedDeletedDeleteFiles;
}

@Override
public String manifestListLocation() {
return manifestListLocation;
}

private void cacheChanges() {
private void cacheDataFileChanges() {
if (io == null) {
throw new IllegalStateException("Cannot cache changes: FileIO is null");
throw new IllegalStateException("Cannot cache data file changes: FileIO is null");
}

ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();
collectFileChanges(adds, deletes, dataManifests(),
manifest -> ManifestFiles.read(manifest, io, null).entries()
);
this.cachedAddedDataFiles = adds.build();
this.cachedDeletedDataFiles = deletes.build();
}

private void cacheDeleteFileChanges() {
if (io == null) {
throw new IllegalStateException("Cannot cache delete file changes: FileIO is null");
}

ImmutableList.Builder<DeleteFile> adds = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> deletes = ImmutableList.builder();
collectFileChanges(adds, deletes, deleteManifests(),
manifest -> ManifestFiles.readDeleteManifest(manifest, io, null).entries()
);
this.cachedAddedDeleteFiles = adds.build();
this.cachedDeletedDeleteFiles = deletes.build();
}

private <F extends ContentFile<F>> void collectFileChanges(
ImmutableCollection.Builder<F> adds, ImmutableCollection.Builder<F> deletes, Iterable<ManifestFile> manifests,
Function<ManifestFile, CloseableIterable<ManifestEntry<F>>> manifestReader) {
// read only manifests that were created by this snapshot
Iterable<ManifestFile> changedManifests = Iterables.filter(dataManifests(),
Iterable<ManifestFile> changedManifests = Iterables.filter(manifests,
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, changedManifests)
.ignoreExisting()
.entries()) {
for (ManifestEntry<DataFile> entry : entries) {

try (CloseableIterable<ManifestEntry<F>> entries = CloseableIterable.filter(
CloseableIterable.concat(Iterables.transform(changedManifests, manifestReader::apply)),
entry -> entry.status() != ManifestEntry.Status.EXISTING
)) {
for (ManifestEntry<F> entry : entries) {
switch (entry.status()) {
case ADDED:
adds.add(entry.file().copy());
Expand All @@ -224,9 +269,6 @@ private void cacheChanges() {
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close entries while caching changes");
}

this.cachedAdds = adds.build();
this.cachedDeletes = deletes.build();
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public TableScan appendsAfter(long fromSnapshotId) {
throw new UnsupportedOperationException("Incremental scan is not supported");
}

@Override
public TableScan useManifests(Iterable<ManifestFile> manifests) {
throw new UnsupportedOperationException("Specific manifests scan is not supported");
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
Preconditions.checkArgument(context.snapshotId() == null,
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ public TableScan appendsAfter(long fromSnapshotId) {
return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
}

@Override
public TableScan useManifests(Iterable<ManifestFile> manifests) {
Long scanSnapshotId = snapshotId();
Preconditions.checkState(scanSnapshotId == null,
"Cannot enable specific manifests scan, scan-snapshot set to id=%s", scanSnapshotId);
return new ManifestsDataTableScan(tableOps(), table(), schema(), context().useManifests(manifests));
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
// call method in superclass just for the side effect of argument validation;
Expand Down
92 changes: 92 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestsDataTableScan.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.ThreadPools;

public class ManifestsDataTableScan extends DataTableScan {

ManifestsDataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
super(ops, table, schema, context);
Preconditions.checkState(context.manifests() != null && !Iterables.isEmpty(context.manifests()),
"Scanned manifests cannot be null or empty");
}

@Override
public TableScan asOfTime(long timestampMillis) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table as of time %s: configured for specific manifests %s",
timestampMillis, context().manifests()));
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table using scan snapshot id %s: configured for specific manifests %s",
scanSnapshotId, context().manifests()));
}

@Override
public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table in snapshots (%s, %s]: configured for specific manifests %s",
fromSnapshotId, toSnapshotId, context().manifests()));
}

@Override
public TableScan appendsAfter(long fromSnapshotId) {
throw new UnsupportedOperationException(String.format(
"Cannot scan appends after %s: configured for specific manifests %s",
fromSnapshotId, context().manifests()));
}

@Override
public TableScan useManifests(Iterable<ManifestFile> manifests) {
return new ManifestsDataTableScan(tableOps(), table(), schema(), context().useManifests(manifests));
}

@Override
protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
return new ManifestsDataTableScan(ops, table, schema, context);
}

@Override
public CloseableIterable<FileScanTask> planFiles() {
ManifestGroup manifestGroup = new ManifestGroup(tableOps().io(), context().manifests())
.caseSensitive(isCaseSensitive())
.select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
.filterData(filter())
.specsById(tableOps().current().specsById())
.ignoreDeleted();

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
}

if (PLAN_SCANS_WITH_WORKER_POOL && Iterables.size(context().manifests()) > 1) {
manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
}

return manifestGroup.planFiles();
}
}
Loading