diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index b91270f166ec..0f24844414fe 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -255,6 +255,22 @@ public Long endTimestamp() {
     return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional();
   }
 
+  public int maxFilesPerMicroBatch() {
+    return confParser
+        .intConf()
+        .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH)
+        .defaultValue(Integer.MAX_VALUE)
+        .parse();
+  }
+
+  public int maxRecordsPerMicroBatch() {
+    return confParser
+        .intConf()
+        .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
+        .defaultValue(Integer.MAX_VALUE)
+        .parse();
+  }
+
   public boolean preserveDataGrouping() {
     return confParser
         .booleanConf()
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 9063e0f9aba6..80d60cf872f3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -84,6 +84,13 @@ private SparkReadOptions() {}
   // Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp
   public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp";
 
+  // Maximum number of files per micro-batch
+  public static final String STREAMING_MAX_FILES_PER_MICRO_BATCH =
+      "streaming-max-files-per-micro-batch";
+  // Maximum number of rows per micro-batch
+  public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
+      "streaming-max-rows-per-micro-batch";
+
   // Table path
   public static final String PATH = "path";
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 728acbe6463e..4019fedcbbfa 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.MicroBatches;
 import org.apache.iceberg.MicroBatches.MicroBatch;
 import org.apache.iceberg.Schema;
@@ -38,6 +39,7 @@
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -48,6 +50,7 @@
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
@@ -59,10 +62,12 @@
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SparkMicroBatchStream implements MicroBatchStream {
+public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
   private static final Joiner SLASH = Joiner.on("/");
   private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
@@ -80,6 +85,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final boolean skipDelete;
   private final boolean skipOverwrite;
   private final long fromTimestamp;
+  private final int maxFilesPerMicroBatch;
+  private final int maxRecordsPerMicroBatch;
 
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
@@ -97,6 +104,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     this.splitLookback = readConf.splitLookback();
     this.splitOpenFileCost = readConf.splitOpenFileCost();
     this.fromTimestamp = readConf.streamFromTimestamp();
+    this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
+    this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
 
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
@@ -118,16 +127,8 @@ public Offset latestOffset() {
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();
-    long addedFilesCount =
-        PropertyUtil.propertyAsLong(latestSnapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
-    // if the latest snapshot summary doesn't contain SnapshotSummary.ADDED_FILES_PROP,
-    // iterate through addedDataFiles to compute addedFilesCount
-    addedFilesCount =
-        addedFilesCount == -1
-            ? Iterables.size(latestSnapshot.addedDataFiles(table.io()))
-            : addedFilesCount;
-
-    return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false);
+
+    return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount(latestSnapshot), false);
   }
 
   @Override
@@ -204,34 +205,47 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
     StreamingOffset currentOffset = null;
 
+    // process the half-open range [(startOffset : startFileIndex), (endOffset : endFileIndex))
     do {
+      long endFileIndex;
       if (currentOffset == null) {
        currentOffset = batchStartOffset;
      } else {
        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
-        currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+        // this snapshot may need to be read only partially,
+        // in case it is equal to endOffset
+        if (currentOffset.snapshotId() != endOffset.snapshotId()) {
+          currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+        } else {
+          currentOffset = endOffset;
+        }
       }
 
       Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
 
-      if (snapshot == null) {
-        throw new IllegalStateException(
-            String.format(
-                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
-                currentOffset.snapshotId()));
-      }
+      validateCurrentSnapshotExists(snapshot, currentOffset);
 
       if (!shouldProcess(snapshot)) {
         LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
         continue;
       }
 
+      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
+        endFileIndex = endOffset.position();
+      } else {
+        endFileIndex = addedFilesCount(currentSnapshot);
+      }
+
       MicroBatch latestMicroBatch =
-          MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          MicroBatches.from(currentSnapshot, table.io())
              .caseSensitive(caseSensitive)
              .specsById(table.specs())
              .generate(
-                  currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+                  currentOffset.position(),
+                  endFileIndex,
+                  Long.MAX_VALUE,
+                  currentOffset.shouldScanAllFiles());
 
       fileScanTasks.addAll(latestMicroBatch.tasks());
     } while (currentOffset.snapshotId() != endOffset.snapshotId());
 
@@ -295,6 +309,139 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
     }
   }
 
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
+    // calculate the end offset; the snapshot id comes from the start offset
+    Preconditions.checkArgument(
+        startOffset instanceof StreamingOffset,
+        "Invalid start offset: %s is not a StreamingOffset",
+        startOffset);
+
+    table.refresh();
+    if (table.currentSnapshot() == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // the end offset can span multiple snapshots
+    StreamingOffset startingOffset = (StreamingOffset) startOffset;
+
+    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+      startingOffset = determineStartingOffset(table, fromTimestamp);
+    }
+
+    Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+    validateCurrentSnapshotExists(curSnapshot, startingOffset);
+
+    int startPosOfSnapOffset = (int) startingOffset.position();
+
+    boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+    boolean shouldContinueReading = true;
+    int curFilesAdded = 0;
+    int curRecordCount = 0;
+    int curPos = 0;
+
+    // Note: the position in the produced offset is exclusive (non-inclusive)
+    while (shouldContinueReading) {
+      // generate the manifest index for curSnapshot
+      List<Pair<ManifestFile, Integer>> indexedManifests =
+          MicroBatches.skippedManifestIndexesFromSnapshot(
+              table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+      // this assumes at least one file can be added to the new offset
+      for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
+        // at this point curPos >= startFileIndex is guaranteed
+        curPos = indexedManifests.get(idx).second();
+        try (CloseableIterable<FileScanTask> taskIterable =
+                MicroBatches.openManifestFile(
+                    table.io(),
+                    table.specs(),
+                    caseSensitive,
+                    curSnapshot,
+                    indexedManifests.get(idx).first(),
+                    scanAllFiles);
+            CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
+          while (taskIter.hasNext()) {
+            FileScanTask task = taskIter.next();
+            if (curPos >= startPosOfSnapOffset) {
+              // TODO: use the readLimit passed in as a parameter; the read limits are currently
+              // derived from these two properties.
+              if ((curFilesAdded + 1) > maxFilesPerMicroBatch
+                  || (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
+                shouldContinueReading = false;
+                break;
+              }
+
+              curFilesAdded += 1;
+              curRecordCount += task.file().recordCount();
+            }
+            ++curPos;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Failed to close task iterable", ioe);
+        }
+      }
+      // if the current snapshot is also the most recent snapshot, stop here
+      if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        break;
+      }
+
+      // if the snapshot was consumed completely, move on to the next snapshot
+      if (shouldContinueReading) {
+        startPosOfSnapOffset = -1;
+        curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+        // when moving to the next snapshot, only added files should be scanned
+        scanAllFiles = false;
+      }
+    }
+
+    StreamingOffset latestStreamingOffset =
+        new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
+
+    // if no new data arrived, return null
+    return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
+  }
+
+  private long addedFilesCount(Snapshot snapshot) {
+    long addedFilesCount =
+        PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
+    // if the snapshot summary doesn't have SnapshotSummary.ADDED_FILES_PROP,
+    // iterate through the added files to compute addedFilesCount
+    return addedFilesCount == -1
+        ? Iterables.size(snapshot.addedDataFiles(table.io()))
+        : addedFilesCount;
+  }
+
+  private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) {
+    if (snapshot == null) {
+      throw new IllegalStateException(
+          String.format(
+              "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+              currentOffset.snapshotId()));
+    }
+  }
+
+  @Override
+  public ReadLimit getDefaultReadLimit() {
+    if (maxFilesPerMicroBatch != Integer.MAX_VALUE
+        && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
+      ReadLimit[] readLimits = new ReadLimit[2];
+      readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
+      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
+      return ReadLimit.compositeLimit(readLimits);
+    } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
+      return ReadLimit.maxFiles(maxFilesPerMicroBatch);
+    } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
+      return ReadLimit.maxRows(maxRecordsPerMicroBatch);
+    } else {
+      return ReadLimit.allAvailable();
+    }
+  }
+
   private static class InitialOffsetStore {
     private final Table table;
     private final FileIO io;
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index dd456f22371e..a2d0c9acaf48 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
@@ -47,6 +48,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -70,6 +72,8 @@ public TestStructuredStreamingRead3(
 
   private Table table;
 
+  private final AtomicInteger microBatches = new AtomicInteger();
+
   /**
    * test data to be used by multiple writes each write creates a snapshot and writes a list of
    * records
@@ -115,6 +119,7 @@ public void setupTable() {
             + "PARTITIONED BY (bucket(3, id))",
         tableName);
     this.table = validationCatalog.loadTable(tableIdent);
+    microBatches.set(0);
   }
 
   @After
@@ -140,6 +145,57 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    Assert.assertEquals(
+        6,
+        microBatchCount(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    Assert.assertEquals(
+        3,
+        microBatchCount(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    // only 1 micro-batch is formed and the data is read only partially
+    Assert.assertEquals(
+        1,
+        microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")));
+
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
+
+    // correctness check: only 1 record is read; the stream is stuck at this micro-batch
+    List<Row> actual = rowsAvailable(query);
+    Assertions.assertThat(actual)
+        .containsExactlyInAnyOrderElementsOf(
+            Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    Assert.assertEquals(
+        2,
+        microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")));
+  }
+
   @Test
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -561,7 +617,25 @@ private StreamingQuery startStream() throws TimeoutException {
   }
 
   private StreamingQuery startStream(String key, String value) throws TimeoutException {
-    return startStream(ImmutableMap.of(key, value));
+    return startStream(
+        ImmutableMap.of(key, value, SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"));
+  }
+
+  private int microBatchCount(Map<String, String> options) throws TimeoutException {
+    Dataset<Row> ds = spark.readStream().options(options).format("iceberg").load(tableName);
+
+    ds.writeStream()
+        .options(options)
+        .foreachBatch(
+            (VoidFunction2<Dataset<Row>, Long>)
+                (dataset, batchId) -> {
+                  microBatches.getAndIncrement();
+                })
+        .start()
+        .processAllAvailable();
+
+    stopStreams();
+    return microBatches.get();
   }
 
   private List<Row> rowsAvailable(StreamingQuery query) {
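Usage sketch (not part of the patch): the snippet below shows how a consumer might combine the two new read options in a Spark Structured Streaming job. The table identifiers, checkpoint path, and limit values are hypothetical placeholders; the option keys map to `SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH` and `SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH` introduced above.

```java
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

public class RateLimitedIcebergStream {
  public static void main(String[] args) throws TimeoutException, StreamingQueryException {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Each micro-batch admits at most 1000 files and 500000 rows; with both options set,
    // getDefaultReadLimit() reports a composite ReadLimit to Spark's admission control.
    // "db.events" is a placeholder Iceberg table registered in the session catalog.
    Dataset<Row> stream =
        spark
            .readStream()
            .format("iceberg")
            .option("streaming-max-files-per-micro-batch", "1000")
            .option("streaming-max-rows-per-micro-batch", "500000")
            .load("db.events");

    // Write the rate-limited stream into another (placeholder) Iceberg table.
    StreamingQuery query =
        stream
            .writeStream()
            .format("iceberg")
            .outputMode("append")
            .trigger(Trigger.ProcessingTime("30 seconds"))
            .option("checkpointLocation", "/tmp/checkpoints/events")
            .toTable("db.events_copy");

    query.awaitTermination();
  }
}
```

Note that `latestOffset(Offset, ReadLimit)` checks the row limit before admitting a file, so a single data file with more records than `streaming-max-rows-per-micro-batch` is never admitted and the stream stops advancing; the `_WithNumberOfRows_1` test above relies on exactly that behavior.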