diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 85f08e057340..beafcc29c90c 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
@@ -63,6 +64,19 @@ public static List<Long> currentAncestors(Table table) {
     return ancestorIds(table.currentSnapshot(), table::snapshot);
   }
 
+  /**
+   * Traverses the history of the table's current snapshot and finds the oldest Snapshot the table can still resolve.
+   * @return null if there is no current snapshot, else the oldest ancestor that {@code table.snapshot} resolves.
+   */
+  public static Snapshot oldestSnapshot(Table table) {
+    Snapshot current = table.currentSnapshot();
+    while (current != null && current.parentId() != null && table.snapshot(current.parentId()) != null) {
+      current = table.snapshot(current.parentId());
+    }
+
+    return current;
+  }
+
   /**
    * Returns list of snapshot ids in the range - (fromSnapshotId, toSnapshotId]
    *

@@ -107,4 +121,28 @@ public static List<DataFile> newFiles(Long baseSnapshotId, long latestSnapshotId
 
     return newFiles;
   }
+
+  /**
+   * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its
+   * parent.
+   * @return the snapshot for which the given snapshot is the parent
+   * @throws IllegalArgumentException when the given snapshotId is not found in the table
+   * @throws IllegalStateException when the given snapshotId is not an ancestor of the current table state
+   */
+  public static Snapshot snapshotAfter(Table table, long snapshotId) {
+    Preconditions.checkArgument(table.snapshot(snapshotId) != null, "Cannot find parent snapshot: %s", snapshotId);
+
+    Snapshot current = table.currentSnapshot();
+    // stop at the root (null parent id): unboxing it for the comparison or the lookup below would throw an NPE
+    while (current != null && current.parentId() != null) {
+      if (current.parentId() == snapshotId) {
+        return current;
+      }
+
+      current = table.snapshot(current.parentId());
+    }
+
+    throw new IllegalStateException(
+        String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
+  }
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 1999b568b9e0..8cf755b82eb9 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -52,6 +52,7 @@
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -108,6 +109,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+    return new SparkMicroBatchStream(
+        sparkContext, table, caseSensitive, expectedSchema, options, checkpointLocation);
+  }
+
   @Override
   public StructType readSchema() {
     if (readSchema == null) {
@@ -213,10 +220,10 @@ public String description() {
     return String.format("%s [filters=%s]", table, filters);
   }
 
-  private static class ReaderFactory implements PartitionReaderFactory {
+  static class ReaderFactory implements PartitionReaderFactory {
     private final int batchSize;
 
-    private ReaderFactory(int batchSize) {
+    ReaderFactory(int batchSize) {
       this.batchSize = batchSize;
     }
 
@@ -256,7 +263,7 @@ private static class BatchReader extends BatchDataReader implements PartitionRea
     }
   }
 
-  private static class ReadTask implements InputPartition, Serializable {
+  static class ReadTask implements InputPartition, Serializable {
     private final CombinedScanTask task;
     private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
new file mode 100644
index 000000000000..1a7f217af7f9
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * A Spark {@link MicroBatchStream} that reads an Iceberg table incrementally, snapshot by snapshot.
+ * <p>
+ * The initial offset is persisted under the checkpoint location, and every snapshot that follows the first planned
+ * one must be an APPEND; any other operation makes planning fail with an {@link IllegalStateException}.
+ */
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Joiner SLASH = Joiner.on("/");
+
+  private final Table table;
+  private final boolean caseSensitive;
+  private final String expectedSchema;
+  private final Broadcast<Table> tableBroadcast;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive,
+                        Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) {
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = SchemaParser.toJson(expectedSchema);
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
+    this.tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+
+    // for each split setting, the table-level value is passed as the default of the per-read Spark option
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(
+        options, SparkReadOptions.FILE_OPEN_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation);
+    this.initialOffset = initialOffsetStore.initialOffset();
+  }
+
+  /**
+   * Refreshes the table and returns an offset at its current snapshot whose position is the number of added files.
+   */
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()), false);
+  }
+
+  /**
+   * Plans the files between the two offsets, splits and combines them using the configured split settings, and
+   * wraps every combined task in a {@link ReadTask}. Returns no partitions when {@code end} is the start offset.
+   */
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    Preconditions.checkArgument(end instanceof StreamingOffset, "Invalid end offset: %s is not a StreamingOffset", end);
+    Preconditions.checkArgument(
+        start instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", start);
+
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchema,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    // NOTE(review): a batch size of 0 presumably selects the row-based reader; confirm against ReaderFactory
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // no-op
+  }
+
+  @Override
+  public void stop() {
+    // no-op
+  }
+
+  /**
+   * Collects the file scan tasks of every snapshot from {@code startOffset} through the snapshot of
+   * {@code endOffset}, following the table's current lineage. A {@code START_OFFSET} start begins at the oldest
+   * snapshot.
+   */
+  private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = Lists.newArrayList();
+    MicroBatch latestMicroBatch = null;
+    StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
+        startOffset;
+
+    do {
+      StreamingOffset currentOffset =
+          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+          new StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
+          batchStartOffset;
+
+      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+      Preconditions.checkState(currentSnapshot != null,
+          "Cannot plan micro-batch: snapshot %s is not available in the table", currentOffset.snapshotId());
+
+      latestMicroBatch = MicroBatches.from(currentSnapshot, table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  /**
+   * Returns the id of the snapshot that follows {@code snapshotId}, failing unless that snapshot is an APPEND.
+   */
+  private long snapshotAfter(long snapshotId) {
+    Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);
+
+    Preconditions.checkState(snapshotAfter.operation().equals(DataOperations.APPEND),
+        "Invalid Snapshot operation: %s, only APPEND is allowed.", snapshotAfter.operation());
+
+    return snapshotAfter.snapshotId();
+  }
+
+  /**
+   * Loads the initial offset from {@code offsets/0} under the checkpoint location, writing it first if absent.
+   */
+  private static class InitialOffsetStore {
+    private final Table table;
+    private final FileIO io;
+    private final String initialOffsetLocation;
+
+    InitialOffsetStore(Table table, String checkpointLocation) {
+      this.table = table;
+      this.io = table.io();
+      this.initialOffsetLocation = SLASH.join(checkpointLocation, "offsets/0");
+    }
+
+    public StreamingOffset initialOffset() {
+      InputFile inputFile = io.newInputFile(initialOffsetLocation);
+      if (inputFile.exists()) {
+        return readOffset(inputFile);
+      }
+
+      table.refresh();
+      // no stored offset yet: start from the oldest snapshot, or START_OFFSET when the table has no snapshot
+      StreamingOffset offset = table.currentSnapshot() == null ?
+          StreamingOffset.START_OFFSET :
+          new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false);
+
+      OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
+      writeOffset(offset, outputFile);
+
+      return offset;
+    }
+
+    private void writeOffset(StreamingOffset offset, OutputFile file) {
+      try (OutputStream outputStream = file.create()) {
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
+        writer.write(offset.json());
+        writer.flush();
+      } catch (IOException ioException) {
+        throw new UncheckedIOException(
+            String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);
+      }
+    }
+
+    private StreamingOffset readOffset(InputFile file) {
+      try (InputStream in = file.newStream()) {
+        return StreamingOffset.fromJson(in);
+      } catch (IOException ioException) {
+        throw new UncheckedIOException(
+            String.format("Failed reading offset from: %s", initialOffsetLocation), ioException);
+      }
+    }
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 52fbf7a1bf2d..c535a3534954 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -70,6 +70,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
   private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
       TableCapability.BATCH_READ,
       TableCapability.BATCH_WRITE,
+      TableCapability.MICRO_BATCH_READ,
       TableCapability.STREAMING_WRITE,
       TableCapability.OVERWRITE_BY_FILTER,
       TableCapability.OVERWRITE_DYNAMIC);
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java b/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
index f43578ada310..64277ecf3be5 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
+++ 
b/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringWriter;
 import java.io.UncheckedIOException;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
@@ -62,20 +63,26 @@ static StreamingOffset fromJson(String json) {
 
     try {
       JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
-      // The version of StreamingOffset. The offset was created with a version number
-      // used to validate when deserializing from json string.
-      int version = JsonUtil.getInt(VERSION, node);
-      Preconditions.checkArgument(version == CURR_VERSION,
-          "Cannot parse offset JSON: offset version %s is not supported", version);
+      return fromJsonNode(node);
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
+    }
+  }
 
-      long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
-      int position = JsonUtil.getInt(POSITION, node);
-      boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node);
+  /**
+   * Parses an offset from JSON read from the given stream.
+   */
+  static StreamingOffset fromJson(InputStream inputStream) {
+    Preconditions.checkNotNull(inputStream, "Cannot parse StreamingOffset from inputStream: null");
 
-      return new StreamingOffset(snapshotId, position, shouldScanAllFiles);
+    JsonNode node;
+    try {
+      node = JsonUtil.mapper().readValue(inputStream, JsonNode.class);
     } catch (IOException e) {
-      throw new IllegalArgumentException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
+      throw new UncheckedIOException("Failed to read StreamingOffset from json", e);
     }
+
+    return fromJsonNode(node);
   }
 
   @Override
@@ -132,4 +139,23 @@ public String toString() {
     return String.format("Streaming Offset[%d: position (%d) scan_all_files (%b)]",
         snapshotId, position, scanAllFiles);
   }
+
+  /**
+   * Builds an offset from a parsed JSON node after validating its version field.
+   * @throws IllegalArgumentException if the version is not {@code CURR_VERSION}
+   */
+  private static StreamingOffset fromJsonNode(JsonNode node) {
+    // The version of StreamingOffset. The offset was created with a version number
+    // used to validate when deserializing from json string.
+    int version = JsonUtil.getInt(VERSION, node);
+    Preconditions.checkArgument(version == CURR_VERSION,
+        "This version of Iceberg source only supports version %s. Version %s is not supported.",
+        CURR_VERSION, version);
+
+    long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
+    int position = JsonUtil.getInt(POSITION, node);
+    boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node);
+
+    return new StreamingOffset(snapshotId, position, shouldScanAllFiles);
+  }
 }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
new file mode 100644
index 000000000000..07f3df4ea4aa
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * Tests Spark 3 structured streaming (micro-batch) reads from Iceberg tables.
+ */
+@RunWith(Parameterized.class)
+public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
+  public TestStructuredStreamingRead3(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private Table table;
+  private String tableIdentifier;
+
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  /**
+   * Test data to be used by multiple writes:
+   * each write creates a snapshot and writes a list of records.
+   */
+  private static final List<List<SimpleRecord>> TEST_DATA_MULTIPLE_SNAPSHOTS = Lists.newArrayList(
+      Lists.newArrayList(
+          new SimpleRecord(1, "one"),
+          new SimpleRecord(2, "two"),
+          new SimpleRecord(3, "three")),
+      Lists.newArrayList(
+          new SimpleRecord(4, "four"),
+          new SimpleRecord(5, "five")),
+      Lists.newArrayList(
+          new SimpleRecord(6, "six"),
+          new SimpleRecord(7, "seven")));
+
+  /**
+   * Test data to be used for multiple write batches:
+   * each batch in turn will have multiple snapshots.
+   */
+  private static final List<List<List<SimpleRecord>>> TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS = Lists.newArrayList(
+      Lists.newArrayList(
+          Lists.newArrayList(
+              new SimpleRecord(1, "one"),
+              new SimpleRecord(2, "two"),
+              new SimpleRecord(3, "three")),
+          Lists.newArrayList(
+              new SimpleRecord(4, "four"),
+              new SimpleRecord(5, "five"))),
+      Lists.newArrayList(
+          Lists.newArrayList(
+              new SimpleRecord(6, "six"),
+              new SimpleRecord(7, "seven")),
+          Lists.newArrayList(
+              new SimpleRecord(8, "eight"),
+              new SimpleRecord(9, "nine"))),
+      Lists.newArrayList(
+          Lists.newArrayList(
+              new SimpleRecord(10, "ten"),
+              new SimpleRecord(11, "eleven"),
+              new SimpleRecord(12, "twelve")),
+          Lists.newArrayList(
+              new SimpleRecord(13, "thirteen"),
+              new SimpleRecord(14, "fourteen")),
+          Lists.newArrayList(
+              new SimpleRecord(15, "fifteen"),
+              new SimpleRecord(16, "sixteen"))));
+
+  @Before
+  public void setupTable() {
+    sql("CREATE TABLE %s " +
+        "(id INT, data STRING) " +
+        "USING iceberg " +
+        "PARTITIONED BY (bucket(3, id))", tableName);
+    this.table = validationCatalog.loadTable(tableIdent);
+    this.tableIdentifier = tableName;
+  }
+
+  @After
+  public void stopStreams() throws TimeoutException {
+    for (StreamingQuery query : spark.streams().active()) {
+      query.stop();
+    }
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+
+    table.refresh();
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .load(tableIdentifier);
+    List<SimpleRecord> actual = processAvailable(df);
+
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws Exception {
+    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    final String tempView = "microBatchView";
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .load(tableIdentifier);
+
+    // Trigger.Once with the combination of StreamingQuery.awaitTermination, which succeeds after this code
+    // will result in stopping the stream.
+    // This is how Stream STOP and RESUME is simulated in this Test Case.
+    DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+        .trigger(Trigger.Once())
+        .option("checkpointLocation", writerCheckpoint.toString())
+        .foreachBatch((batchDF, batchId) -> {
+          batchDF.createOrReplaceGlobalTempView(tempView);
+        });
+
+    String globalTempView = "global_temp." + tempView;
+
+    List<SimpleRecord> processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView);
+    Assert.assertEquals(Collections.emptyList(), processStreamOnEmptyIcebergTable);
+
+    for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+      appendDataAsMultipleSnapshots(expectedCheckpoint, tableIdentifier);
+      table.refresh();
+
+      List<SimpleRecord> actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView);
+      Assertions.assertThat(actualDataInCurrentMicroBatch)
+          .containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedCheckpoint));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws Exception {
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, tableIdentifier, "parquet");
+    appendData(orcFileRecords, tableIdentifier, "orc");
+    appendData(avroFileRecords, tableIdentifier, "avro");
+
+    table.refresh();
+
+    Dataset<Row> ds = spark.readStream()
+        .format("iceberg")
+        .load(tableIdentifier);
+    Assertions.assertThat(processAvailable(ds))
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamFromEmptyTable() throws Exception {
+    table.refresh();
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .load(tableIdentifier);
+
+    List<SimpleRecord> actual = processAvailable(df);
+    Assert.assertEquals(Collections.emptyList(), actual);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some initial data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "one") // id = 1
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    // check pre-condition - that the above Delete file write - actually resulted in snapshot of type OVERWRITE
+    Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .load(tableIdentifier);
+    StreamingQuery streamingQuery = df.writeStream()
+        .format("memory")
+        .queryName("testtablewithoverwrites")
+        .outputMode(OutputMode.Append())
+        .start();
+
+    AssertHelpers.assertThrowsCause(
+        "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
+        IllegalStateException.class,
+        "Invalid Snapshot operation",
+        () -> streamingQuery.processAllAvailable()
+    );
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+
+    table.refresh();
+
+    // this should create a snapshot with type Replace.
+    table.rewriteManifests()
+        .clusterBy(f -> 1)
+        .commit();
+
+    // check pre-condition
+    Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .load(tableIdentifier);
+    StreamingQuery streamingQuery = df.writeStream()
+        .format("memory")
+        .queryName("testtablewithreplace")
+        .outputMode(OutputMode.Append())
+        .start();
+
+    AssertHelpers.assertThrowsCause(
+        "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
+        IllegalStateException.class,
+        "Invalid Snapshot operation",
+        () -> streamingQuery.processAllAvailable()
+    );
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
+    table.updateSpec()
+        .removeField("id_bucket")
+        .addField(ref("id"))
+        .commit();
+
+    table.refresh();
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+
+    table.refresh();
+
+    // this should create a snapshot with type delete.
+    table.newDelete()
+        .deleteFromRowFilter(Expressions.equal("id", 4))
+        .commit();
+
+    // check pre-condition - that the above delete operation on table resulted in Snapshot of Type DELETE.
+    table.refresh();
+    Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .load(tableIdentifier);
+    StreamingQuery streamingQuery = df.writeStream()
+        .format("memory")
+        .queryName("testtablewithdelete")
+        .outputMode(OutputMode.Append())
+        .start();
+
+    AssertHelpers.assertThrowsCause(
+        "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
+        IllegalStateException.class,
+        "Invalid Snapshot operation",
+        () -> streamingQuery.processAllAvailable()
+    );
+  }
+
+  private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
+      throws TimeoutException, StreamingQueryException {
+    StreamingQuery streamingQuery = singleBatchWriter.start();
+    streamingQuery.awaitTermination();
+
+    return spark.sql(String.format("select * from %s", viewName))
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+  }
+
+  /**
+   * Appends each list as a Snapshot on the iceberg table at the given location.
+   * Accepts a list of lists - each list representing data per snapshot.
+   */
+  private static void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data, String tableIdentifier) {
+    for (List<SimpleRecord> l : data) {
+      appendData(l, tableIdentifier, "parquet");
+    }
+  }
+
+  private static void appendData(List<SimpleRecord> data, String tableIdentifier, String fileFormat) {
+    Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
+    df.select("id", "data").write()
+        .format("iceberg")
+        .option("write-format", fileFormat)
+        .mode("append")
+        .save(tableIdentifier);
+  }
+
+  private static List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
+    StreamingQuery streamingQuery = df.writeStream()
+        .format("memory")
+        .queryName("test12")
+        .outputMode(OutputMode.Append())
+        .start();
+    streamingQuery.processAllAvailable();
+    return spark.sql("select * from test12")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+  }
+}