api/src/main/java/org/apache/iceberg/DataFile.java (3 changes: 2 additions & 1 deletion)
@@ -80,7 +80,8 @@ static StructType getType(StructType partitionType) {
         UPPER_BOUNDS,
         KEY_METADATA,
         SPLIT_OFFSETS,
-        EQUALITY_IDS
+        EQUALITY_IDS,
+        ManifestFile.SPEC_ID.asOptional()
     );
   }

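The new column reuses ManifestFile.SPEC_ID but downgraded to optional, which keeps older metadata readable. A minimal sketch of what asOptional() buys here (illustrative, not part of the PR):

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.types.Types;

public class SpecIdFieldSketch {
  public static void main(String[] args) {
    // Same field id, name, and type as ManifestFile.SPEC_ID, but marked
    // optional, so data_file structs written before this change still read
    // (the column is null) instead of failing on a missing required field.
    Types.NestedField specId = ManifestFile.SPEC_ID.asOptional();
    System.out.println(specId.isOptional());                                // true
    System.out.println(specId.fieldId() == ManifestFile.SPEC_ID.fieldId()); // true
  }
}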
InclusiveMetricsEvaluator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.expressions;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Comparator;
@@ -45,7 +46,7 @@
  * rows and false if the file cannot contain matching rows. Files may be skipped if and only if the
  * return value of {@code eval} is false.
  */
-public class InclusiveMetricsEvaluator {
+public class InclusiveMetricsEvaluator implements Serializable {
   private final Expression expr;
 
   public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
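Making the evaluator Serializable presumably lets it ride along inside serialized scan tasks shipped to distributed workers (e.g. Spark executors). A sketch of the JVM round-trip this enables, assuming all of the evaluator's fields are themselves serializable:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
import org.apache.iceberg.types.Types;

public class EvaluatorSerializationSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    InclusiveMetricsEvaluator evaluator =
        new InclusiveMetricsEvaluator(schema, Expressions.greaterThan("id", 5L));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(evaluator); // threw NotSerializableException before this change
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      InclusiveMetricsEvaluator copy = (InclusiveMetricsEvaluator) in.readObject();
      // copy.eval(...) can now run on the remote side
    }
  }
}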
core/src/main/java/org/apache/iceberg/BaseFile.java (9 changes: 9 additions & 0 deletions)
@@ -100,6 +100,10 @@ public PartitionData copy() {
           found = true;
           fromProjectionPos[i] = j;
         }
+        if (fields.get(i).fieldId() == ManifestFile.SPEC_ID.fieldId()) {
+          found = true;
+          fromProjectionPos[i] = 14;
+        }
       }
 
       if (!found) {

Comment (Member, PR author): These modifications allow BaseFile to translate into a SparkRow with the specID as a column.

Comment (Contributor): This is not related to your PR but while we're here: once we find the projected value and found is true, do we need to iterate over the rest of the entries?
Reply (Member, PR author): We don't, but I don't think it's that much of a time sink.
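For reference, the reviewer's early exit would be a one-line change in the inner loop (a sketch, assuming the nested i/j loop structure implied by the context above):

for (int j = 0; j < allFields.size(); j += 1) {
  if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
    found = true;
    fromProjectionPos[i] = j;
    break; // stop scanning once the projected position is found
  }
}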
@@ -255,6 +259,9 @@ public void put(int i, Object value) {
       case 13:
         this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
         return;
+      case 14:
+        this.partitionSpecId = (value != null) ? (Integer) value : -1;
+        return;
       default:
         // ignore the object, it must be from a newer version of the format
     }
@@ -301,6 +308,8 @@ public Object get(int i) {
         return splitOffsets();
       case 13:
         return equalityFieldIds();
+      case 14:
+        return partitionSpecId;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
     }
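Taken together, the three hunks give the spec id a stable ordinal (14) in BaseFile's generic accessors. A sketch of the round-trip (hypothetical test code in package org.apache.iceberg, assuming BaseFile remains package-private and generic):

static void specIdRoundTrip(BaseFile<?> file) {
  file.put(14, 3);                       // stores partitionSpecId = 3
  assert ((Integer) file.get(14)) == 3;  // read back, e.g. to fill a Spark row
  file.put(14, null);                    // null-safe path: falls back to -1
  assert ((Integer) file.get(14)) == -1;
}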
core/src/main/java/org/apache/iceberg/BaseTableScan.java (27 changes: 15 additions & 12 deletions)
@@ -19,10 +19,6 @@
 
 package org.apache.iceberg;
 
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -47,7 +43,6 @@
 abstract class BaseTableScan implements TableScan {
   private static final Logger LOG = LoggerFactory.getLogger(TableScan.class);
 
-  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
 
   private final TableOperations ops;
   private final Table table;
@@ -65,6 +60,10 @@
     this.context = context;
   }
 
+  protected TableScanContext tableScanContext() {
+    return context;
+  }
+
   protected TableOperations tableOps() {
     return ops;
   }
@@ -85,7 +84,7 @@ protected Collection<String> selectedColumns() {
     return context.selectedColumns();
   }
 
-  protected Map<String, String> options() {
+  public Map<String, String> options() {
     return context.options();
   }

@@ -105,6 +104,14 @@ protected abstract CloseableIterable<FileScanTask> planFiles(
       TableOperations ops, Snapshot snapshot, Expression rowFilter,
       boolean ignoreResiduals, boolean caseSensitive, boolean colStats);
 
+  public Long fromSnapshotId() {
+    return context().fromSnapshotId();
+  }
+
+  public Long toSnapshotId() {
+    return context.toSnapshotId();
+  }
Comment (Contributor): 👍

   @Override
   public Table table() {
     return table;
@@ -145,7 +152,7 @@ public TableScan asOfTime(long timestampMillis) {
     // the snapshot ID could be null if no entries were older than the requested time. in that case,
     // there is no valid snapshot to read.
     Preconditions.checkArgument(lastSnapshotId != null,
-        "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));
+        "Cannot find a snapshot older than %s", TableScanUtil.formatTimestampMillis(timestampMillis));
 
     return useSnapshot(lastSnapshotId);
   }
@@ -202,7 +209,7 @@ public CloseableIterable<FileScanTask> planFiles() {
     Snapshot snapshot = snapshot();
     if (snapshot != null) {
       LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
-          snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
+          snapshot.snapshotId(), TableScanUtil.formatTimestampMillis(snapshot.timestampMillis()),
           context.rowFilter());
 
       Listeners.notifyAll(
@@ -307,8 +314,4 @@ private Schema lazyColumnProjection() {
 
     return schema;
   }
-
-  private static String formatTimestampMillis(long millis) {
-    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault()));
-  }
 }
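The formatter is not deleted outright; the call sites above now point at TableScanUtil. Presumably the relocated helper mirrors the private method removed above (a sketch; the TableScanUtil side of this diff is not shown):

package org.apache.iceberg.util;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TableScanUtil {
  private static final DateTimeFormatter DATE_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

  // body copied from the private method removed from BaseTableScan above
  public static String formatTimestampMillis(long millis) {
    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault()));
  }
}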
core/src/main/java/org/apache/iceberg/DataTableScan.java (5 changes: 5 additions & 0 deletions)
@@ -94,4 +94,9 @@ protected long targetSplitSize(TableOperations ops) {
     return ops.current().propertyAsLong(
         TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
   }
+
+  @Override
+  public TableScanContext tableScanContext() {
+    return super.tableScanContext();
+  }
 }
core/src/main/java/org/apache/iceberg/DeleteFileIndex.java (26 changes: 15 additions & 11 deletions)
@@ -62,7 +62,8 @@
  * Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, DataFile)} or
  * {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data file.
  */
-class DeleteFileIndex {
+public class DeleteFileIndex {
+  private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0];
   private final Map<Integer, PartitionSpec> specsById;
   private final Map<Integer, Types.StructType> partitionTypeById;
   private final Map<Integer, ThreadLocal<StructLikeWrapper>> wrapperById;
@@ -100,7 +101,10 @@ DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
     return forDataFile(entry.sequenceNumber(), entry.file());
   }
 
-  DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+  public DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+    if (isEmpty()) {
+      return EMPTY_DELETES;
+    }
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
     Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);

@@ -305,11 +309,11 @@
     return Arrays.stream(files, start, files.length);
   }
 
-  static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
+  public static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
     return new Builder(io, Sets.newHashSet(deleteManifests));
   }
 
-  static class Builder {
+  public static class Builder {
     private final FileIO io;
     private final Set<ManifestFile> deleteManifests;
     private Map<Integer, PartitionSpec> specsById = null;
@@ -318,37 +322,37 @@
     private boolean caseSensitive = true;
     private ExecutorService executorService = null;
 
-    Builder(FileIO io, Set<ManifestFile> deleteManifests) {
+    public Builder(FileIO io, Set<ManifestFile> deleteManifests) {
       this.io = io;
       this.deleteManifests = Sets.newHashSet(deleteManifests);
     }
 
-    Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
+    public Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
       this.specsById = newSpecsById;
       return this;
     }
 
-    Builder filterData(Expression newDataFilter) {
+    public Builder filterData(Expression newDataFilter) {
       this.dataFilter = Expressions.and(dataFilter, newDataFilter);
       return this;
     }
 
-    Builder filterPartitions(Expression newPartitionFilter) {
+    public Builder filterPartitions(Expression newPartitionFilter) {
      this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter);
       return this;
     }
 
-    Builder caseSensitive(boolean newCaseSensitive) {
+    public Builder caseSensitive(boolean newCaseSensitive) {
       this.caseSensitive = newCaseSensitive;
       return this;
     }
 
-    Builder planWith(ExecutorService newExecutorService) {
+    public Builder planWith(ExecutorService newExecutorService) {
       this.executorService = newExecutorService;
       return this;
     }
 
-    DeleteFileIndex build() {
+    public DeleteFileIndex build() {
       // read all of the matching delete manifests in parallel and accumulate the matching files in a queue
       Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
       Tasks.foreach(deleteManifestReaders())
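With the class, its builder, and forDataFile now public, external planners (e.g. Spark-side scan planning) can build and query the index directly. A hypothetical call site (all inputs assumed):

static DeleteFile[] lookupDeletes(Table table, FileIO io, Iterable<ManifestFile> deleteManifests,
    long sequenceNumber, DataFile dataFile) {
  DeleteFileIndex deletes = DeleteFileIndex.builderFor(io, deleteManifests)
      .specsById(table.specs())
      .caseSensitive(true)
      .build();
  // the new isEmpty() fast path returns EMPTY_DELETES without a partition lookup
  return deletes.forDataFile(sequenceNumber, dataFile);
}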
IncrementalDataTableScan.java
@@ -25,7 +25,6 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
@@ -34,7 +33,7 @@ class IncrementalDataTableScan extends DataTableScan {
 
   IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
     super(ops, table, schema, context.useSnapshotId(null));
-    validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId());
+    SnapshotUtil.validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId());
   }
 
   @Override
@@ -53,7 +52,7 @@ public TableScan useSnapshot(long scanSnapshotId) {
 
   @Override
   public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) {
-    validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId);
+    SnapshotUtil.validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId, table(), context());
     return new IncrementalDataTableScan(tableOps(), table(), schema(),
         context().fromSnapshotId(newFromSnapshotId).toSnapshotId(newToSnapshotId));
   }
@@ -69,7 +68,7 @@ public TableScan appendsAfter(long newFromSnapshotId) {
   @Override
   public CloseableIterable<FileScanTask> planFiles() {
     // TODO publish an incremental appends scan event
-    List<Snapshot> snapshots = snapshotsWithin(table(),
+    List<Snapshot> snapshots = SnapshotUtil.snapshotsWithin(table(),
         context().fromSnapshotId(), context().toSnapshotId());
     Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId));
     Set<ManifestFile> manifests = FluentIterable
@@ -106,45 +105,5 @@
     return new IncrementalDataTableScan(ops, table, schema, context);
   }
 
-  private static List<Snapshot> snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) {
-    List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, fromSnapshotId, toSnapshotId);
-    List<Snapshot> snapshots = Lists.newArrayList();
-    for (Long snapshotId : snapshotIds) {
-      Snapshot snapshot = table.snapshot(snapshotId);
-      // for now, incremental scan supports only appends
-      if (snapshot.operation().equals(DataOperations.APPEND)) {
-        snapshots.add(snapshot);
-      } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) {
-        throw new UnsupportedOperationException(
-            String.format("Found %s operation, cannot support incremental data in snapshots (%s, %s]",
-                DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId));
-      }
-    }
-    return snapshots;
-  }
-
-  private void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId) {
-    Set<Long> snapshotIdsRange = Sets.newHashSet(
-        SnapshotUtil.snapshotIdsBetween(table(), context().fromSnapshotId(), context().toSnapshotId()));
-    // since snapshotIdsBetween returns ids in range (fromSnapshotId, toSnapshotId]
-    snapshotIdsRange.add(context().fromSnapshotId());
-    Preconditions.checkArgument(
-        snapshotIdsRange.contains(newFromSnapshotId),
-        "from snapshot id %s not in existing snapshot ids range (%s, %s]",
-        newFromSnapshotId, context().fromSnapshotId(), newToSnapshotId);
-    Preconditions.checkArgument(
-        snapshotIdsRange.contains(newToSnapshotId),
-        "to snapshot id %s not in existing snapshot ids range (%s, %s]",
-        newToSnapshotId, context().fromSnapshotId(), context().toSnapshotId());
-  }
-
-  private static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) {
-    Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "from and to snapshot ids cannot be the same");
-    Preconditions.checkArgument(
-        table.snapshot(fromSnapshotId) != null, "from snapshot %s does not exist", fromSnapshotId);
-    Preconditions.checkArgument(
-        table.snapshot(toSnapshotId) != null, "to snapshot %s does not exist", toSnapshotId);
-    Preconditions.checkArgument(SnapshotUtil.ancestorOf(table, toSnapshotId, fromSnapshotId),
-        "from snapshot %s is not an ancestor of to snapshot %s", fromSnapshotId, toSnapshotId);
-  }
 }
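The three private helpers move to SnapshotUtil; the new call sites imply roughly the following shape (a sketch reconstructed from the removed bodies; the SnapshotUtil side of the diff is not shown):

// In org.apache.iceberg.util.SnapshotUtil:
public static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) {
  Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "from and to snapshot ids cannot be the same");
  Preconditions.checkArgument(
      table.snapshot(fromSnapshotId) != null, "from snapshot %s does not exist", fromSnapshotId);
  Preconditions.checkArgument(
      table.snapshot(toSnapshotId) != null, "to snapshot %s does not exist", toSnapshotId);
  Preconditions.checkArgument(ancestorOf(table, toSnapshotId, fromSnapshotId),
      "from snapshot %s is not an ancestor of to snapshot %s", fromSnapshotId, toSnapshotId);
}

// snapshotsWithin(Table, long, long) and
// validateSnapshotIdsRefinement(long, long, Table, TableScanContext)
// presumably follow the same pattern, with table() and context() now
// passed explicitly instead of read from the scan instance.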
[file header not captured]
@@ -102,8 +102,8 @@ protected long targetSplitSize(TableOperations ops) {
   protected CloseableIterable<FileScanTask> planFiles(
       TableOperations ops, Snapshot snapshot, Expression rowFilter,
       boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
-    // return entries from both data and delete manifests
-    CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
+    // only return data manifests; TODO: handle delete manifests
+    CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.dataManifests());
     Type fileProjection = schema().findType("data_file");
     Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
     String schemaString = SchemaParser.toJson(schema());
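For context on the swap (assuming the standard Snapshot API): allManifests() covers data and delete manifests together, while dataManifests() is restricted to manifests of data files, so delete files are simply not surfaced by this scan yet:

List<ManifestFile> all = snapshot.allManifests();       // data + delete manifests
List<ManifestFile> dataOnly = snapshot.dataManifests(); // data manifests only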
core/src/main/java/org/apache/iceberg/ScanTasks.java (new file: 36 additions & 0 deletions)
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.ResidualEvaluator;
+
+public class ScanTasks {
+
+  /**
+   * Utility class; no public constructor.
+   */
+  private ScanTasks() {
+  }
+
+  public static BaseFileScanTask createBaseFileScanTask(DataFile file, DeleteFile[] deletes, String schemaString,
+      String specString, ResidualEvaluator residuals) {
+    return new BaseFileScanTask(file, deletes, schemaString, specString, residuals);
+  }
+}