diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java
index 846f20a07b7c..ba3d463da136 100644
--- a/core/src/main/java/org/apache/iceberg/MicroBatches.java
+++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -177,11 +177,11 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests,
-                                        int startFileIndex, long targetSizeInBytes, boolean isStarting) {
+                                        int startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
     if (indexedManifests.isEmpty()) {
       return new MicroBatch(snapshot.snapshotId(), startFileIndex, startFileIndex + 1, 0L,
           Collections.emptyList(), true);
@@ -195,7 +195,7 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
     for (int idx = 0; idx < indexedManifests.size(); idx++) {
       currentFileIndex = indexedManifests.get(idx).second();
 
-      try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), isStarting);
+      try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles);
            CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
         while (taskIter.hasNext()) {
           FileScanTask task = taskIter.next();
@@ -238,11 +238,11 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
         tasks, isLastIndex);
   }
 
-  private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean isStarting) {
+  private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
     ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
         .specsById(specsById)
         .caseSensitive(caseSensitive);
-    if (isStarting) {
+    if (!scanAllFiles) {
       manifestGroup = manifestGroup
           .filterManifestEntries(entry ->
               entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED)
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 85f08e057340..3cf69259c252 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.util;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Snapshot;
@@ -69,8 +70,20 @@ public static List<Long> currentAncestors(Table table) {
    * This method assumes that fromSnapshotId is an ancestor of toSnapshotId.
    */
   public static List<Long> snapshotIdsBetween(Table table, long fromSnapshotId, long toSnapshotId) {
+    AtomicBoolean isAncestor = new AtomicBoolean(false);
     List<Long> snapshotIds = Lists.newArrayList(ancestorIds(table.snapshot(toSnapshotId),
-        snapshotId -> snapshotId != fromSnapshotId ? table.snapshot(snapshotId) : null));
+        snapshotId -> {
+          if (snapshotId == fromSnapshotId) {
+            isAncestor.set(true);
+            return null;
+          } else {
+            return table.snapshot(snapshotId);
+          }
+        }));
+    if (!isAncestor.get()) {
+      throw new IllegalStateException(fromSnapshotId + " is not an ancestor of " + toSnapshotId);
+    }
+
     return snapshotIds;
   }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
new file mode 100644
index 000000000000..2fe0b02fe729
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+
+/**
+ * An implementation of Spark Structured Streaming Offset, used to track the currently processed
+ * files of an Iceberg table. This StreamingOffset consists of:
+ *
+ * version: the version of StreamingOffset. The offset is written with a version number so that it
+ * can be validated when deserializing from a JSON string.
+ * snapshot_id: the currently processed snapshot id.
+ * index: the index of the last scanned file in the snapshot.
+ * scan_all_files: denotes whether to scan all files in a snapshot; currently all files are only
+ * scanned in the starting snapshot.
+ * snapshot_fully_processed: denotes whether the current snapshot is fully processed, to avoid
+ * revisiting a processed snapshot.
+ */
+class StreamingOffset extends Offset {
+  static final StreamingOffset START_OFFSET = new StreamingOffset(-1L, -1, false, true);
+
+  private static final int CURR_VERSION = 1;
+  private static final String VERSION = "version";
+  private static final String SNAPSHOT_ID = "snapshot_id";
+  private static final String INDEX = "index";
+  private static final String SCAN_ALL_FILES = "scan_all_files";
+  private static final String SNAPSHOT_FULLY_PROCESSED = "snapshot_fully_processed";
+
+  private final long snapshotId;
+  private final int index;
+  private final boolean scanAllFiles;
+  private final boolean snapshotFullyProcessed;
+
+  StreamingOffset(long snapshotId, int index, boolean scanAllFiles, boolean snapshotFullyProcessed) {
+    this.snapshotId = snapshotId;
+    this.index = index;
+    this.scanAllFiles = scanAllFiles;
+    this.snapshotFullyProcessed = snapshotFullyProcessed;
+  }
+
+  static StreamingOffset fromJson(String json) {
+    Preconditions.checkNotNull(json, "The input JSON string is null");
+
+    try {
+      JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
+      int version = JsonUtil.getInt(VERSION, node);
+      if (version > CURR_VERSION) {
+        throw new IOException(String.format("Cannot deserialize a JSON offset from version %d. %d is not compatible " +
+            "with the version of Iceberg %d and cannot be used. Please use a compatible version of Iceberg " +
+            "to read this offset", version, version, CURR_VERSION));
+      }
+
+      long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
+      int index = JsonUtil.getInt(INDEX, node);
+      boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node);
+      boolean snapshotFullyProcessed = JsonUtil.getBool(SNAPSHOT_FULLY_PROCESSED, node);
+
+      return new StreamingOffset(snapshotId, index, shouldScanAllFiles, snapshotFullyProcessed);
+    } catch (IOException e) {
+      throw new IllegalStateException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
+    }
+  }
+
+  @Override
+  public String json() {
+    StringWriter writer = new StringWriter();
+    try {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      generator.writeStartObject();
+      generator.writeNumberField(VERSION, CURR_VERSION);
+      generator.writeNumberField(SNAPSHOT_ID, snapshotId);
+      generator.writeNumberField(INDEX, index);
+      generator.writeBooleanField(SCAN_ALL_FILES, scanAllFiles);
+      generator.writeBooleanField(SNAPSHOT_FULLY_PROCESSED, snapshotFullyProcessed);
+      generator.writeEndObject();
+      generator.flush();
+
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to write StreamingOffset to json", e);
+    }
+
+    return writer.toString();
+  }
+
+  long snapshotId() {
+    return snapshotId;
+  }
+
+  int index() {
+    return index;
+  }
+
+  boolean shouldScanAllFiles() {
+    return scanAllFiles;
+  }
+
+  boolean isSnapshotFullyProcessed() {
+    return snapshotFullyProcessed;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof StreamingOffset) {
+      StreamingOffset offset = (StreamingOffset) obj;
+      return offset.snapshotId == snapshotId &&
+          offset.index == index &&
+          offset.scanAllFiles == scanAllFiles &&
+          offset.snapshotFullyProcessed == snapshotFullyProcessed;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(snapshotId, index, scanAllFiles, snapshotFullyProcessed);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Streaming Offset[%d: index (%d) scan_all_files (%b) snapshot_fully_processed (%b)]",
+        snapshotId, index, scanAllFiles, snapshotFullyProcessed);
+  }
+}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 589bbad22435..8c3f2dfa8e96 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -42,16 +42,19 @@
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.StreamWriteSupport;
 import org.apache.spark.sql.sources.v2.WriteSupport;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
 
-public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
+public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister,
+    StreamWriteSupport, MicroBatchReadSupport {
 
   private SparkSession lazySpark = null;
   private JavaSparkContext lazySparkContext = null;
@@ -129,6 +132,23 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
     return new StreamingWriter(table, io, encryptionManager, options, queryId, mode, appId, writeSchema, dsStruct);
   }
 
+  @Override
+  public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
+                                                 DataSourceOptions options) {
+    if (schema.isPresent()) {
+      throw new IllegalStateException("Iceberg does not support specifying the schema at read time");
+    }
+
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+    String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive");
+
+    Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
+    Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
+
+    return new StreamingReader(table, io, encryptionManager, Boolean.parseBoolean(caseSensitive), options);
+  }
+
   protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> path = options.get("path");
     Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set");
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 826c4508381b..816c9ccfc3ba 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -191,6 +191,26 @@ private StructType lazyType() {
     return type;
   }
 
+  protected Long splitSize() {
+    return splitSize;
+  }
+
+  protected Integer splitLookback() {
+    return splitLookback;
+  }
+
+  protected Long splitOpenFileCost() {
+    return splitOpenFileCost;
+  }
+
+  protected boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  protected List<Expression> filterExpressions() {
+    return filterExpressions;
+  }
+
   @Override
   public StructType readSchema() {
     return lazyType();
@@ -311,28 +331,7 @@ public Statistics estimateStatistics() {
   @Override
   public boolean enableBatchRead() {
     if (readUsingBatch == null) {
-      boolean allParquetFileScanTasks =
-          tasks().stream()
-              .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
-                  .stream()
-                  .allMatch(fileScanTask -> fileScanTask.file().format().equals(
-                      FileFormat.PARQUET)));
-
-      boolean allOrcFileScanTasks =
-          tasks().stream()
-              .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
-                  .stream()
-                  .allMatch(fileScanTask -> fileScanTask.file().format().equals(
-                      FileFormat.ORC)));
-
-      boolean atLeastOneColumn = lazySchema().columns().size() > 0;
-
-      boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
-
-      boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
-
-      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
-          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
+      this.readUsingBatch = checkEnableBatchRead(tasks());
     }
     return readUsingBatch;
   }
@@ -344,7 +343,7 @@ private static void mergeIcebergHadoopConfs(
         .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
 
-  private List<CombinedScanTask> tasks() {
+  protected List<CombinedScanTask> tasks() {
     if (tasks == null) {
       TableScan scan = table
           .newScan()
@@ -395,6 +394,33 @@ private List<CombinedScanTask> tasks() {
     return tasks;
   }
 
+  // An extracted method that will be overridden by StreamingReader, because the tasks generated by
+  // StreamingReader are per batch and cannot be planned beforehand as in Reader.
+  protected boolean checkEnableBatchRead(List<CombinedScanTask> taskList) {
+    boolean allParquetFileScanTasks =
+        taskList.stream()
+            .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
+                .stream()
+                .allMatch(fileScanTask -> fileScanTask.file().format().equals(
+                    FileFormat.PARQUET)));
+
+    boolean allOrcFileScanTasks =
+        taskList.stream()
+            .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
+                .stream()
+                .allMatch(fileScanTask -> fileScanTask.file().format().equals(
+                    FileFormat.ORC)));
+
+    boolean atLeastOneColumn = lazySchema().columns().size() > 0;
+
+    boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
+
+    boolean hasNoDeleteFiles = taskList.stream().noneMatch(TableScanUtil::hasDeletes);
+
+    return batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
+        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
+  }
+
   @Override
   public String toString() {
     return String.format(
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
new file mode 100644
index 000000000000..ee44d785bb33
--- /dev/null
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg tables. It tracks the added
+ * files and generates tasks per batch to process the newly added files. By default it processes
+ * all files added up to the current snapshot in each batch; users can also set the
+ * "max-size-per-batch" option to limit the amount of data processed in each batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingReader.class);
+  private static final String MAX_SIZE_PER_BATCH = "max-size-per-batch";
+  private static final String START_SNAPSHOT_ID = "start-snapshot-id";
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+  private final long splitSize;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+  private Boolean readUsingBatch = null;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = options.get(MAX_SIZE_PER_BATCH).map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%s' should be > 0", maxSizePerBatch);
+
+    this.startSnapshotId = options.get(START_SNAPSHOT_ID).map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalArgumentException("The option start-snapshot-id " + startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+
+    this.splitSize = Optional.ofNullable(splitSize())
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+    this.splitLookback = Optional.ofNullable(splitLookback())
+        .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+    this.splitOpenFileCost = Optional.ofNullable(splitOpenFileCost())
+        .orElseGet(() ->
+            PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && !StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) end.orElseGet(() -> calculateEndOffset(startOffset));
+    } else {
+      // If the starting offset is "START_OFFSET" (there was no snapshot in the last batch), or the
+      // starting offset is not set, then we need to calculate the starting offset again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Iceberg data and metadata are immutable once written, so there is nothing to commit when an
+    // offset is processed; this method is intentionally a no-op.
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  @Override
+  public boolean enableBatchRead() {
+    return readUsingBatch == null ? false : readUsingBatch;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("Current start offset {} and end offset {}, there's no task to process in this batch",
+          startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    Preconditions.checkState(
+        lastBatch.snapshotId() == endOffset.snapshotId() && lastBatch.endFileIndex() == endOffset.index(),
+        "The cached pendingBatches doesn't match the current end offset " + endOffset);
+
+    LOG.info("Processing data from {} to {}", startOffset, endOffset);
+    List<FileScanTask> tasks = pendingBatches.stream()
+        .flatMap(batch -> batch.tasks().stream())
+        .collect(Collectors.toList());
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
+
+    if (readUsingBatch == null) {
+      this.readUsingBatch = checkEnableBatchRead(combinedScanTasks);
+    }
+
+    return combinedScanTasks;
+  }
+
+  private StreamingOffset calculateStartingOffset() {
+    StreamingOffset startingOffset;
+    if (startSnapshotId != null) {
+      startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+    } else {
+      List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+      if (snapshotIds.isEmpty()) {
+        // there's no snapshot currently.
+        startingOffset = StreamingOffset.START_OFFSET;
+      } else {
+        startingOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true, false);
+      }
+    }
+
+    return startingOffset;
+  }
+
+  private StreamingOffset calculateEndOffset(StreamingOffset start) {
+    if (start.equals(StreamingOffset.START_OFFSET)) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // Spark will invoke setOffsetRange more than once. If this is already calculated, use the cached
+    // one to avoid calculating it again.
+    if (cachedPendingBatches == null || !cachedPendingBatches.first().equals(start)) {
+      this.cachedPendingBatches = Pair.of(start, getChangesWithRateLimit(start, maxSizePerBatch));
+    }
+
+    List<MicroBatch> batches = cachedPendingBatches.second();
+    MicroBatch lastBatch = Iterables.getLast(batches, null);
+
+    if (lastBatch == null) {
+      return start;
+    } else {
+      boolean isStarting = lastBatch.snapshotId() == start.snapshotId() && start.shouldScanAllFiles();
+      return new StreamingOffset(lastBatch.snapshotId(), lastBatch.endFileIndex(), isStarting,
+          lastBatch.lastIndexOfSnapshot());
+    }
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("checkstyle:HiddenField")
+  List<MicroBatch> getChangesWithRateLimit(StreamingOffset startOffset, long maxSize) {
+    List<MicroBatch> batches = Lists.newArrayList();
+    long currentLeftSize = maxSize;
+    MicroBatch lastBatch = null;
+
+    assertNoOverwrite(table.snapshot(startOffset.snapshotId()));
+    if (shouldGenerateFromStartOffset(startOffset)) {
+      MicroBatch batch = generateMicroBatch(startOffset.snapshotId(), startOffset.index(),
+          startOffset.shouldScanAllFiles(), currentLeftSize);
+      batches.add(batch);
+      currentLeftSize -= batch.sizeInBytes();
+      lastBatch = Iterables.getLast(batches);
+    }
+
+    // The current snapshot can already satisfy the requested size.
+    if (currentLeftSize <= 0L || (lastBatch != null && !lastBatch.lastIndexOfSnapshot())) {
+      return batches;
+    }
+
+    long currentSnapshotId = table.currentSnapshot().snapshotId();
+    if (currentSnapshotId == startOffset.snapshotId()) {
+      // the snapshot of the current offset is already the latest snapshot of this table.
+      return batches;
+    }
+
+    ImmutableList<Long> snapshotIds = ImmutableList.<Long>builder()
+        .addAll(SnapshotUtil.snapshotIdsBetween(table, startOffset.snapshotId(), currentSnapshotId))
+        .build()
+        .reverse();
+
+    for (Long id : snapshotIds) {
+      Snapshot snapshot = table.snapshot(id);
+      assertNoOverwrite(snapshot);
+      if (!isAppend(snapshot)) {
+        continue;
+      }
+
+      MicroBatch batch = generateMicroBatch(id, 0, false, currentLeftSize);
+      batches.add(batch);
+      currentLeftSize -= batch.sizeInBytes();
+      lastBatch = Iterables.getLast(batches);
+
+      // If the requested size is already satisfied, or no data file fits in the remaining size,
+      // break out of the loop.
+      if (currentLeftSize <= 0L || !lastBatch.lastIndexOfSnapshot()) {
+        break;
+      }
+    }
+
+    return batches;
+  }
+
+  private MicroBatch generateMicroBatch(long snapshotId, int startIndex, boolean isStart, long currentLeftSize) {
+    return MicroBatches.from(table.snapshot(snapshotId), table.io())
+        .caseSensitive(caseSensitive())
+        .specsById(table.specs())
+        .generate(startIndex, currentLeftSize, isStart);
+  }
+
+  @SuppressWarnings("checkstyle:HiddenField")
+  private boolean shouldGenerateFromStartOffset(StreamingOffset startOffset) {
+    return !startOffset.isSnapshotFullyProcessed() &&
+        (startOffset.shouldScanAllFiles() || isAppend(table.snapshot(startOffset.snapshotId())));
+  }
+
+  private static void assertNoOverwrite(Snapshot snapshot) {
+    if (snapshot.operation().equals(DataOperations.OVERWRITE)) {
+      throw new UnsupportedOperationException(String.format("Found %s operation, cannot support incremental data for " +
+          "snapshot %d", DataOperations.OVERWRITE, snapshot.snapshotId()));
+    }
+  }
+
+  private static boolean isAppend(Snapshot snapshot) {
+    return snapshot.operation().equals(DataOperations.APPEND);
+  }
+}
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
new file mode 100644
index 000000000000..0a656be7638b
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+  private static Path parent = null;
+  private static File tableLocation = null;
+  private static Table table = null;
+  private static List<List<SimpleRecord>> expected = null;
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() throws Exception {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+
+    parent = Files.createTempDirectory("test");
+    tableLocation = new File(parent.toFile(), "table");
+    tableLocation.mkdir();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    table = tables.create(SCHEMA, spec, tableLocation.toString());
+
+    expected = Lists.newArrayList(
+        Lists.newArrayList(new SimpleRecord(1, "1")),
+        Lists.newArrayList(new SimpleRecord(2, "2")),
+        Lists.newArrayList(new SimpleRecord(3, "3")),
+        Lists.newArrayList(new SimpleRecord(4, "4"))
+    );
+
+    // Write records one by one to generate 4 snapshots.
+    for (List<SimpleRecord> l : expected) {
+      Dataset<Row> df = spark.createDataFrame(l, SimpleRecord.class);
+      df.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(tableLocation.toString());
+    }
+    table.refresh();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesFromStart() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        new StreamingOffset(initialSnapshotId, 0, true, false), Long.MAX_VALUE);
+    Assert.assertEquals("Batches with unlimited size control should have 4 snapshots", 4, pendingBatches.size());
+
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals("Snapshot id of each batch should match snapshot id of table", snapshotIds, batchSnapshotIds);
+
+    // Getting appends from initial snapshot with last index, 1st snapshot should be an empty batch.
+    List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+        new StreamingOffset(initialSnapshotId, 1, true, false), Long.MAX_VALUE);
+
+    Assert.assertEquals("Batches with unlimited size control from initial id should have 4 snapshots",
+        4, pendingBatches1.size());
+    MicroBatch batch = pendingBatches1.get(0);
+    // 1st batch should be empty, since the starting offset is the end of this snapshot.
+    Assert.assertEquals("1st batch should be empty, have 0 size batch", 0L, batch.sizeInBytes());
+    Assert.assertEquals("1st batch should be empty, endFileIndex should be equal to start", 1, batch.endFileIndex());
+    Assert.assertTrue("1st batch should be empty", Iterables.isEmpty(batch.tasks()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesFrom2ndSnapshot() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting appends from 2nd snapshot, 1st snapshot should be filtered out.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        new StreamingOffset(snapshotIds.get(1), 0, false, false), Long.MAX_VALUE);
+
+    Assert.assertEquals(3, pendingBatches.size());
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertFalse("1st snapshot should be filtered", batchSnapshotIds.contains(initialSnapshotId));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesFromLastSnapshot() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    // Getting appends from the last snapshot with the last index, should get an empty batch.
+    long lastSnapshotId = snapshotIds.get(3);
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        new StreamingOffset(lastSnapshotId, 1, false, false), Long.MAX_VALUE);
+
+    Assert.assertEquals("Should only have 1 batch with last snapshot", 1, pendingBatches.size());
+    MicroBatch batch = pendingBatches.get(0);
+    Assert.assertEquals("batch should have 0 size", 0L, batch.sizeInBytes());
+    Assert.assertEquals("batch endFileIndex should be equal to start", 1, batch.endFileIndex());
+    Assert.assertTrue("batch should be empty", Iterables.isEmpty(batch.tasks()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesWithRateLimit1000() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    // The size of each data file is around 600 bytes. With max size set to 1000, one additional
+    // batch will be added because the remaining size is less than the file size; MicroBatchBuilder
+    // adds one more batch to avoid getting stuck.
+    List<MicroBatch> rateLimitedBatches = streamingReader.getChangesWithRateLimit(
+        new StreamingOffset(initialSnapshotId, 0, true, false), 1000);
+
+    Assert.assertEquals("Should have 2 batches", 2L, rateLimitedBatches.size());
+    MicroBatch batch = rateLimitedBatches.get(0);
+    Assert.assertEquals("1st batch's endFileIndex should reach to the end of file indexes", 1, batch.endFileIndex());
+    Assert.assertTrue("1st batch should be the last index of 1st snapshot", batch.lastIndexOfSnapshot());
+    Assert.assertEquals("1st batch should only have 1 task", 1, batch.tasks().size());
+    Assert.assertTrue("1st batch's size should be around 600", batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0);
+
+    MicroBatch batch1 = rateLimitedBatches.get(1);
+    Assert.assertEquals("2nd batch's endFileIndex should reach to the end of file indexes", 1, batch1.endFileIndex());
+    Assert.assertTrue("2nd batch should be the last of 2nd snapshot", batch1.lastIndexOfSnapshot());
+    Assert.assertEquals("2nd batch should only have 1 task", 1, batch1.tasks().size());
+    Assert.assertTrue("2nd batch's size should be around 600", batch1.sizeInBytes() < 1000 && batch1.sizeInBytes() > 0);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesWithRateLimit100() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    // Max size is less than the file size, so one batch should still be added to avoid getting stuck.
+    List<MicroBatch> rateLimitedBatches = streamingReader.getChangesWithRateLimit(
+        new StreamingOffset(snapshotIds.get(1), 1, false, true), 100);
+
+    Assert.assertEquals("Should only have 1 batch", 1, rateLimitedBatches.size());
+    MicroBatch batch = rateLimitedBatches.get(0);
+    Assert.assertEquals("Batch's endFileIndex should reach to the end of file indexes", 1, batch.endFileIndex());
+    Assert.assertTrue("Batch should be the last of 1st snapshot", batch.lastIndexOfSnapshot());
+    Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size());
+    Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesWithRateLimit10000() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    // Max size set to 10000, so the last remaining batch will be added.
+    List<MicroBatch> rateLimitedBatches = streamingReader.getChangesWithRateLimit(
+        new StreamingOffset(snapshotIds.get(2), 1, false, true), 10000);
+
+    Assert.assertEquals("Should only have 1 batch", 1, rateLimitedBatches.size());
+    MicroBatch batch = rateLimitedBatches.get(0);
+    Assert.assertEquals("Batch's endFileIndex should reach to the end of file indexes", 1, batch.endFileIndex());
+    Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size());
+    Assert.assertTrue("Batch should be the last index of the snapshot", batch.lastIndexOfSnapshot());
+    Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetOffsetWithDefaultRateLimit() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    // Default max size per batch, this will consume all the data of this table.
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+    streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+
+    StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+    Assert.assertEquals("Start offset's snapshot id should be 1st snapshot id",
+        snapshotIds.get(0).longValue(), start.snapshotId());
+    Assert.assertEquals("Start offset's index should be the start index of 1st snapshot", 0, start.index());
+    Assert.assertTrue("Start offset's snapshot should do a full table scan", start.shouldScanAllFiles());
+    Assert.assertFalse("Start offset should not fully process the snapshot", start.isSnapshotFullyProcessed());
+
+    StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals("End offset's snapshot should be the last snapshot id",
+        snapshotIds.get(3).longValue(), end.snapshotId());
+    Assert.assertEquals("End offset's index should be the last index", 1, end.index());
+    Assert.assertFalse("End offset's snapshot should not do a full table scan", end.shouldScanAllFiles());
+    Assert.assertTrue("End offset should fully process the snapshot", end.isSnapshotFullyProcessed());
+
+    streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+    StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals("End offset should stay the same since there are no more batches to consume",
+        end1, end);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetOffsetWithRateLimit1000() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    // Max size set to 1000, this will generate two MicroBatches each time the offset range is advanced.
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "max-size-per-batch", "1000"));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+    StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true, false), start);
+
+    StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false, true), end);
+
+    streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+    StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false, true), end1);
+
+    streamingReader.setOffsetRange(Optional.of(end1), Optional.empty());
+    StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(end2, end1);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetOffsetWithRateLimit100() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    // Max size set to 100, this will generate 1 MicroBatch per batch.
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "max-size-per-batch", "100"));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+    StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true, false), start);
+
+    StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(0), 1, true, true), end);
+
+    streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+    StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false, true), end1);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSpecifyInvalidSnapshotId() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+    IcebergSource source = new IcebergSource();
+
+    // test invalid snapshot id
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "start-snapshot-id", "-1"));
+    AssertHelpers.assertThrows("Test invalid snapshot id",
+        IllegalArgumentException.class, "The option start-snapshot-id -1 is not an ancestor",
+        () -> source.createMicroBatchReader(Optional.empty(), checkpoint.toString(), options));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSpecifySnapshotId() throws IOException {
+    File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    // test specify snapshot-id
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", tableLocation.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "start-snapshot-id", snapshotIds.get(1).toString(),
+        "max-size-per-batch", "1000"));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+    StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(1), 0, true, false), start);
+
+    StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(1), 1, true, false), end);
+
+    streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+    StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(2), 1, false, true), end1);
+
+    streamingReader.setOffsetRange(Optional.of(end1), Optional.empty());
+    StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false, true), end2);
+
+    streamingReader.setOffsetRange(Optional.of(end2), Optional.empty());
+    StreamingOffset end3 = (StreamingOffset) streamingReader.getEndOffset();
+    validateOffset(end2, end3);
+  }
+
+  @Test
+  public void testStreamingRead() {
+    Dataset<Row> read = spark.readStream()
+        .format("iceberg")
+        .load(tableLocation.toString());
+    DataStreamWriter<Row> streamWriter = read.writeStream()
+        .format("memory")
+        .outputMode("append")
+        .queryName("memoryStream");
+
+    try {
+      StreamingQuery query = streamWriter.start();
+      query.processAllAvailable();
+
+      List<SimpleRecord> actual = spark.table("memoryStream")
+          .as(Encoders.bean(SimpleRecord.class))
+          .collectAsList();
+      List<SimpleRecord> expectedResult = expected.stream().flatMap(List::stream).collect(Collectors.toList());
+      validateResult(expectedResult, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testStreamingReadWithSpecifiedSnapshotId() {
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    Dataset<Row> read = spark.readStream()
+        .format("iceberg")
+        .option("start-snapshot-id", snapshotIds.get(1).toString())
+        .load(tableLocation.toString());
+
+    try {
+      StreamingQuery query = read.writeStream()
+          .format("memory")
+          .outputMode("append")
+          .queryName("memoryStream1")
+          .start();
+
+      query.processAllAvailable();
+
+      List<SimpleRecord> actual = spark.table("memoryStream1")
+          .as(Encoders.bean(SimpleRecord.class))
+          .collectAsList();
+
+      List<SimpleRecord> expectedResult = expected.stream().flatMap(List::stream).collect(Collectors.toList());
+      validateResult(expectedResult, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  private static void validateOffset(StreamingOffset expectedResult, StreamingOffset actualResult) {
+    Assert.assertEquals(String.format("Actual StreamingOffset %s doesn't equal the expected StreamingOffset %s",
+        actualResult, expectedResult), expectedResult, actualResult);
+  }
+
+  private static void validateResult(List<SimpleRecord> expectedResult, List<SimpleRecord> actualResult) {
+    expectedResult.sort(Comparator.comparingInt(SimpleRecord::getId));
+    actualResult.sort(Comparator.comparingInt(SimpleRecord::getId));
+
+    Assert.assertEquals("Streaming result should match", expectedResult, actualResult);
+  }
+}
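
Editor's note: the sketch below is a minimal, hypothetical end-to-end usage example, not part of the patch. It only exercises what the diff itself introduces (the "iceberg" streaming source, the "start-snapshot-id" and "max-size-per-batch" options, and the checkpointLocation sink option); the table path, checkpoint path, snapshot id, sink, and class name are illustrative placeholders.

// StreamingReadExample.java -- illustrative only; paths and ids are placeholders.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class StreamingReadExample {
  public static void main(String[] args) throws StreamingQueryException {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("iceberg-streaming-read-example")
        .getOrCreate();

    // Incrementally read newly appended files from an Iceberg table, starting at a known
    // ancestor snapshot and capping each micro-batch at roughly 1000 bytes of data files.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .option("start-snapshot-id", "1234567890")      // placeholder snapshot id
        .option("max-size-per-batch", "1000")
        .load("/tmp/warehouse/db/table");               // placeholder table location

    StreamingQuery query = stream.writeStream()
        .format("console")
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoints/iceberg-example")  // placeholder
        .start();

    query.awaitTermination();
  }
}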