16 changes: 9 additions & 7 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -21,11 +21,11 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -109,18 +109,18 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
return this;
}

public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean isStarting) {
Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");

List<ManifestFile> manifests = scanAllFiles ? snapshot.dataManifests() :
List<ManifestFile> manifests = isStarting ? snapshot.dataManifests() :
snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
List<Pair<ManifestFile, Integer>> skippedManifestIndexes = skipManifests(manifestIndexes, startFileIndex);

return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles);
return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, isStarting);
}

/**
@@ -177,9 +177,10 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<Manifes
* @param startFileIndex A startFileIndex used to skip processed files.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes must be smaller than
* this size.
* @param scanAllFiles Used to check whether all the data files should be processed, or only added files.
* @param scanAllFiles Used to check whether all the data file should be processed, or only added files.
* @return A MicroBatch.
*/
@SuppressWarnings("CyclomaticComplexity")
private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
@@ -195,8 +196,9 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
for (int idx = 0; idx < indexedManifests.size(); idx++) {
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(),
scanAllFiles)) {
Iterator<FileScanTask> taskIter = taskIterable.iterator();
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (currentFileIndex >= startFileIndex) {
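Aside (not part of the diff): a minimal usage sketch of the generate(...) call whose third flag is renamed to isStarting above. It assumes the MicroBatches.from(snapshot, io) factory and the caseSensitive(...) builder method defined elsewhere in this class, plus a table and snapshotId already in scope.

// Sketch: build one micro-batch from a snapshot, starting after already-processed files
// and capped by a byte budget.
MicroBatch batch = MicroBatches.from(table.snapshot(snapshotId), table.io())
    .caseSensitive(true)
    .specsById(table.specs())
    .generate(
        0L,                  // startFileIndex: skip files with an index below this value
        128L * 1024 * 1024,  // targetSizeInBytes: stop adding files once this budget is reached
        true);               // isStarting: true reads all data files in the snapshot,
                             // false reads only the files added by the snapshot itself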
spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -36,16 +36,19 @@
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister,
StreamWriteSupport, MicroBatchReadSupport {

private SparkSession lazySpark = null;
private Configuration lazyConf = null;
@@ -113,6 +116,21 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
return new StreamingWriter(lazySparkSession(), table, options, queryId, mode, appId, writeSchema, dsStruct);
}

@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation,

[Review comment, Contributor]: So it seems like we're not using the checkpointLocation here, do we not need to store anything to be able to recover on failure? I think this is the case because as we read data it's not like it gets "consumed" but I just want to make sure.

DataSourceOptions options) {
if (schema.isPresent()) {
throw new IllegalStateException("Iceberg does not support specifying the schema at read time");
}

Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
String caseSensitive = conf.get("spark.sql.caseSensitive");

return new StreamingReader(lazySparkSession(), table, checkpointLocation,
Boolean.parseBoolean(caseSensitive), options);
}

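Aside on the review question above (not part of the diff): Spark's MicroBatchExecution persists each batch's offset to the checkpoint's offset log itself using Offset#json(), and on restart hands the serialized form back through MicroBatchReader#deserializeOffset, so the reader does not need to write to checkpointLocation directly. A sketch of how the StreamingReader created here might wire that up (its actual implementation is not shown in this diff):

// Sketch only: restore an offset that Spark recovered from its own offset log.
@Override
public Offset deserializeOffset(String json) {
  return StreamingOffset.fromJson(json);
}
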
protected Table findTable(DataSourceOptions options, Configuration conf) {
Optional<String> path = options.get("path");
Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set");
79 changes: 51 additions & 28 deletions spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -188,6 +188,26 @@ private StructType lazyType() {
return type;
}

protected Long splitSize() {
return splitSize;
}

protected Integer splitLookback() {
return splitLookback;
}

protected Long splitOpenFileCost() {
return splitOpenFileCost;
}

protected boolean caseSensitive() {
return caseSensitive;
}

protected List<Expression> filterExpressions() {
return filterExpressions;
}

@Override
public StructType readSchema() {
return lazyType();
@@ -315,33 +335,7 @@ public Statistics estimateStatistics() {

@Override
public boolean enableBatchRead() {
if (readUsingBatch == null) {
boolean allParquetFileScanTasks =
tasks().stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.PARQUET)));

boolean allOrcFileScanTasks =
tasks().stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.ORC)));

boolean atLeastOneColumn = lazySchema().columns().size() > 0;

boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());

boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);

boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);

this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}
return readUsingBatch;
return readUsingBatch == null ? checkEnableBatchRead(tasks()) : readUsingBatch;
}

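Aside (not part of the diff): unlike the removed block, the simplified return above never assigns this.readUsingBatch, so the checks rerun on every call. If the memoized behavior is still wanted, a variant along these lines could keep it while reusing the extracted helper:

// Sketch only: keep caching the computed flag while delegating to checkEnableBatchRead.
@Override
public boolean enableBatchRead() {
  if (readUsingBatch == null) {
    this.readUsingBatch = checkEnableBatchRead(tasks());
  }
  return readUsingBatch;
}
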
private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) {
@@ -389,7 +383,7 @@ private static void mergeIcebergHadoopConfs(
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
}

private List<CombinedScanTask> tasks() {
protected List<CombinedScanTask> tasks() {
if (tasks == null) {
TableScan scan = table
.newScan()
@@ -440,6 +434,35 @@ private List<CombinedScanTask> tasks() {
return tasks;
}

// An extracted method that will be overridden by StreamingReader, because the tasks generated by streaming
// are per batch and cannot be planned ahead of time the way Reader plans them.
protected boolean checkEnableBatchRead(List<CombinedScanTask> taskList) {

[Review comment, Contributor]: with this, enableBatchRead at L331 can be simplified.

boolean allParquetFileScanTasks =
taskList.stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.PARQUET)));

boolean allOrcFileScanTasks =
taskList.stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.ORC)));

boolean atLeastOneColumn = lazySchema().columns().size() > 0;

boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());

boolean hasNoDeleteFiles = taskList.stream().noneMatch(TableScanUtil::hasDeletes);

boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);

return batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}

@Override
public String toString() {
return String.format(
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
@@ -78,6 +79,19 @@ static StreamingOffset fromJson(String json) {
}
}

static StreamingOffset fromJson(InputStream inputStream) {
Preconditions.checkNotNull(inputStream, "Cannot parse StreamingOffset from inputStream: null");

JsonNode node;
try {
node = JsonUtil.mapper().readValue(inputStream, JsonNode.class);
} catch (IOException e) {
throw new UncheckedIOException("Failed to read StreamingOffset from json", e);
}

return fromJsonNode(node);
}

@Override
public String json() {
StringWriter writer = new StringWriter();
@@ -132,4 +146,19 @@ public String toString() {
return String.format("Streaming Offset[%d: position (%d) scan_all_files (%b)]",
snapshotId, position, scanAllFiles);
}

private static StreamingOffset fromJsonNode(JsonNode node) {
// The version of the StreamingOffset. The offset is written with a version number,
// which is validated when deserializing from a JSON string.
int version = JsonUtil.getInt(VERSION, node);
Preconditions.checkArgument(version == CURR_VERSION,
"This version of Iceberg source only supports version %s. Version %s is not supported.",
CURR_VERSION, version);

long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
int position = JsonUtil.getInt(POSITION, node);
boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node);

return new StreamingOffset(snapshotId, position, shouldScanAllFiles);
}
}
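
Aside (not part of the diff): a short round-trip sketch tying json() to the existing fromJson(String) and the new fromJson(InputStream); the constructor call matches the one used in fromJsonNode above.

// Sketch only: round-trip an offset through json() and both fromJson(...) entry points.
// Requires java.io.ByteArrayInputStream and java.nio.charset.StandardCharsets, and runs in the
// same package because the constructor and parsers above are package-private.
StreamingOffset original = new StreamingOffset(1L, 0, false);
String json = original.json();

StreamingOffset fromString = StreamingOffset.fromJson(json);
StreamingOffset fromStream = StreamingOffset.fromJson(
    new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)));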