From 22756e9070f00a83a016a97060a4bb351ec547ab Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 25 Jun 2021 18:21:14 -0700 Subject: [PATCH 1/4] idea checkpoint --- .../org/apache/iceberg/util/SnapshotUtil.java | 7 ++++++- .../spark/source/SparkMicroBatchStream.java | 21 ++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index beafcc29c90c..2b22dc7eda69 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -125,7 +125,8 @@ public static List newFiles(Long baseSnapshotId, long latestSnapshotId /** * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its * parent. - * @return the snapshot for which the given snapshot is the parent + * @return null if the given snapshot is the table's current snapshot + * or else returns the snapshot for which the given snapshot is the parent * @throws IllegalArgumentException when the given snapshotId is not found in the table * @throws IllegalStateException when the given snapshotId is not an ancestor of the current table state */ @@ -133,6 +134,10 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) { Preconditions.checkArgument(table.snapshot(snapshotId) != null, "Cannot find parent snapshot: %s", snapshotId); Snapshot current = table.currentSnapshot(); + if (current.snapshotId() == snapshotId) { + return null; + } + while (current != null) { if (current.parentId() == snapshotId) { return current; diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 1a7f217af7f9..be1a1d692eb5 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ 
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -79,6 +79,8 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final Long splitOpenFileCost; private final boolean localityPreferred; private final StreamingOffset initialOffset; + private final boolean ignoreDelete; + private final boolean ignoreRepace; SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) { @@ -169,24 +171,29 @@ public void stop() { private List planFiles(StreamingOffset startOffset, StreamingOffset endOffset) { List fileScanTasks = Lists.newArrayList(); - MicroBatch latestMicroBatch = null; StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ? new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) : startOffset; + StreamingOffset currentOffset = null; + do { - StreamingOffset currentOffset = - latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ? - new StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) : - batchStartOffset; + if (currentOffset == null) { + currentOffset = batchStartOffset; + } else { + Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId()); + } + currentOffset = currentOffset == null ? 
+ batchStartOffset : + new StreamingOffset(snapshotAfter(currentOffset.snapshotId()), 0L, false); - latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io()) + MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io()) .caseSensitive(caseSensitive) .specsById(table.specs()) .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles()); fileScanTasks.addAll(latestMicroBatch.tasks()); - } while (latestMicroBatch.snapshotId() != endOffset.snapshotId()); + } while (currentOffset.snapshotId() != endOffset.snapshotId()); return fileScanTasks; } From 4a5d3bd6d67288adc10920aeacaa034b051f2d7a Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 25 Jun 2021 19:16:10 -0700 Subject: [PATCH 2/4] implement skipDelete and skipReplace options --- .../iceberg/spark/SparkReadOptions.java | 6 +++ .../org/apache/iceberg/spark/Spark3Util.java | 13 +++++++ .../spark/source/SparkMicroBatchStream.java | 38 ++++++++++++------- .../source/TestStructuredStreamingRead3.java | 27 +++++++++++++ 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index bce0bf4e8bb5..f21d4fd0a344 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -50,4 +50,10 @@ private SparkReadOptions() { // Set ID that is used to fetch file scan tasks public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id"; + + // skip snapshots of type delete while reading stream out of iceberg table + public static final String READ_STREAM_SKIP_DELETE = "read-stream-skip-delete"; + + // skip snapshots of type replace while reading stream out of iceberg table + public static final String READ_STREAM_SKIP_REPLACE = "read-stream-skip-replace"; } diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java index 9a24b3e5ffb6..7b52ba118890 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java @@ -562,6 +562,19 @@ public static Integer propertyAsInt(CaseInsensitiveStringMap options, String pro return null; } + public static Boolean propertyAsBoolean(CaseInsensitiveStringMap options, String property, Boolean defaultValue) { + if (defaultValue != null) { + return options.getBoolean(property, defaultValue); + } + + String value = options.get(property); + if (value != null) { + return Boolean.parseBoolean(value); + } + + return null; + } + public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor { private static final Joiner COMMA = Joiner.on(','); private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor(); diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index be1a1d692eb5..90e7dfd896c0 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -27,6 +27,7 @@ import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Set; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataOperations; import org.apache.iceberg.FileScanTask; @@ -45,6 +46,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.Spark3Util; import 
org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; @@ -79,8 +81,7 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final Long splitOpenFileCost; private final boolean localityPreferred; private final StreamingOffset initialOffset; - private final boolean ignoreDelete; - private final boolean ignoreRepace; + private final Set skippableDataOperations; SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) { @@ -102,6 +103,15 @@ public class SparkMicroBatchStream implements MicroBatchStream { InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation); this.initialOffset = initialOffsetStore.initialOffset(); + + this.skippableDataOperations = Sets.newHashSet(); + if (Spark3Util.propertyAsBoolean(options, SparkReadOptions.READ_STREAM_SKIP_DELETE, false)) { + this.skippableDataOperations.add(DataOperations.DELETE); + } + + if (Spark3Util.propertyAsBoolean(options, SparkReadOptions.READ_STREAM_SKIP_REPLACE, false)) { + this.skippableDataOperations.add(DataOperations.REPLACE); + } } @Override @@ -182,10 +192,19 @@ private List planFiles(StreamingOffset startOffset, StreamingOffse currentOffset = batchStartOffset; } else { Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId()); + boolean shouldSkip = skippableDataOperations.contains(snapshotAfter.operation()); + + // the next snapshot must be an APPEND, unless its operation was explicitly marked skippable via the read-stream-skip options + Preconditions.checkState( + snapshotAfter.operation().equals(DataOperations.APPEND) || shouldSkip, + "Invalid Snapshot operation: %s, only APPEND is allowed.", snapshotAfter.operation()); + + currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false); + + if (shouldSkip) { + continue; + } } - currentOffset = currentOffset == null ? 
- batchStartOffset : - new StreamingOffset(snapshotAfter(currentOffset.snapshotId()), 0L, false); MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io()) .caseSensitive(caseSensitive) @@ -198,15 +217,6 @@ private List planFiles(StreamingOffset startOffset, StreamingOffse return fileScanTasks; } - private long snapshotAfter(long snapshotId) { - Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId); - - Preconditions.checkState(snapshotAfter.operation().equals(DataOperations.APPEND), - "Invalid Snapshot operation: %s, only APPEND is allowed.", snapshotAfter.operation()); - - return snapshotAfter.snapshotId(); - } - private static class InitialOffsetStore { private final Table table; private final FileIO io; diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 07f3df4ea4aa..bb2169bdb132 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -42,6 +42,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -321,6 +322,32 @@ public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception { ); } + @SuppressWarnings("unchecked") + @Test + public void testReadStreamWithSnapshotTypeReplaceAndSkipReplaceOption() throws Exception { + // fill table with some data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier); + + table.refresh(); + 
+ // this should create a snapshot with type Replace. + table.rewriteManifests() + .clusterBy(f -> 1) + .commit(); + + // check pre-condition + Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation()); + + Dataset df = spark.readStream() + .format("iceberg") + .option(SparkReadOptions.READ_STREAM_SKIP_REPLACE, "true") + .load(tableIdentifier); + + Assertions.assertThat(processAvailable(df)) + .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); + } + @SuppressWarnings("unchecked") @Test public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception { From 6b8d0ef48f50803214b41547a324601be30c8c3c Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 25 Jun 2021 19:20:33 -0700 Subject: [PATCH 3/4] revert changes in SnapshotUtil --- .../src/main/java/org/apache/iceberg/util/SnapshotUtil.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 2b22dc7eda69..5bcb9a9081d2 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -125,8 +125,7 @@ public static List newFiles(Long baseSnapshotId, long latestSnapshotId /** * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its * parent. 
- * @return null if the given snapshot is the table's current snapshot - * or else returns the snapshot for which the given snapshot is the parent + * @return the snapshot for which the given snapshot is the parent * @throws IllegalArgumentException when the given snapshotId is not found in the table * @throws IllegalStateException when the given snapshotId is not an ancestor of the current table state */ @@ -134,9 +133,6 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) { Preconditions.checkArgument(table.snapshot(snapshotId) != null, "Cannot find parent snapshot: %s", snapshotId); Snapshot current = table.currentSnapshot(); - if (current.snapshotId() == snapshotId) { - return null; - } while (current != null) { if (current.parentId() == snapshotId) { From 80fc752ddba227e14c8d2c55c3e7592f0e0e9c0d Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 25 Jun 2021 19:21:36 -0700 Subject: [PATCH 4/4] revert changes in SnapshotUtil --- core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 5bcb9a9081d2..beafcc29c90c 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -133,7 +133,6 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) { Preconditions.checkArgument(table.snapshot(snapshotId) != null, "Cannot find parent snapshot: %s", snapshotId); Snapshot current = table.currentSnapshot(); - while (current != null) { if (current.parentId() == snapshotId) { return current;