@@ -50,4 +50,10 @@ private SparkReadOptions() {

// Set ID that is used to fetch file scan tasks
public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";

// Skip snapshots of type DELETE while reading a stream out of an Iceberg table
public static final String READ_STREAM_SKIP_DELETE = "read-stream-skip-delete";

// Skip snapshots of type REPLACE while reading a stream out of an Iceberg table
public static final String READ_STREAM_SKIP_REPLACE = "read-stream-skip-replace";
}
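For orientation, a minimal usage sketch of the two new options in a Spark 3 structured streaming read; the option keys are the constants defined above, while the SparkSession and table identifier are placeholders:

    // Hypothetical usage: stream an Iceberg table, skipping DELETE and REPLACE
    // snapshots instead of failing micro-batch planning on them.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .option(SparkReadOptions.READ_STREAM_SKIP_DELETE, "true")    // "read-stream-skip-delete"
        .option(SparkReadOptions.READ_STREAM_SKIP_REPLACE, "true")   // "read-stream-skip-replace"
        .load("db.table");                                           // placeholder table identifier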
13 changes: 13 additions & 0 deletions spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -562,6 +562,19 @@ public static Integer propertyAsInt(CaseInsensitiveStringMap options, String pro
return null;
}

public static Boolean propertyAsBoolean(CaseInsensitiveStringMap options, String property, Boolean defaultValue) {
if (defaultValue != null) {
return options.getBoolean(property, defaultValue);
}

String value = options.get(property);
if (value != null) {
return Boolean.parseBoolean(value);
}

return null;
}
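A brief, hypothetical illustration of the helper's three-valued behaviour (the option map contents are made up for the example):

    // Sketch only: how propertyAsBoolean resolves an explicit value, a default, and null.
    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
        java.util.Collections.singletonMap("read-stream-skip-delete", "true"));

    Spark3Util.propertyAsBoolean(options, "read-stream-skip-delete", false);   // true  (parsed from the map)
    Spark3Util.propertyAsBoolean(options, "read-stream-skip-replace", false);  // false (falls back to the default)
    Spark3Util.propertyAsBoolean(options, "read-stream-skip-replace", null);   // null  (no value and no default)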

public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor<String> {
private static final Joiner COMMA = Joiner.on(',');
private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor();
@@ -27,6 +27,7 @@
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
@@ -45,6 +46,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
@@ -79,6 +81,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
private final Long splitOpenFileCost;
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
private final Set<String> skippableDataOperations;

SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive,
Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) {
@@ -100,6 +103,15 @@ public class SparkMicroBatchStream implements MicroBatchStream {

InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation);
this.initialOffset = initialOffsetStore.initialOffset();

this.skippableDataOperations = Sets.newHashSet();
if (Spark3Util.propertyAsBoolean(options, SparkReadOptions.READ_STREAM_SKIP_DELETE, false)) {
this.skippableDataOperations.add(DataOperations.DELETE);
}

if (Spark3Util.propertyAsBoolean(options, SparkReadOptions.READ_STREAM_SKIP_REPLACE, false)) {
this.skippableDataOperations.add(DataOperations.REPLACE);
}
}
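As a hedged side note on the values involved here: Snapshot.operation() returns a plain string, and the DataOperations constants are string-valued, so the set built above is compared directly against each snapshot's operation name in planFiles below.

    // For reference (string values of the org.apache.iceberg.DataOperations constants):
    //   DataOperations.APPEND  = "append"
    //   DataOperations.DELETE  = "delete"
    //   DataOperations.REPLACE = "replace"
    // With both read options set to true, skippableDataOperations ends up as {"delete", "replace"}.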

@Override
@@ -169,37 +181,42 @@ public void stop() {

   private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
     List<FileScanTask> fileScanTasks = Lists.newArrayList();
-    MicroBatch latestMicroBatch = null;
     StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
         new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
         startOffset;

+    StreamingOffset currentOffset = null;
+
     do {
-      StreamingOffset currentOffset =
-          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
-              new StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
-              batchStartOffset;
+      if (currentOffset == null) {
+        currentOffset = batchStartOffset;
+      } else {
+        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
+        boolean shouldSkip = skippableDataOperations.contains(snapshotAfter.operation());
+
+        Preconditions.checkState(
+            snapshotAfter.operation().equals(DataOperations.APPEND) || shouldSkip,
+            "Invalid snapshot operation: %s. Only APPEND is allowed, unless the operation is configured to be skipped.",
+            snapshotAfter.operation());
+
+        currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+
+        if (shouldSkip) {
+          continue;
+        }
+      }

-      latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+      MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
           .caseSensitive(caseSensitive)
           .specsById(table.specs())
           .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());

       fileScanTasks.addAll(latestMicroBatch.tasks());
-    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+    } while (currentOffset.snapshotId() != endOffset.snapshotId());

     return fileScanTasks;
   }

-  private long snapshotAfter(long snapshotId) {
-    Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);
-
-    Preconditions.checkState(snapshotAfter.operation().equals(DataOperations.APPEND),
-        "Invalid Snapshot operation: %s, only APPEND is allowed.", snapshotAfter.operation());
-
-    return snapshotAfter.snapshotId();
-  }
-
   private static class InitialOffsetStore {
     private final Table table;
     private final FileIO io;
@@ -42,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -321,6 +322,32 @@ public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception {
);
}

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeReplaceAndSkipReplaceOption() throws Exception {
// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);

table.refresh();

// this should create a snapshot with type Replace.
table.rewriteManifests()
.clusterBy(f -> 1)
.commit();

// check pre-condition
Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());

Dataset<Row> df = spark.readStream()
.format("iceberg")
.option(SparkReadOptions.READ_STREAM_SKIP_REPLACE, "true")
.load(tableIdentifier);

Assertions.assertThat(processAvailable(df))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}
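For symmetry, a sketch of how a DELETE counterpart of this test might look, reusing the helpers already visible in this file (appendDataAsMultipleSnapshots, processAvailable) and assuming org.apache.iceberg.expressions.Expressions is available; the row filter removes whole data files, so DeleteFiles accepts it. This is an illustration, not part of the change:

    @SuppressWarnings("unchecked")
    @Test
    public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
      // fill table with some data
      List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
      appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);

      table.refresh();

      // this should create a snapshot with type DELETE
      table.newDelete()
          .deleteFromRowFilter(Expressions.alwaysTrue())
          .commit();

      // check pre-condition
      Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());

      Dataset<Row> df = spark.readStream()
          .format("iceberg")
          .option(SparkReadOptions.READ_STREAM_SKIP_DELETE, "true")
          .load(tableIdentifier);

      Assertions.assertThat(processAvailable(df))
          .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
    }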

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {