10 changes: 5 additions & 5 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -177,11 +177,11 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<Manifes
* @param startFileIndex A startFileIndex used to skip processed files.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes must be smaller than
* this size.
* @param isStarting Used to check where all the data file should be processed, or only added files.
* @param scanAllFiles Used to check whether all the data files should be processed, or only added files.
* @return A MicroBatch.
*/
private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests,
int startFileIndex, long targetSizeInBytes, boolean isStarting) {
int startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(snapshot.snapshotId(), startFileIndex, startFileIndex + 1, 0L,
Collections.emptyList(), true);
@@ -195,7 +195,7 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
for (int idx = 0; idx < indexedManifests.size(); idx++) {
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), isStarting);
try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
@rdblue (Contributor) commented on Jul 27, 2020:
Because you're using try-with-resources for the Iterable, you don't really need to worry about the Iterator that was created from it. The Iterable will close all open Iterators that were created in the block. It's fine either way, though.

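To illustrate the reviewer's point, here is a minimal sketch (not the PR's code) that declares only the Iterable as a try-with-resources resource; it assumes the same open(...) helper, loop index, and unchecked-IO exception style as the surrounding generateMicroBatch code.

```java
// Per the comment above, closing the Iterable also closes iterators created from it
// inside the block, so only the Iterable needs to be a try-with-resources resource.
try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles)) {
  for (FileScanTask task : taskIterable) {
    // process each task ...
  }
} catch (IOException ioe) {
  throw new UncheckedIOException("Failed to close task iterable", ioe);
}
```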
FileScanTask task = taskIter.next();
@@ -238,11 +238,11 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
tasks, isLastIndex);
}

private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean isStarting) {
private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (isStarting) {
if (!scanAllFiles) {
manifestGroup = manifestGroup
.filterManifestEntries(entry ->
entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED)
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.util;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
@@ -69,8 +70,20 @@ public static List<Long> currentAncestors(Table table) {
* This method assumes that fromSnapshotId is an ancestor of toSnapshotId.
*/
public static List<Long> snapshotIdsBetween(Table table, long fromSnapshotId, long toSnapshotId) {
AtomicBoolean isAncestor = new AtomicBoolean(false);
A reviewer (Member) commented:
I think this may be clearer and not require the Atomic if we do a little refactoring and do two recursive walks:

Preconditions.checkArgument(ancestorOf(table, fromSnapshotId, toSnapshotId))

and then do the ancestorIds lookup?

Or, if we want to keep the current logic, I believe we can just throw an IllegalStateException in the lambda instead of returning null or using the atomic boolean.

@jerryshao (Contributor, author) replied on Aug 28, 2020:
I think doing two rounds of traversal is quite time consuming if we have many snapshots in a table, so I prefer not to change it to that approach.

Regarding "throw an IllegalStateException in the lambda instead of returning null or using the atomic boolean":

The AtomicBoolean here is used to check whether fromSnapshotId was actually encountered, i.e. whether it is an ancestor. We only know that after a full traversal, so throwing an exception from the lambda during the traversal seems hard to achieve.

The reviewer (Member) replied:
Can't you just have:

if (snapshotId == fromSnapshotId) throw new IllegalStateException(...);

?

Another contributor replied:

Regarding "can't you just have if (snapshotId == fromSnapshotId) throw new IllegalStateException?":

That would be odd, as the condition is actually an "expected" one. Even if we throw a custom exception to terminate the traversal, would the snapshot ids collected in the previous calls still be available after handling the exception?

By the way, this logic is a nice way to prune the traversal (an optimization, say), though I agree the code is less intuitive to understand. Leaving a small code comment would probably make it clearer.

List<Long> snapshotIds = Lists.newArrayList(ancestorIds(table.snapshot(toSnapshotId),
snapshotId -> snapshotId != fromSnapshotId ? table.snapshot(snapshotId) : null));
snapshotId -> {
if (snapshotId == fromSnapshotId) {
isAncestor.set(true);
return null;
} else {
return table.snapshot(snapshotId);
}
}));
if (!isAncestor.get()) {
throw new IllegalStateException(fromSnapshotId + " is not an ancestor of " + toSnapshotId);
}

return snapshotIds;
}

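For reference, here is a hedged sketch of the reviewer's two-walk alternative, not the code merged in this PR. ancestorIds and Lists appear in the surrounding code, and Preconditions is the relocated Guava class used across the codebase; snapshotIdsBetweenTwoWalks and isAncestorOf are illustrative names.

```java
// Sketch of the suggested alternative: validate ancestry first, then collect the IDs,
// trading the AtomicBoolean for a second walk over the snapshot history.
public static List<Long> snapshotIdsBetweenTwoWalks(Table table, long fromSnapshotId, long toSnapshotId) {
  Preconditions.checkArgument(isAncestorOf(table, toSnapshotId, fromSnapshotId),
      "%s is not an ancestor of %s", fromSnapshotId, toSnapshotId);
  return Lists.newArrayList(ancestorIds(table.snapshot(toSnapshotId),
      snapshotId -> snapshotId != fromSnapshotId ? table.snapshot(snapshotId) : null));
}

// Illustrative ancestry check: walk the parent chain from snapshotId looking for ancestorId.
private static boolean isAncestorOf(Table table, long snapshotId, long ancestorId) {
  Snapshot current = table.snapshot(snapshotId);
  while (current != null) {
    if (current.snapshotId() == ancestorId) {
      return true;
    }
    Long parentId = current.parentId();
    current = parentId == null ? null : table.snapshot(parentId);
  }
  return false;
}
```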
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java (new file)
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.JsonUtil;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;

/**
* An implementation of Spark Structured Streaming Offset, used to track the processed
* files of an Iceberg table. This StreamingOffset consists of:
*
* version: The version of StreamingOffset. The offset is written with a version number that is validated
* when deserializing from a JSON string.
* snapshot_id: The id of the snapshot currently being processed.
* index: The index of the last scanned file in the snapshot.
* scan_all_files: Denotes whether to scan all files in a snapshot; currently we only scan all files in the starting
* snapshot.
* snapshot_fully_processed: Denotes whether the current snapshot is fully processed, to avoid revisiting a processed
* snapshot.
*/
class StreamingOffset extends Offset {
static final StreamingOffset START_OFFSET = new StreamingOffset(-1L, -1, false, true);

private static final int CURR_VERSION = 1;
private static final String VERSION = "version";
private static final String SNAPSHOT_ID = "snapshot_id";
private static final String INDEX = "index";
A reviewer (Member) commented:

I feel like INDEX needs to be a little clearer; I believe this is more like the index of the last scanned file in the snapshot, or something like that?

The author (Contributor) replied:

Yes, this is the index of the last scanned file in the snapshot.

private static final String SCAN_ALL_FILES = "scan_all_files";
private static final String SNAPSHOT_FULLY_PROCESSED = "snapshot_fully_processed";

private final long snapshotId;
private final int index;
private final boolean scanAllFiles;
private final boolean snapshotFullyProcessed;

StreamingOffset(long snapshotId, int index, boolean scanAllFiles, boolean snapshotFullyProcessed) {
this.snapshotId = snapshotId;
this.index = index;
this.scanAllFiles = scanAllFiles;
this.snapshotFullyProcessed = snapshotFullyProcessed;
}

static StreamingOffset fromJson(String json) {
Preconditions.checkNotNull(json, "The input JSON string is null");

try {
JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
int version = JsonUtil.getInt(VERSION, node);
if (version > CURR_VERSION) {
throw new IOException(String.format("Cannot deserialize a JSON offset from version %d. %d is not compatible " +
"with the version of Iceberg %d and cannot be used. Please use a compatible version of Iceberg " +
"to read this offset", version, version, CURR_VERSION));
}

long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
int index = JsonUtil.getInt(INDEX, node);
boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node);
boolean snapshotFullyProcessed = JsonUtil.getBool(SNAPSHOT_FULLY_PROCESSED, node);

return new StreamingOffset(snapshotId, index, shouldScanAllFiles, snapshotFullyProcessed);
} catch (IOException e) {
throw new IllegalStateException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
}
}

@Override
public String json() {
StringWriter writer = new StringWriter();
try {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.writeStartObject();
generator.writeNumberField(VERSION, CURR_VERSION);
generator.writeNumberField(SNAPSHOT_ID, snapshotId);
generator.writeNumberField(INDEX, index);
generator.writeBooleanField(SCAN_ALL_FILES, scanAllFiles);
generator.writeBooleanField(SNAPSHOT_FULLY_PROCESSED, snapshotFullyProcessed);
generator.writeEndObject();
generator.flush();

} catch (IOException e) {
throw new UncheckedIOException("Failed to write StreamingOffset to json", e);
}

return writer.toString();
}

long snapshotId() {
return snapshotId;
}

int index() {
return index;
}

boolean shouldScanAllFiles() {
return scanAllFiles;
}

boolean isSnapshotFullyProcessed() {
return snapshotFullyProcessed;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof StreamingOffset) {
StreamingOffset offset = (StreamingOffset) obj;
return offset.snapshotId == snapshotId &&
offset.index == index &&
offset.scanAllFiles == scanAllFiles &&
offset.snapshotFullyProcessed == snapshotFullyProcessed;
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hashCode(snapshotId, index, scanAllFiles, snapshotFullyProcessed);
}

@Override
public String toString() {
return String.format("Streaming Offset[%d: index (%d) scan_all_files (%b) snapshot_fully_processed (%b)]",
snapshotId, index, scanAllFiles, snapshotFullyProcessed);
}
}
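To make the offset encoding concrete, here is a small illustrative round trip (not part of the PR); the snapshot id and index are made-up values, and the JSON in the comment only shows the approximate shape produced by json() above.

```java
// Serialize a StreamingOffset to JSON and parse it back.
StreamingOffset offset = new StreamingOffset(8109753556501829846L, 3, false, false);
String json = offset.json();
// json is roughly:
// {"version":1,"snapshot_id":8109753556501829846,"index":3,"scan_all_files":false,"snapshot_fully_processed":false}
StreamingOffset restored = StreamingOffset.fromJson(json);
// restored.equals(offset) is expected to be true
```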
spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -42,16 +42,19 @@
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister,
StreamWriteSupport, MicroBatchReadSupport {

private SparkSession lazySpark = null;
private JavaSparkContext lazySparkContext = null;
@@ -129,6 +132,23 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
return new StreamingWriter(table, io, encryptionManager, options, queryId, mode, appId, writeSchema, dsStruct);
}

@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
DataSourceOptions options) {
if (schema.isPresent()) {
throw new IllegalStateException("Iceberg does not support specifying the schema at read time");
}

Configuration conf = new Configuration(lazyBaseConf());
A contributor commented:

There look to be similar patterns across this file, though the PR is already quite large, so it is better to do that in another PR.

Table table = getTableAndResolveHadoopConfiguration(options, conf);
String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive");

Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());

return new StreamingReader(table, io, encryptionManager, Boolean.parseBoolean(caseSensitive), options);
}

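For context, here is a minimal sketch of how this micro-batch read path might be driven from Spark Structured Streaming (Spark 2.4, DataSource V2); it assumes the source is registered under the short name "iceberg", and the app name, table location, and trigger interval are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class IcebergStreamingReadExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("iceberg-streaming-read").getOrCreate();

    // Source: incrementally read an Iceberg table as micro-batches.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("hdfs://nn:8020/warehouse/db/table");  // placeholder table location

    // Sink: print each micro-batch to the console, triggering every 30 seconds.
    StreamingQuery query = stream.writeStream()
        .format("console")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime("30 seconds"))
        .start();

    query.awaitTermination();
  }
}
```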
protected Table findTable(DataSourceOptions options, Configuration conf) {
Optional<String> path = options.get("path");
Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set");
72 changes: 49 additions & 23 deletions spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -191,6 +191,26 @@ private StructType lazyType() {
return type;
}

protected Long splitSize() {
return splitSize;
}

protected Integer splitLookback() {
return splitLookback;
}

protected Long splitOpenFileCost() {
return splitOpenFileCost;
}

protected boolean caseSensitive() {
return caseSensitive;
}

protected List<Expression> filterExpressions() {
return filterExpressions;
}

@Override
public StructType readSchema() {
return lazyType();
@@ -311,28 +331,7 @@ public Statistics estimateStatistics() {
@Override
public boolean enableBatchRead() {
if (readUsingBatch == null) {
boolean allParquetFileScanTasks =
tasks().stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.PARQUET)));

boolean allOrcFileScanTasks =
tasks().stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.ORC)));

boolean atLeastOneColumn = lazySchema().columns().size() > 0;

boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());

boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);

this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
this.readUsingBatch = checkEnableBatchRead(tasks());
A contributor commented:

Probably better to add a reviewer-friendly comment noting whether the extracted method was changed, and if it was changed, where.

}
return readUsingBatch;
}
@@ -344,7 +343,7 @@ private static void mergeIcebergHadoopConfs(
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
}

private List<CombinedScanTask> tasks() {
protected List<CombinedScanTask> tasks() {
if (tasks == null) {
TableScan scan = table
.newScan()
@@ -395,6 +394,33 @@ private List<CombinedScanTask> tasks() {
return tasks;
}

// An extracted method that will be overridden by StreamingReader. This is because the tasks generated by streaming
// are produced per batch and cannot be planned ahead of time the way Reader plans them.
protected boolean checkEnableBatchRead(List<CombinedScanTask> taskList) {
boolean allParquetFileScanTasks =
taskList.stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.PARQUET)));

boolean allOrcFileScanTasks =
taskList.stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.ORC)));

boolean atLeastOneColumn = lazySchema().columns().size() > 0;

boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());

boolean hasNoDeleteFiles = taskList.stream().noneMatch(TableScanUtil::hasDeletes);

return batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}

@Override
public String toString() {
return String.format(