diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 083aee90a383..55bf27d61955 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -102,43 +102,37 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Snapshot> lookup) {
-   * Returns the first snapshot which satisfies {@literal >=} targetTimestamp.
-   *
-   * Given the snapshots (with timestamp): [S1 (10), S2 (11), S3 (12), S4 (14)]
-   *
-   * firstSnapshotAfterTimestamp(table, x {@literal <=} 10) = S1
-   * firstSnapshotAfterTimestamp(table, 11) = S2
-   * firstSnapshotAfterTimestamp(table, 13) = S4
-   * firstSnapshotAfterTimestamp(table, 14) = S4
-   * firstSnapshotAfterTimestamp(table, x {@literal >} 14) = null
-   *
-   * where x is the target timestamp in milliseconds and Si is the snapshot
+   * Traverses the history of the table's current snapshot and finds the first snapshot committed after the given time.
    *
    * @param table a table
-   * @param targetTimestampMillis a timestamp in milliseconds
-   * @return the first snapshot which satisfies {@literal >=} targetTimestamp, or null if the current snapshot is
-   *         more recent than the target timestamp
+   * @param timestampMillis a timestamp in milliseconds
+   * @return the first snapshot after the given timestamp, or null if the current snapshot is older than the timestamp
+   * @throws IllegalStateException if the first ancestor after the given time can't be determined
    */
-  public static Snapshot firstSnapshotAfterTimestamp(Table table, Long targetTimestampMillis) {
-    Snapshot currentSnapshot = table.currentSnapshot();
-    // Return null if no snapshot exists or target timestamp is more recent than the current snapshot
-    if (currentSnapshot == null || currentSnapshot.timestampMillis() < targetTimestampMillis) {
+  public static Snapshot oldestAncestorAfter(Table table, long timestampMillis) {
+    if (table.currentSnapshot() == null) {
+      // there are no snapshots or ancestors
      return null;
    }

-    // Return the oldest snapshot which satisfies >= targetTimestamp
    Snapshot lastSnapshot = null;
    for (Snapshot snapshot : currentAncestors(table)) {
-      if (snapshot.timestampMillis() < targetTimestampMillis) {
+      if (snapshot.timestampMillis() < timestampMillis) {
        return lastSnapshot;
+      } else if (snapshot.timestampMillis() == timestampMillis) {
+        return snapshot;
      }
+
      lastSnapshot = snapshot;
    }

-    // Return the oldest snapshot if the target timestamp is less than the oldest snapshot of the table
-    return lastSnapshot;
+    if (lastSnapshot != null && lastSnapshot.parentId() == null) {
+      // this is the first snapshot in the table, return it
+      return lastSnapshot;
+    }
+
+    throw new IllegalStateException(
+        "Cannot find snapshot older than " + DateTimeUtil.formatTimestampMillis(timestampMillis));
  }

  /**
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 156912af5a05..b88e924be7d0 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -99,12 +99,12 @@ public class SparkMicroBatchStream implements MicroBatchStream {
  @Override
  public Offset latestOffset() {
    table.refresh();
-    if (isStreamEmpty(table)) {
+    if (table.currentSnapshot() == null) {
      return StreamingOffset.START_OFFSET;
    }

-    if (isFutureStartTime(table, fromTimestamp)) {
-      return initialFutureStartOffset(table);
+    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
    }

    Snapshot latestSnapshot = table.currentSnapshot();
@@ -169,8 +169,7 @@ public void stop() {

  private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
    List<FileScanTask> fileScanTasks = Lists.newArrayList();
    StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
- new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false) : - startOffset; + determineStartingOffset(table, fromTimestamp) : startOffset; StreamingOffset currentOffset = null; @@ -208,26 +207,31 @@ private boolean shouldProcess(Snapshot snapshot) { return op.equals(DataOperations.APPEND); } - private static boolean isStreamEmpty(Table table) { - return table.currentSnapshot() == null; - } - - private static boolean isStreamNotEmpty(Table table) { - return table.currentSnapshot() != null; - } + private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) { + if (table.currentSnapshot() == null) { + return StreamingOffset.START_OFFSET; + } - private static boolean isFutureStartTime(Table table, Long streamStartTimeStampMillis) { - if (streamStartTimeStampMillis == null) { - return false; + if (fromTimestamp == null) { + // match existing behavior and start from the oldest snapshot + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); } - return table.currentSnapshot().timestampMillis() < streamStartTimeStampMillis; - } + if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + return StreamingOffset.START_OFFSET; + } - private static StreamingOffset initialFutureStartOffset(Table table) { - Preconditions.checkNotNull(table, "Cannot process future start offset with invalid table input."); - Snapshot latestSnapshot = table.currentSnapshot(); - return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()) + 1, false); + try { + Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp); + if (snapshot != null) { + return new StreamingOffset(snapshot.snapshotId(), 0, false); + } else { + return StreamingOffset.START_OFFSET; + } + } catch (IllegalStateException e) { + // could not determine the first snapshot after the timestamp. use the oldest ancestor instead + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); + } } private static class InitialOffsetStore { @@ -250,11 +254,7 @@ public StreamingOffset initialOffset() { } table.refresh(); - StreamingOffset offset = StreamingOffset.START_OFFSET; - if (isStreamNotEmpty(table)) { - offset = isFutureStartTime(table, fromTimestamp) ? 
initialFutureStartOffset(table) : - new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false); - } + StreamingOffset offset = determineStartingOffset(table, fromTimestamp); OutputFile outputFile = io.newOutputFile(initialOffsetLocation); writeOffset(offset, outputFile); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 156912af5a05..b88e924be7d0 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -99,12 +99,12 @@ public class SparkMicroBatchStream implements MicroBatchStream { @Override public Offset latestOffset() { table.refresh(); - if (isStreamEmpty(table)) { + if (table.currentSnapshot() == null) { return StreamingOffset.START_OFFSET; } - if (isFutureStartTime(table, fromTimestamp)) { - return initialFutureStartOffset(table); + if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + return StreamingOffset.START_OFFSET; } Snapshot latestSnapshot = table.currentSnapshot(); @@ -169,8 +169,7 @@ public void stop() { private List planFiles(StreamingOffset startOffset, StreamingOffset endOffset) { List fileScanTasks = Lists.newArrayList(); StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ? - new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false) : - startOffset; + determineStartingOffset(table, fromTimestamp) : startOffset; StreamingOffset currentOffset = null; @@ -208,26 +207,31 @@ private boolean shouldProcess(Snapshot snapshot) { return op.equals(DataOperations.APPEND); } - private static boolean isStreamEmpty(Table table) { - return table.currentSnapshot() == null; - } - - private static boolean isStreamNotEmpty(Table table) { - return table.currentSnapshot() != null; - } + private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) { + if (table.currentSnapshot() == null) { + return StreamingOffset.START_OFFSET; + } - private static boolean isFutureStartTime(Table table, Long streamStartTimeStampMillis) { - if (streamStartTimeStampMillis == null) { - return false; + if (fromTimestamp == null) { + // match existing behavior and start from the oldest snapshot + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); } - return table.currentSnapshot().timestampMillis() < streamStartTimeStampMillis; - } + if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + return StreamingOffset.START_OFFSET; + } - private static StreamingOffset initialFutureStartOffset(Table table) { - Preconditions.checkNotNull(table, "Cannot process future start offset with invalid table input."); - Snapshot latestSnapshot = table.currentSnapshot(); - return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()) + 1, false); + try { + Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp); + if (snapshot != null) { + return new StreamingOffset(snapshot.snapshotId(), 0, false); + } else { + return StreamingOffset.START_OFFSET; + } + } catch (IllegalStateException e) { + // could not determine the first snapshot after the timestamp. 
use the oldest ancestor instead + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); + } } private static class InitialOffsetStore { @@ -250,11 +254,7 @@ public StreamingOffset initialOffset() { } table.refresh(); - StreamingOffset offset = StreamingOffset.START_OFFSET; - if (isStreamNotEmpty(table)) { - offset = isFutureStartTime(table, fromTimestamp) ? initialFutureStartOffset(table) : - new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false); - } + StreamingOffset offset = determineStartingOffset(table, fromTimestamp); OutputFile outputFile = io.newOutputFile(initialOffsetLocation); writeOffset(offset, outputFile); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 156912af5a05..b88e924be7d0 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -99,12 +99,12 @@ public class SparkMicroBatchStream implements MicroBatchStream { @Override public Offset latestOffset() { table.refresh(); - if (isStreamEmpty(table)) { + if (table.currentSnapshot() == null) { return StreamingOffset.START_OFFSET; } - if (isFutureStartTime(table, fromTimestamp)) { - return initialFutureStartOffset(table); + if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + return StreamingOffset.START_OFFSET; } Snapshot latestSnapshot = table.currentSnapshot(); @@ -169,8 +169,7 @@ public void stop() { private List planFiles(StreamingOffset startOffset, StreamingOffset endOffset) { List fileScanTasks = Lists.newArrayList(); StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ? 
- new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false) : - startOffset; + determineStartingOffset(table, fromTimestamp) : startOffset; StreamingOffset currentOffset = null; @@ -208,26 +207,31 @@ private boolean shouldProcess(Snapshot snapshot) { return op.equals(DataOperations.APPEND); } - private static boolean isStreamEmpty(Table table) { - return table.currentSnapshot() == null; - } - - private static boolean isStreamNotEmpty(Table table) { - return table.currentSnapshot() != null; - } + private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) { + if (table.currentSnapshot() == null) { + return StreamingOffset.START_OFFSET; + } - private static boolean isFutureStartTime(Table table, Long streamStartTimeStampMillis) { - if (streamStartTimeStampMillis == null) { - return false; + if (fromTimestamp == null) { + // match existing behavior and start from the oldest snapshot + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); } - return table.currentSnapshot().timestampMillis() < streamStartTimeStampMillis; - } + if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + return StreamingOffset.START_OFFSET; + } - private static StreamingOffset initialFutureStartOffset(Table table) { - Preconditions.checkNotNull(table, "Cannot process future start offset with invalid table input."); - Snapshot latestSnapshot = table.currentSnapshot(); - return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()) + 1, false); + try { + Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp); + if (snapshot != null) { + return new StreamingOffset(snapshot.snapshotId(), 0, false); + } else { + return StreamingOffset.START_OFFSET; + } + } catch (IllegalStateException e) { + // could not determine the first snapshot after the timestamp. use the oldest ancestor instead + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); + } } private static class InitialOffsetStore { @@ -250,11 +254,7 @@ public StreamingOffset initialOffset() { } table.refresh(); - StreamingOffset offset = StreamingOffset.START_OFFSET; - if (isStreamNotEmpty(table)) { - offset = isFutureStartTime(table, fromTimestamp) ? 
initialFutureStartOffset(table) : - new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false); - } + StreamingOffset offset = determineStartingOffset(table, fromTimestamp); OutputFile outputFile = io.newOutputFile(initialOffsetLocation); writeOffset(offset, outputFile); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index d5bb1a1902ee..145cf78fba86 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -24,14 +24,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; -import org.apache.hadoop.conf.Configuration; +import java.util.stream.IntStream; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataOperations; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; @@ -40,19 +39,17 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; -import org.apache.spark.sql.streaming.StreamingQueryException; -import org.apache.spark.sql.streaming.Trigger; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; @@ -62,7 +59,6 @@ import org.junit.runners.Parameterized; import static org.apache.iceberg.expressions.Expressions.ref; -import static org.apache.iceberg.types.Types.NestedField.optional; @RunWith(Parameterized.class) public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase { @@ -72,13 +68,6 @@ public TestStructuredStreamingRead3( } private Table table; - private String tableIdentifier; - - private static final Configuration CONF = new Configuration(); - private static final Schema SCHEMA = new Schema( - optional(1, "id", Types.IntegerType.get()), - optional(2, "data", Types.StringType.get()) - ); /** * test data to be used by multiple writes @@ -135,7 +124,6 @@ public void setupTable() { "USING iceberg " + "PARTITIONED BY (bucket(3, id))", tableName); this.table = validationCatalog.loadTable(tableIdent); - this.tableIdentifier = tableName; } @After @@ -150,19 +138,26 @@ public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); } - @SuppressWarnings("unchecked") @Test public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception { List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; - appendDataAsMultipleSnapshots(expected, tableIdentifier); + 
appendDataAsMultipleSnapshots(expected); - table.refresh(); + StreamingQuery query = startStream(); - Dataset df = spark.readStream() - .format("iceberg") - .load(tableIdentifier); - List actual = processAvailable(df); + List actual = rowsAvailable(query); + Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); + } + + @Test + public void testReadStreamOnIcebergThenAddData() throws Exception { + List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; + + StreamingQuery query = startStream(); + appendDataAsMultipleSnapshots(expected); + + List actual = rowsAvailable(query); Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } @@ -172,76 +167,93 @@ public void testReadingStreamFromTimestamp() throws Exception { new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - appendData(dataBeforeTimestamp, tableIdentifier, "parquet"); + + appendData(dataBeforeTimestamp); table.refresh(); long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; - waitUntilAfter(streamStartTimestamp); - List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; - appendDataAsMultipleSnapshots(expected, tableIdentifier); + StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); - table.refresh(); + List empty = rowsAvailable(query); + Assertions.assertThat(empty.isEmpty()).isTrue(); - Dataset df = spark.readStream() - .format("iceberg") - .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)) - .load(tableIdentifier); - List actual = processAvailable(df); + List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(expected); + + List actual = rowsAvailable(query); Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } @Test - public void testReadingStreamAfterLatestTimestamp() throws Exception { - List dataBeforeTimestamp = Lists.newArrayList( + public void testReadingStreamFromFutureTimetsamp() throws Exception { + long futureTimestamp = System.currentTimeMillis() + 10000; + + StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp)); + + List actual = rowsAvailable(query); + Assertions.assertThat(actual.isEmpty()).isTrue(); + + List data = Lists.newArrayList( new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - appendData(dataBeforeTimestamp, tableIdentifier, "parquet"); - table.refresh(); - long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; - waitUntilAfter(streamStartTimestamp); + // Perform several inserts that should not show up because the fromTimestamp has not elapsed + IntStream.range(0, 3).forEach(x -> { + appendData(data); + Assertions.assertThat(rowsAvailable(query).isEmpty()).isTrue(); + }); - Dataset df = spark.readStream() - .format("iceberg") - .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)) - .load(tableIdentifier); - List actual = processAvailable(df); - Assertions.assertThat(actual.isEmpty()).isTrue(); + waitUntilAfter(futureTimestamp); + + // Data appended after the timestamp should appear + appendData(data); + actual = rowsAvailable(query); + Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(data); } @Test - public void testReadingStreamFromTimestampStartWithExistingTimestamp() throws Exception { + public void 
testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception { List dataBeforeTimestamp = Lists.newArrayList( - new SimpleRecord(-2, "minustwo"), - new SimpleRecord(-1, "minusone"), - new SimpleRecord(0, "zero")); - appendData(dataBeforeTimestamp, tableIdentifier, "parquet"); + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")); + appendData(dataBeforeTimestamp); - table.refresh(); + long streamStartTimestamp = System.currentTimeMillis() + 2000; + + // Start the stream with a future timestamp after the current snapshot + StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); + List actual = rowsAvailable(query); + Assert.assertEquals(Collections.emptyList(), actual); + + // Stream should contain data added after the timestamp elapses + waitUntilAfter(streamStartTimestamp); + List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(expected); + Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); + } + + @Test + public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception { List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; - // Append the first expected data - appendData(expected.get(0), tableIdentifier, "parquet"); + // Create an existing snapshot with some data + appendData(expected.get(0)); table.refresh(); - long streamStartTimestamp = table.currentSnapshot().timestampMillis(); + long firstSnapshotTime = table.currentSnapshot().timestampMillis(); - // Start stream - Dataset df = spark.readStream() - .format("iceberg") - .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)) - .load(tableIdentifier); + // Start stream giving the first Snapshot's time as the start point + StreamingQuery stream = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshotTime)); // Append rest of expected data for (int i = 1; i < expected.size(); i++) { - appendData(expected.get(i), tableIdentifier, "parquet"); + appendData(expected.get(i)); } - table.refresh(); - List actual = processAvailable(df); - + List actual = rowsAvailable(stream); Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } @@ -256,97 +268,65 @@ public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws TimeoutEx List thirdSnapshotRecordList = Lists.newArrayList( new SimpleRecord(3, "three")); - List expectedRecordList = Lists.newArrayList( - new SimpleRecord(2, "two"), - new SimpleRecord(3, "three")); + List expectedRecordList = Lists.newArrayList(); + expectedRecordList.addAll(secondSnapshotRecordList); + expectedRecordList.addAll(thirdSnapshotRecordList); - appendData(firstSnapshotRecordList, tableIdentifier, "parquet"); + appendData(firstSnapshotRecordList); table.refresh(); - Snapshot firstSnapshot = table.currentSnapshot(); - - Dataset df = spark.readStream() - .format("iceberg") - .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshot.timestampMillis())) - .load(tableIdentifier); + long firstSnapshotid = table.currentSnapshot().snapshotId(); + long firstSnapshotCommitTime = table.currentSnapshot().timestampMillis(); - appendData(secondSnapshotRecordList, tableIdentifier, "parquet"); - table.refresh(); - appendData(thirdSnapshotRecordList, tableIdentifier, "parquet"); - table.refresh(); + appendData(secondSnapshotRecordList); + appendData(thirdSnapshotRecordList); - 
table.expireSnapshots().expireSnapshotId(firstSnapshot.snapshotId()).commit(); - table.refresh(); + table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit(); - List actual = processAvailable(df); - Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedRecordList)); + StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, String.valueOf(firstSnapshotCommitTime)); + List actual = rowsAvailable(query); + Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList); } - @Test - public void testReadingStreamFromTimestampGreaterThanLatestSnapshotTime() throws Exception { - List dataBeforeTimestamp = Lists.newArrayList( - new SimpleRecord(1, "one"), - new SimpleRecord(2, "two"), - new SimpleRecord(3, "three")); - appendData(dataBeforeTimestamp, tableIdentifier, "parquet"); - - table.refresh(); - long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; - waitUntilAfter(streamStartTimestamp); - - // Test stream with Timestamp > Latest Snapshot Time - Dataset df = spark.readStream() - .format("iceberg") - .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)) - .load(tableIdentifier); - List actual = processAvailable(df); - Assert.assertEquals(Collections.emptyList(), actual); - - // Test stream after new data is added - List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; - appendDataAsMultipleSnapshots(expected, tableIdentifier); - table.refresh(); - Assertions.assertThat(processAvailable(df, "newdata")) - .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); - } - - @SuppressWarnings("unchecked") @Test public void testResumingStreamReadFromCheckpoint() throws Exception { File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder"); File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint"); - final String tempView = "microBatchView"; + File output = temp.newFolder(); - Dataset df = spark.readStream() + DataStreamWriter querySource = spark.readStream() .format("iceberg") - .load(tableIdentifier); - - // Trigger.Once with the combination of StreamingQuery.awaitTermination, which succeeds after this code - // will result in stopping the stream. - // This is how Stream STOP and RESUME is simulated in this Test Case. - DataStreamWriter singleBatchWriter = df.writeStream() - .trigger(Trigger.Once()) + .load(tableName) + .writeStream() .option("checkpointLocation", writerCheckpoint.toString()) - .foreachBatch((batchDF, batchId) -> { - batchDF.createOrReplaceGlobalTempView(tempView); - }); - - String globalTempView = "global_temp." 
+ tempView; + .format("parquet") + .queryName("checkpoint_test") + .option("path", output.getPath()); - List processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView); - Assert.assertEquals(Collections.emptyList(), processStreamOnEmptyIcebergTable); + StreamingQuery startQuery = querySource.start(); + startQuery.processAllAvailable(); + startQuery.stop(); + List expected = Lists.newArrayList(); for (List> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) { - appendDataAsMultipleSnapshots(expectedCheckpoint, tableIdentifier); - table.refresh(); - - List actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView); - Assertions.assertThat(actualDataInCurrentMicroBatch) - .containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedCheckpoint)); + // New data was added while the stream was down + appendDataAsMultipleSnapshots(expectedCheckpoint); + expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint)))); + + // Stream starts up again from checkpoint read the newly added data and shut down + StreamingQuery restartedQuery = querySource.start(); + restartedQuery.processAllAvailable(); + restartedQuery.stop(); + + // Read data added by the stream + List actual = spark.read() + .load(output.getPath()) + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } } - @SuppressWarnings("unchecked") @Test public void testParquetOrcAvroDataInOneTable() throws Exception { List parquetFileRecords = Lists.newArrayList( @@ -362,33 +342,24 @@ public void testParquetOrcAvroDataInOneTable() throws Exception { new SimpleRecord(6, "six"), new SimpleRecord(7, "seven")); - appendData(parquetFileRecords, tableIdentifier, "parquet"); - appendData(orcFileRecords, tableIdentifier, "orc"); - appendData(avroFileRecords, tableIdentifier, "avro"); + appendData(parquetFileRecords); + appendData(orcFileRecords, "orc"); + appendData(avroFileRecords, "avro"); table.refresh(); - Dataset ds = spark.readStream() - .format("iceberg") - .load(tableIdentifier); - Assertions.assertThat(processAvailable(ds)) + StreamingQuery query = startStream(); + Assertions.assertThat(rowsAvailable(query)) .containsExactlyInAnyOrderElementsOf(Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords)); } - @SuppressWarnings("unchecked") @Test public void testReadStreamFromEmptyTable() throws Exception { - table.refresh(); - - Dataset df = spark.readStream() - .format("iceberg") - .load(tableIdentifier); - - List actual = processAvailable(df); + StreamingQuery stream = startStream(); + List actual = rowsAvailable(stream); Assert.assertEquals(Collections.emptyList(), actual); } - @SuppressWarnings("unchecked") @Test public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception { // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE. 
@@ -398,7 +369,7 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception // fill table with some initial data List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; - appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier); + appendDataAsMultipleSnapshots(dataAcrossSnapshots); Schema deleteRowSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteRowSchema); @@ -416,29 +387,21 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception // check pre-condition - that the above Delete file write - actually resulted in snapshot of type OVERWRITE Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation()); - Dataset df = spark.readStream() - .format("iceberg") - .load(tableIdentifier); - StreamingQuery streamingQuery = df.writeStream() - .format("memory") - .queryName("testtablewithoverwrites") - .outputMode(OutputMode.Append()) - .start(); + StreamingQuery query = startStream(); AssertHelpers.assertThrowsCause( "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND", IllegalStateException.class, "Cannot process overwrite snapshot", - () -> streamingQuery.processAllAvailable() + () -> query.processAllAvailable() ); } - @SuppressWarnings("unchecked") @Test public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception { // fill table with some data List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; - appendDataAsMultipleSnapshots(expected, tableIdentifier); + appendDataAsMultipleSnapshots(expected); table.refresh(); @@ -450,15 +413,11 @@ public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Excepti // check pre-condition Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation()); - Dataset df = spark.readStream() - .format("iceberg") - .load(tableIdentifier); - - List actual = processAvailable(df); + StreamingQuery query = startStream(); + List actual = rowsAvailable(query); Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } - @SuppressWarnings("unchecked") @Test public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception { table.updateSpec() @@ -470,7 +429,7 @@ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception { // fill table with some data List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; - appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier); + appendDataAsMultipleSnapshots(dataAcrossSnapshots); table.refresh(); @@ -483,24 +442,16 @@ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception { table.refresh(); Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation()); - Dataset df = spark.readStream() - .format("iceberg") - .load(tableIdentifier); - StreamingQuery streamingQuery = df.writeStream() - .format("memory") - .queryName("testtablewithdelete") - .outputMode(OutputMode.Append()) - .start(); + StreamingQuery query = startStream(); AssertHelpers.assertThrowsCause( "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND", IllegalStateException.class, "Cannot process delete snapshot", - () -> streamingQuery.processAllAvailable() + () -> query.processAllAvailable() ); } - @SuppressWarnings("unchecked") @Test public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception { table.updateSpec() @@ -512,7 +463,7 @@ public void 
testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exc // fill table with some data List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; - appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier); + appendDataAsMultipleSnapshots(dataAcrossSnapshots); table.refresh(); @@ -525,56 +476,62 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exc table.refresh(); Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation()); - Dataset df = spark.readStream() - .format("iceberg") - .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true") - .load(tableIdentifier); - Assertions.assertThat(processAvailable(df)) + StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true"); + Assertions.assertThat(rowsAvailable(query)) .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); } - private static List processMicroBatch(DataStreamWriter singleBatchWriter, String viewName) - throws TimeoutException, StreamingQueryException { - StreamingQuery streamingQuery = singleBatchWriter.start(); - streamingQuery.awaitTermination(); - - return spark.sql(String.format("select * from %s", viewName)) - .as(Encoders.bean(SimpleRecord.class)) - .collectAsList(); - } - /** * appends each list as a Snapshot on the iceberg table at the given location. * accepts a list of lists - each list representing data per snapshot. */ - private static void appendDataAsMultipleSnapshots(List> data, String tableIdentifier) { + private void appendDataAsMultipleSnapshots(List> data) { for (List l : data) { - appendData(l, tableIdentifier, "parquet"); + appendData(l); } } - private static void appendData(List data, String tableIdentifier, String fileFormat) { + private void appendData(List data) { + appendData(data, "parquet"); + } + + private void appendData(List data, String format) { Dataset df = spark.createDataFrame(data, SimpleRecord.class); df.select("id", "data").write() .format("iceberg") - .option("write-format", fileFormat) + .option("write-format", format) .mode("append") - .save(tableIdentifier); + .save(tableName); } - private static List processAvailable(Dataset df, String tableName) throws TimeoutException { - StreamingQuery streamingQuery = df.writeStream() + private static final String MEMORY_TABLE = "_stream_view_mem"; + + private StreamingQuery startStream(Map options) throws TimeoutException { + return spark.readStream() + .options(options) + .format("iceberg") + .load(tableName) + .writeStream() + .options(options) .format("memory") - .queryName(tableName) + .queryName(MEMORY_TABLE) .outputMode(OutputMode.Append()) .start(); - streamingQuery.processAllAvailable(); - return spark.sql("select * from " + tableName) + } + + private StreamingQuery startStream() throws TimeoutException { + return startStream(Collections.emptyMap()); + } + + private StreamingQuery startStream(String key, String value) throws TimeoutException { + return startStream(ImmutableMap.of(key, value)); + } + + private List rowsAvailable(StreamingQuery query) { + query.processAllAvailable(); + return spark.sql("select * from " + MEMORY_TABLE) .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); } - private static List processAvailable(Dataset df) throws TimeoutException { - return processAvailable(df, "test12"); - } }
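
Note on the new SnapshotUtil.oldestAncestorAfter: it keeps the lookup semantics that the removed firstSnapshotAfterTimestamp Javadoc illustrated, but distinguishes "no current snapshot or timestamp in the future" (null) from "history before the timestamp is gone" (IllegalStateException). A minimal sketch of those boundaries, assuming a table whose current ancestry is [S1 (10), S2 (11), S3 (12), S4 (14)] as in the removed example; only the return/throw behavior is taken from the method body in this diff.

    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.util.SnapshotUtil;

    class OldestAncestorAfterExample {
      // Assumes snapshots [S1 (t=10), S2 (11), S3 (12), S4 (14)] in the current history.
      static void demo(Table table) {
        Snapshot s1 = SnapshotUtil.oldestAncestorAfter(table, 10L);   // S1 (exact timestamp match)
        Snapshot s2 = SnapshotUtil.oldestAncestorAfter(table, 11L);   // S2
        Snapshot s4 = SnapshotUtil.oldestAncestorAfter(table, 13L);   // S4, the oldest snapshot at or after 13
        Snapshot none = SnapshotUtil.oldestAncestorAfter(table, 15L); // null: the current snapshot is older than 15

        // If the snapshot proving the boundary was expired (e.g. S1 is gone and 10 is requested),
        // the walk runs out of ancestors and the method throws IllegalStateException instead of guessing.
        System.out.printf("%s %s %s %s%n", s1, s2, s4, none);
      }
    }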
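
The Spark-side changes are all driven by the stream-from-timestamp read option, which the rewritten tests exercise through startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, ...). A minimal consumer-side sketch of that pattern; the table name, timestamp, and query name are placeholders, and the memory sink mirrors the tests' rowsAvailable() helper.

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.StreamingQuery;

    class StreamFromTimestampExample {
      static StreamingQuery start(SparkSession spark, String tableName, long fromTimestampMillis) throws Exception {
        // Snapshots committed before fromTimestampMillis are skipped; if the timestamp is in the
        // future, the stream stays empty until a snapshot is committed after that point.
        Dataset<Row> df = spark.readStream()
            .format("iceberg")
            .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestampMillis))
            .load(tableName);

        return df.writeStream()
            .format("memory")             // in-memory sink, as in the tests
            .queryName("stream_example")  // placeholder query name
            .outputMode(OutputMode.Append())
            .start();
      }
    }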
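
testResumingStreamReadFromCheckpoint now simulates stop-and-resume by restarting the same writer against a fixed checkpointLocation instead of using Trigger.Once with awaitTermination. A condensed sketch of that pattern, assuming placeholder paths for the checkpoint and parquet output directories.

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.DataStreamWriter;
    import org.apache.spark.sql.streaming.StreamingQuery;

    class CheckpointResumeExample {
      static void runTwoPasses(SparkSession spark, String tableName) throws Exception {
        // Placeholder locations; any durable paths work for the checkpoint and the output.
        DataStreamWriter<Row> writer = spark.readStream()
            .format("iceberg")
            .load(tableName)
            .writeStream()
            .format("parquet")
            .option("checkpointLocation", "/tmp/stream-checkpoint")
            .option("path", "/tmp/stream-output");

        StreamingQuery first = writer.start();
        first.processAllAvailable();  // drain everything committed so far
        first.stop();                 // simulate the stream going down

        // ... new snapshots are committed to the table while the stream is down ...

        StreamingQuery resumed = writer.start();  // resumes from the offsets recorded in the checkpoint
        resumed.processAllAvailable();
        resumed.stop();
      }
    }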