diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java index 798c3379b1fd..d7be8c02dc28 100644 --- a/core/src/main/java/org/apache/iceberg/MicroBatches.java +++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -109,18 +109,18 @@ public MicroBatchBuilder specsById(Map specs) { return this; } - public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) { + public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean isStarting) { Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0"); Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0"); - List manifests = scanAllFiles ? snapshot.dataManifests() : + List manifests = isStarting ? snapshot.dataManifests() : snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId())) .collect(Collectors.toList()); List> manifestIndexes = indexManifests(manifests); List> skippedManifestIndexes = skipManifests(manifestIndexes, startFileIndex); - return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles); + return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, isStarting); } /** @@ -177,9 +177,10 @@ private static List> skipManifests(List> indexedManifests, long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) { if (indexedManifests.isEmpty()) { @@ -195,8 +196,9 @@ private MicroBatch generateMicroBatch(List> indexedM for (int idx = 0; idx < indexedManifests.size(); idx++) { currentFileIndex = indexedManifests.get(idx).second(); - try (CloseableIterable taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles); - CloseableIterator taskIter = taskIterable.iterator()) { + try (CloseableIterable taskIterable = open(indexedManifests.get(idx).first(), + scanAllFiles)) { + Iterator taskIter = taskIterable.iterator(); while (taskIter.hasNext()) { FileScanTask task = taskIter.next(); if (currentFileIndex >= startFileIndex) { diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index 9b86e004c06d..5916fcfc4077 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -36,16 +36,19 @@ import org.apache.spark.sql.sources.DataSourceRegister; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.MicroBatchReadSupport; import org.apache.spark.sql.sources.v2.ReadSupport; import org.apache.spark.sql.sources.v2.StreamWriteSupport; import org.apache.spark.sql.sources.v2.WriteSupport; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader; import 
org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.types.StructType; -public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport { +public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, + StreamWriteSupport, MicroBatchReadSupport { private SparkSession lazySpark = null; private Configuration lazyConf = null; @@ -113,6 +116,21 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct, return new StreamingWriter(lazySparkSession(), table, options, queryId, mode, appId, writeSchema, dsStruct); } + @Override + public MicroBatchReader createMicroBatchReader(Optional schema, String checkpointLocation, + DataSourceOptions options) { + if (schema.isPresent()) { + throw new IllegalStateException("Iceberg does not support specifying the schema at read time"); + } + + Configuration conf = new Configuration(lazyBaseConf()); + Table table = getTableAndResolveHadoopConfiguration(options, conf); + String caseSensitive = conf.get("spark.sql.caseSensitive"); + + return new StreamingReader(lazySparkSession(), table, checkpointLocation, + Boolean.parseBoolean(caseSensitive), options); + } + protected Table findTable(DataSourceOptions options, Configuration conf) { Optional path = options.get("path"); Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set"); diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 7a9b73e82d43..a3f7212ee4ca 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -188,6 +188,26 @@ private StructType lazyType() { return type; } + protected Long splitSize() { + return splitSize; + } + + protected Integer splitLookback() { + return splitLookback; + } + + protected Long splitOpenFileCost() { + return splitOpenFileCost; + } + + protected boolean caseSensitive() { + return caseSensitive; + } + + protected List filterExpressions() { + return filterExpressions; + } + @Override public StructType readSchema() { return lazyType(); @@ -315,33 +335,7 @@ public Statistics estimateStatistics() { @Override public boolean enableBatchRead() { - if (readUsingBatch == null) { - boolean allParquetFileScanTasks = - tasks().stream() - .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files() - .stream() - .allMatch(fileScanTask -> fileScanTask.file().format().equals( - FileFormat.PARQUET))); - - boolean allOrcFileScanTasks = - tasks().stream() - .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files() - .stream() - .allMatch(fileScanTask -> fileScanTask.file().format().equals( - FileFormat.ORC))); - - boolean atLeastOneColumn = lazySchema().columns().size() > 0; - - boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType()); - - boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); - - boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks); - - this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); - } - return readUsingBatch; + return readUsingBatch == 
null ? checkEnableBatchRead(tasks()) : readUsingBatch; } private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) { @@ -389,7 +383,7 @@ private static void mergeIcebergHadoopConfs( .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key))); } - private List tasks() { + protected List tasks() { if (tasks == null) { TableScan scan = table .newScan() @@ -440,6 +434,35 @@ private List tasks() { return tasks; } + // An extracted method which will be overrided by StreamingReader. This is because the tasks generated by Streaming is + // per batch and cannot be planned like Reader beforehand. + protected boolean checkEnableBatchRead(List taskList) { + boolean allParquetFileScanTasks = + taskList.stream() + .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files() + .stream() + .allMatch(fileScanTask -> fileScanTask.file().format().equals( + FileFormat.PARQUET))); + + boolean allOrcFileScanTasks = + taskList.stream() + .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files() + .stream() + .allMatch(fileScanTask -> fileScanTask.file().format().equals( + FileFormat.ORC))); + + boolean atLeastOneColumn = lazySchema().columns().size() > 0; + + boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType()); + + boolean hasNoDeleteFiles = taskList.stream().noneMatch(TableScanUtil::hasDeletes); + + boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks); + + return batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || + (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + } + @Override public String toString() { return String.format( diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java index f15c5c6536e9..04937d5f2128 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; +import java.io.InputStream; import java.io.StringWriter; import java.io.UncheckedIOException; import org.apache.iceberg.relocated.com.google.common.base.Objects; @@ -78,6 +79,19 @@ static StreamingOffset fromJson(String json) { } } + static StreamingOffset fromJson(InputStream inputStream) { + Preconditions.checkNotNull(inputStream, "Cannot parse StreamingOffset from inputStream: null"); + + JsonNode node; + try { + node = JsonUtil.mapper().readValue(inputStream, JsonNode.class); + } catch (IOException e) { + throw new UncheckedIOException("Failed to read StreamingOffset from json", e); + } + + return fromJsonNode(node); + } + @Override public String json() { StringWriter writer = new StringWriter(); @@ -132,4 +146,19 @@ public String toString() { return String.format("Streaming Offset[%d: position (%d) scan_all_files (%b)]", snapshotId, position, scanAllFiles); } + + private static StreamingOffset fromJsonNode(JsonNode node) { + // The version of StreamingOffset. The offset was created with a version number + // used to validate when deserializing from json string. + int version = JsonUtil.getInt(VERSION, node); + Preconditions.checkArgument(version == CURR_VERSION, + "This version of Iceberg source only supports version %s. 
Version %s is not supported.", + CURR_VERSION, version); + + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + int position = JsonUtil.getInt(POSITION, node); + boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node); + + return new StreamingOffset(snapshotId, position, shouldScanAllFiles); + } } diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java new file mode 100644 index 000000000000..27c0394b8b5d --- /dev/null +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader; +import org.apache.spark.sql.sources.v2.reader.streaming.Offset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static 
org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +/** + * A micro-batch based Spark Structured Streaming reader for Iceberg table. It will track the added + * files and generate tasks per batch to process newly added files. By default it will process + * all the newly added files to the current snapshot in each batch, user could also set this + * configuration "max-files-per-trigger" to control the number of files processed per batch. + */ +class StreamingReader extends Reader implements MicroBatchReader { + private static final Logger LOG = LoggerFactory.getLogger(StreamingReader.class); + private static final String MAX_SIZE_PER_BATCH = "max-size-per-batch"; + private static final String START_SNAPSHOT_ID = "start-snapshot-id"; + private static final Joiner SLASH = Joiner.on("/"); + + private StreamingOffset startOffset; + private StreamingOffset endOffset; + + private final Table table; + private final long maxSizePerBatch; + private final Long startSnapshotId; + private final long splitSize; + private final int splitLookback; + private final long splitOpenFileCost; + private Boolean readUsingBatch = null; + + /** + * Used to cache the pending batches for this streaming batch interval. + */ + private Pair> cachedPendingBatches = null; + + StreamingReader(SparkSession spark, Table table, String checkpointLocation, boolean caseSensitive, + DataSourceOptions options) { + super(spark, table, caseSensitive, options); + + this.table = table; + this.maxSizePerBatch = options.get(MAX_SIZE_PER_BATCH).map(Long::parseLong).orElse(Long.MAX_VALUE); + Preconditions.checkArgument(maxSizePerBatch > 0L, + "Option max-size-per-batch '%s' should > 0", maxSizePerBatch); + + this.startSnapshotId = options.get(START_SNAPSHOT_ID).map(Long::parseLong).orElse(null); + if (startSnapshotId != null) { + if (!SnapshotUtil.ancestorOf(table, table.currentSnapshot().snapshotId(), startSnapshotId)) { + throw new IllegalArgumentException("The option start-snapshot-id " + startSnapshotId + + " is not an ancestor of the current snapshot"); + } + } + + long tableSplitSize = Optional.ofNullable(splitSize()) + .orElseGet( + () -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); + int tableSplitLookback = Optional.ofNullable(splitLookback()) + .orElseGet(() -> PropertyUtil + .propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT)); + long tableSplitOpenFileCost = Optional.ofNullable(splitOpenFileCost()) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, + SPLIT_OPEN_FILE_COST_DEFAULT)); + + this.splitSize = options.getLong(SparkReadOptions.SPLIT_SIZE, tableSplitSize); + this.splitLookback = options.getInt(SparkReadOptions.LOOKBACK, tableSplitLookback); + this.splitOpenFileCost = options + .getLong(SparkReadOptions.FILE_OPEN_COST, tableSplitOpenFileCost); + + InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation); + this.startOffset = initialOffsetStore.initialOffset(); + } + + @Override + @SuppressWarnings("unchecked") + public void setOffsetRange(Optional start, Optional end) { + table.refresh(); + + if (start.isPresent() && !StreamingOffset.START_OFFSET.equals(start.get())) { + this.startOffset = (StreamingOffset) start.get(); + this.endOffset = (StreamingOffset) end.orElseGet(() -> 
calculateEndOffset(startOffset)); + } else { + // If starting offset is "START_OFFSET" (there's no snapshot in the last batch), or starting + // offset is not set, then we need to calculate the starting offset again. + this.startOffset = calculateStartingOffset(); + this.endOffset = calculateEndOffset(startOffset); + } + } + + @Override + public Offset getStartOffset() { + if (startOffset == null) { + throw new IllegalStateException("Start offset is not set"); + } + + return startOffset; + } + + @Override + public Offset getEndOffset() { + if (endOffset == null) { + throw new IllegalStateException("End offset is not set"); + } + + return endOffset; + } + + @Override + public Offset deserializeOffset(String json) { + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { + // Since all the data and metadata of Iceberg is as it is, nothing needs to commit when + // offset is processed, so no need to implement this method. + } + + @Override + public void stop() { + } + + @Override + public boolean enableBatchRead() { + return readUsingBatch != null && readUsingBatch; + } + + @Override + public String toString() { + return String.format( + "IcebergStreamScan(table=%s, type=%s)", table, table.schema().asStruct()); + } + + @Override + @SuppressWarnings("unchecked") + protected List tasks() { + if (startOffset.equals(endOffset)) { + LOG.info("Start offset {} equals to end offset {}, no data to process", startOffset, endOffset); + return Collections.emptyList(); + } + + Preconditions.checkState(cachedPendingBatches != null, + "pendingBatches is null, which is unexpected as it will be set when calculating end offset"); + + List pendingBatches = cachedPendingBatches.second(); + if (pendingBatches.isEmpty()) { + LOG.info("Current start offset {} and end offset {}, there's no task to process in this batch", + startOffset, endOffset); + return Collections.emptyList(); + } + + MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1); + Preconditions.checkState( + lastBatch.snapshotId() == endOffset.snapshotId() && lastBatch.endFileIndex() == endOffset.position(), + "The cached pendingBatches doesn't match the current end offset " + endOffset); + + LOG.info("Processing data from {} to {}", startOffset, endOffset); + List tasks = pendingBatches.stream() + .flatMap(batch -> batch.tasks().stream()) + .collect(Collectors.toList()); + CloseableIterable splitTasks = TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), + splitSize); + List combinedScanTasks = Lists.newArrayList( + TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost)); + + if (readUsingBatch == null) { + this.readUsingBatch = checkEnableBatchRead(combinedScanTasks); + } + + return combinedScanTasks; + } + + /** + * Used to calculate start offset. If the startSnapshotId has a value, start the construction + * from the specified snapshot, otherwise, start the construction from the beginning. + * + * @return The start offset to scan from. + */ + private StreamingOffset calculateStartingOffset() { + StreamingOffset startingOffset; + if (startSnapshotId != null) { + startingOffset = new StreamingOffset(startSnapshotId, 0, false); + } else { + List snapshotIds = SnapshotUtil.currentAncestors(table); + if (snapshotIds.isEmpty()) { + // there's no snapshot currently. 
+ startingOffset = StreamingOffset.START_OFFSET; + } else { + startingOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true); + } + } + + return startingOffset; + } + + /** + * Used to calculate end offset. + * + * @param start The start offset to scan from + * @return The end offset to scan to + */ + private StreamingOffset calculateEndOffset(StreamingOffset start) { + if (start.equals(StreamingOffset.START_OFFSET)) { + return StreamingOffset.START_OFFSET; + } + + // Spark will invoke setOffsetRange more than once. If this is already calculated, use the cached one to avoid + // calculating again. + if (cachedPendingBatches == null || CollectionUtils.isEmpty(cachedPendingBatches.second()) || + !cachedPendingBatches.first().equals(start)) { + this.cachedPendingBatches = Pair.of(start, getChangesWithRateLimit(start, maxSizePerBatch)); + } + + List batches = cachedPendingBatches.second(); + MicroBatch lastBatch = Iterables.getLast(batches, null); + + if (lastBatch == null) { + return start; + } else { + boolean isStarting = lastBatch.snapshotId() == start.snapshotId() && start.shouldScanAllFiles(); + return new StreamingOffset(lastBatch.snapshotId(), lastBatch.endFileIndex(), isStarting); + } + } + + /** + * Streaming Read control is performed by changing the offset and maxSize. + * + * @param offset The start offset to scan from + * @param maxSize The maximum size of Bytes can calculate how many batches + * @return MicroBatch of list + */ + @VisibleForTesting + List getChangesWithRateLimit(StreamingOffset offset, long maxSize) { + List batches = Lists.newArrayList(); + long currentLeftSize = maxSize; + MicroBatch lastBatch = null; + + assertNoOverwrite(table.snapshot(offset.snapshotId())); + if (shouldGenerateFromStartOffset(offset)) { + MicroBatch batch = generateMicroBatch(offset.snapshotId(), offset.position(), + offset.shouldScanAllFiles(), currentLeftSize); + if (!batch.tasks().isEmpty()) { + batches.add(batch); + currentLeftSize -= batch.sizeInBytes(); + lastBatch = Iterables.getLast(batches); + } + } + + // Current snapshot can already satisfy the size needs. + if (currentLeftSize <= 0L || (lastBatch != null && !lastBatch.lastIndexOfSnapshot())) { + return batches; + } + + long currentSnapshotId = table.currentSnapshot().snapshotId(); + if (currentSnapshotId == offset.snapshotId()) { + // the snapshot of current offset is already the latest snapshot of this table. + return batches; + } + + ImmutableList snapshotIds = ImmutableList.builder() + .addAll(SnapshotUtil.snapshotIdsBetween(table, offset.snapshotId(), currentSnapshotId)) + .build() + .reverse(); + + for (Long id : snapshotIds) { + Snapshot snapshot = table.snapshot(id); + assertNoOverwrite(snapshot); + if (!isAppend(snapshot)) { + continue; + } + + MicroBatch batch = generateMicroBatch(id, 0, false, currentLeftSize); + if (!batch.tasks().isEmpty()) { + batches.add(batch); + currentLeftSize -= batch.sizeInBytes(); + lastBatch = Iterables.getLast(batches); + } + + // If the current request size is already satisfied, or none of the DataFile size can satisfy the current left + // size, break the current loop. 
+ if (currentLeftSize <= 0L || !Objects.requireNonNull(lastBatch).lastIndexOfSnapshot()) { + break; + } + } + + return batches; + } + + private MicroBatch generateMicroBatch(long snapshotId, long startIndex, boolean isStart, long currentLeftSize) { + return MicroBatches.from(table.snapshot(snapshotId), table.io()) + .caseSensitive(caseSensitive()) + .specsById(table.specs()) + .generate(startIndex, currentLeftSize, isStart); + } + + @SuppressWarnings("checkstyle:HiddenField") + private boolean shouldGenerateFromStartOffset(StreamingOffset startOffset) { + boolean isSnapshotFullyProcessed; + if (cachedPendingBatches != null && !cachedPendingBatches.second().isEmpty()) { + List batches = cachedPendingBatches.second(); + MicroBatch lastBatch = Iterables.getLast(batches, null); + isSnapshotFullyProcessed = lastBatch.lastIndexOfSnapshot(); + } else { + isSnapshotFullyProcessed = false; + } + return !isSnapshotFullyProcessed && + (startOffset.shouldScanAllFiles() || isAppend(table.snapshot(startOffset.snapshotId()))); + } + + private static void assertNoOverwrite(Snapshot snapshot) { + if (!snapshot.operation().equals(DataOperations.APPEND)) { + throw new UnsupportedOperationException(String.format("Found %s operation, cannot support incremental data for " + + "snapshot %d", snapshot.operation(), snapshot.snapshotId())); + } + } + + private static boolean isAppend(Snapshot snapshot) { + return snapshot.operation().equals(DataOperations.APPEND); + } + + private static class InitialOffsetStore { + private final Table table; + private final FileIO io; + private final String initialOffsetLocation; + + InitialOffsetStore(Table table, String checkpointLocation) { + this.table = table; + this.io = table.io(); + this.initialOffsetLocation = SLASH.join(checkpointLocation, "offsets/0"); + } + + public StreamingOffset initialOffset() { + InputFile inputFile = io.newInputFile(initialOffsetLocation); + if (inputFile.exists()) { + return readOffset(inputFile); + } + + table.refresh(); + StreamingOffset offset = table.currentSnapshot() == null ? + StreamingOffset.START_OFFSET : + new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false); + + OutputFile outputFile = io.newOutputFile(initialOffsetLocation); + writeOffset(offset, outputFile); + + return offset; + } + + private void writeOffset(StreamingOffset offset, OutputFile file) { + try (OutputStream outputStream = file.create()) { + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)); + writer.write(offset.json()); + writer.flush(); + } catch (IOException ioException) { + throw new UncheckedIOException( + String.format("Failed writing offset to: %s", initialOffsetLocation), ioException); + } + } + + private StreamingOffset readOffset(InputFile file) { + try (InputStream in = file.newStream()) { + return StreamingOffset.fromJson(in); + } catch (IOException ioException) { + throw new UncheckedIOException( + String.format("Failed reading offset from: %s", initialOffsetLocation), ioException); + } + } + } +} diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java new file mode 100644 index 000000000000..834952ddfacb --- /dev/null +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.streaming.DataStreamWriter; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public class TestStructuredStreamingRead { + private static final Configuration CONF = new Configuration(); + private static final Schema SCHEMA = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()) + ); + private static SparkSession spark = null; + private static Path parent = null; + private static File tableLocation = null; + private static Table table = null; + private static List> expected = null; + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + @BeforeClass + public static void startSpark() throws Exception { + TestStructuredStreamingRead.spark = SparkSession.builder() + .master("local[2]") + .config("spark.sql.shuffle.partitions", 4) + .getOrCreate(); + + parent = Files.createTempDirectory("test"); + tableLocation = new File(parent.toFile(), "table"); + tableLocation.mkdir(); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); + table = tables.create(SCHEMA, spec, tableLocation.toString()); + + expected = Lists.newArrayList( + Lists.newArrayList(new SimpleRecord(1, "1")), + Lists.newArrayList(new SimpleRecord(2, "2")), + Lists.newArrayList(new SimpleRecord(3, "3")), + Lists.newArrayList(new SimpleRecord(4, "4")) + ); + + // Write records one by one to generate 4 snapshots. 
+ for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(tableLocation.toString()); + } + table.refresh(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestStructuredStreamingRead.spark; + TestStructuredStreamingRead.spark = null; + currentSpark.stop(); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetChangesFromStart() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString())); + IcebergSource source = new IcebergSource(); + + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + long initialSnapshotId = snapshotIds.get(0); + + // Getting all appends from initial snapshot. + List pendingBatches = streamingReader.getChangesWithRateLimit( + new StreamingOffset(initialSnapshotId, 0, true), Long.MAX_VALUE); + Assert.assertEquals("Batches with unlimited size control should have 4 snapshots", 4, pendingBatches.size()); + + List batchSnapshotIds = pendingBatches.stream() + .map(MicroBatch::snapshotId) + .collect(Collectors.toList()); + Assert.assertEquals("Snapshot id of each batch should match snapshot id of table", snapshotIds, batchSnapshotIds); + + // Getting appends from initial snapshot with last index, 1st snapshot should be an empty batch. + List pendingBatches1 = streamingReader.getChangesWithRateLimit( + new StreamingOffset(initialSnapshotId, 1, true), Long.MAX_VALUE); + + Assert.assertEquals("Batches with unlimited size control from initial id should have 3 snapshots", + 3, pendingBatches1.size()); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetChangesFrom2ndSnapshot() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString())); + IcebergSource source = new IcebergSource(); + + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + long initialSnapshotId = snapshotIds.get(0); + + // Getting appends from 2nd snapshot, 1st snapshot should be filtered out. 
+ List pendingBatches = streamingReader.getChangesWithRateLimit( + new StreamingOffset(snapshotIds.get(1), 0, false), Long.MAX_VALUE); + + Assert.assertEquals(3, pendingBatches.size()); + List batchSnapshotIds = pendingBatches.stream() + .map(MicroBatch::snapshotId) + .collect(Collectors.toList()); + Assert.assertFalse("1st snapshot should be filtered", batchSnapshotIds.contains(initialSnapshotId)); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetChangesFromLastSnapshot() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString())); + IcebergSource source = new IcebergSource(); + + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + // Getting appends from last snapshot with last index, should get an empty batch. + long lastSnapshotId = snapshotIds.get(3); + List pendingBatches = streamingReader.getChangesWithRateLimit( + new StreamingOffset(lastSnapshotId, 0, false), Long.MAX_VALUE); + + Assert.assertEquals("Should only have 1 batch with last snapshot", 1, pendingBatches.size()); + MicroBatch batch = pendingBatches.get(0); + Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0); + Assert.assertEquals("Batch endFileIndex should be euqal to start", 1, batch.endFileIndex()); + Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size()); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetChangesWithRateLimit1000() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + IcebergSource source = new IcebergSource(); + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + long initialSnapshotId = snapshotIds.get(0); + + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString())); + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + // The size of each data file is around 600 bytes. + // Max size set to 1000. One additional batch will be added because the left size is less than file size, + // MicroBatchBuilder will add one more to avoid stuck. 
+ List rateLimitedBatches = streamingReader.getChangesWithRateLimit( + new StreamingOffset(initialSnapshotId, 0, true), 1000); + + Assert.assertEquals("Should have 2 batches", 2L, rateLimitedBatches.size()); + MicroBatch batch = rateLimitedBatches.get(0); + Assert.assertEquals("1st batch's endFileIndex should reach to the end of file indexes", 1, batch.endFileIndex()); + Assert.assertTrue("1st batch should be the last index of 1st snapshot", batch.lastIndexOfSnapshot()); + Assert.assertEquals("1st batch should only have 1 task", 1, batch.tasks().size()); + Assert.assertTrue("1st batch's size should be around 600", batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0); + + MicroBatch batch1 = rateLimitedBatches.get(1); + Assert.assertEquals("2nd batch's endFileIndex should reach to the end of file indexes", 1, batch1.endFileIndex()); + Assert.assertTrue("2nd batch should be the last of 2nd snapshot", batch1.lastIndexOfSnapshot()); + Assert.assertEquals("2nd batch should only have 1 task", 1, batch1.tasks().size()); + Assert.assertTrue("2nd batch's size should be aound 600", batch1.sizeInBytes() < 1000 && batch1.sizeInBytes() > 0); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetChangesWithRateLimit100() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + IcebergSource source = new IcebergSource(); + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString())); + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + // Max size less than file size, should have one batch added to avoid stuck. + List rateLimitedBatches = streamingReader.getChangesWithRateLimit( + new StreamingOffset(snapshotIds.get(1), 1, false), 100); + + Assert.assertEquals("Should only have 1 batch", 1, rateLimitedBatches.size()); + MicroBatch batch = rateLimitedBatches.get(0); + Assert.assertEquals("Batch's endFileIndex should reach to the end of file indexes", 1, batch.endFileIndex()); + Assert.assertTrue("Batch should be the last of 1st snapshot", batch.lastIndexOfSnapshot()); + Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size()); + Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetChangesWithRateLimit10000() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + IcebergSource source = new IcebergSource(); + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString())); + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + // Max size set to 10000, the last left batch will be added. 
+ List rateLimitedBatches = streamingReader.getChangesWithRateLimit( + new StreamingOffset(snapshotIds.get(2), 1, false), 10000); + + Assert.assertEquals("Should only have 1 batch", 1, rateLimitedBatches.size()); + MicroBatch batch = rateLimitedBatches.get(0); + Assert.assertEquals("Batch's endFileIndex should reach to the end of file indexes", 1, batch.endFileIndex()); + Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size()); + Assert.assertTrue("Batch should have 1 task", batch.lastIndexOfSnapshot()); + Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetOffsetWithDefaultRateLimit() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + IcebergSource source = new IcebergSource(); + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + // Default max size per batch, this will consume all the data of this table. + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString())); + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + streamingReader.setOffsetRange(Optional.empty(), Optional.empty()); + + StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset(); + Assert.assertEquals("Start offset's snapshot id should be 1st snapshot id", + snapshotIds.get(0).longValue(), start.snapshotId()); + Assert.assertEquals("Start offset's index should be the start index of 1st snapshot", 0, start.position()); + Assert.assertTrue("Start offset's snapshot should do a full table scan", start.shouldScanAllFiles()); + + StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset(); + Assert.assertEquals("End offset's snapshot should be the last snapshot id", + snapshotIds.get(3).longValue(), end.snapshotId()); + Assert.assertEquals("End offset's index should be the last index", 1, end.position()); + Assert.assertFalse("End offset's snapshot should not do a full table scan", end.shouldScanAllFiles()); + + streamingReader.setOffsetRange(Optional.of(end), Optional.empty()); + StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset(); + Assert.assertEquals("End offset should be same to start offset since there's no more batches to consume", + end1, end); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetOffsetWithRateLimit1000() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + IcebergSource source = new IcebergSource(); + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + // Max size to 1000, this will generate two MicroBatches per consuming. 
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString(), + "max-size-per-batch", "1000")); + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + streamingReader.setOffsetRange(Optional.empty(), Optional.empty()); + StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true), start); + + StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false), end); + + streamingReader.setOffsetRange(Optional.of(end), Optional.empty()); + StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false), end1); + + streamingReader.setOffsetRange(Optional.of(end1), Optional.empty()); + StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(end2, end1); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetOffsetWithRateLimit100() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + IcebergSource source = new IcebergSource(); + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + // Max size to 100, will generate 1 MicroBatch per consuming. + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString(), + "max-size-per-batch", "100")); + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + streamingReader.setOffsetRange(Optional.empty(), Optional.empty()); + StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true), start); + + StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(0), 1, true), end); + + streamingReader.setOffsetRange(Optional.of(end), Optional.empty()); + StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false), end1); + } + + @SuppressWarnings("unchecked") + @Test + public void testSpecifyInvalidSnapshotId() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + IcebergSource source = new IcebergSource(); + + // test invalid snapshot id + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", tableLocation.toString(), + "checkpointLocation", checkpoint.toString(), + "start-snapshot-id", "-1")); + AssertHelpers.assertThrows("Test invalid snapshot id", + IllegalArgumentException.class, "The option start-snapshot-id -1 is not an ancestor", + () -> source.createMicroBatchReader(Optional.empty(), checkpoint.toString(), options)); + } + + @SuppressWarnings("unchecked") + @Test + public void testSpecifySnapshotId() throws IOException { + File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile(); + + IcebergSource source = new IcebergSource(); + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + // test specify snapshot-id + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", 
tableLocation.toString(), + "checkpointLocation", checkpoint.toString(), + "start-snapshot-id", snapshotIds.get(1).toString(), + "max-size-per-batch", "1000")); + StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader( + Optional.empty(), checkpoint.toString(), options); + + streamingReader.setOffsetRange(Optional.empty(), Optional.empty()); + StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(1), 0, false), start); + + StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(2), 1, false), end); + + streamingReader.setOffsetRange(Optional.of(end), Optional.empty()); + StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false), end1); + + streamingReader.setOffsetRange(Optional.of(end1), Optional.empty()); + StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset(); + validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false), end2); + } + + @Test + public void testStreamingRead() { + Dataset read = spark.readStream() + .format("iceberg") + .load(tableLocation.toString()); + DataStreamWriter streamWriter = read.writeStream() + .format("memory") + .outputMode("append") + .queryName("memoryStream"); + + try { + StreamingQuery query = streamWriter.start(); + query.processAllAvailable(); + + List actual = spark.table("memoryStream") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + List expectedResult = expected.stream().flatMap(List::stream).collect(Collectors.toList()); + validateResult(expectedResult, actual); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @Test + public void testStreamingReadWithSpecifiedSnapshotId() { + List snapshotIds = SnapshotUtil.currentAncestors(table); + Collections.reverse(snapshotIds); + + Dataset read = spark.readStream() + .format("iceberg") + .option("start-snapshot-id", snapshotIds.get(1).toString()) + .load(tableLocation.toString()); + + try { + StreamingQuery query = read.writeStream() + .format("memory") + .outputMode("append") + .queryName("memoryStream1") + .start(); + + query.processAllAvailable(); + + List actual = spark.table("memoryStream1") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + List expectedResult = expected.stream().flatMap(List::stream) + .filter(d -> !d.equals(expected.get(0).get(0))).collect(Collectors.toList()); + validateResult(expectedResult, actual); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + private static void validateOffset(StreamingOffset expectedResult, StreamingOffset actualResult) { + Assert.assertEquals(String.format("Actual StreamingOffset %s doesn't equal to the expected StreamingOffset %s", + actualResult, expectedResult), expectedResult, actualResult); + } + + private static void validateResult(List expectedResult, List actualResult) { + expectedResult.sort(Comparator.comparingInt(SimpleRecord::getId)); + actualResult.sort(Comparator.comparingInt(SimpleRecord::getId)); + + Assert.assertEquals("Streaming result should be matched", expectedResult, actualResult); + } +}
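
For reviewers, a minimal usage sketch of the read path this patch adds, written against the Spark 2.4 streaming API used by the new tests. The table location, snapshot id, checkpoint directory, local SparkSession and memory sink are placeholders for illustration only; the two source options (`max-size-per-batch`, `start-snapshot-id`) are the ones introduced by `StreamingReader` in this diff.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class IcebergMicroBatchReadExample {
  public static void main(String[] args) {
    // Placeholders: point these at a real Iceberg table, an ancestor snapshot id,
    // and a checkpoint directory for the streaming query.
    String tableLocation = args[0];
    String startSnapshotId = args[1];
    String checkpointDir = args[2];

    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .getOrCreate();

    // Read newly appended data files as micro-batches. Each batch is capped at
    // roughly 1000 bytes of data files, and scanning starts from the given
    // ancestor snapshot instead of the oldest snapshot in the table history.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .option("max-size-per-batch", "1000")          // option added by this patch
        .option("start-snapshot-id", startSnapshotId)  // option added by this patch
        .load(tableLocation);

    // Sink choice is arbitrary here; the in-memory sink mirrors the new tests.
    StreamingQuery query = stream.writeStream()
        .format("memory")
        .outputMode("append")
        .queryName("icebergStream")
        .option("checkpointLocation", checkpointDir)
        .start();

    query.processAllAvailable();
    query.stop();
    spark.stop();
  }
}
```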