SparkReadConf.java
@@ -255,6 +255,22 @@ public Long endTimestamp() {
return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional();
}

public int maxFilesPerMicroBatch() {
return confParser
.intConf()
.option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
}

public int maxRecordsPerMicroBatch() {
return confParser
.intConf()
.option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
}

public boolean preserveDataGrouping() {
return confParser
.booleanConf()
SparkReadOptions.java
@@ -84,6 +84,13 @@ private SparkReadOptions() {}
// Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp
public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp";

// Maximum number of files per micro-batch
public static final String STREAMING_MAX_FILES_PER_MICRO_BATCH =
"streaming-max-files-per-micro-batch";
// Maximum number of rows per micro-batch
public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
"streaming-max-rows-per-micro-batch";

// Table path
public static final String PATH = "path";

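For context, a minimal usage sketch (not part of this change) of how the two new options would be set on a structured-streaming read of an Iceberg table; the session, table identifier, and limit values are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RateLimitedIcebergStreamExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    Dataset<Row> stream =
        spark
            .readStream()
            .format("iceberg")
            // Keys match SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH
            // and SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH above.
            .option("streaming-max-files-per-micro-batch", "100")
            .option("streaming-max-rows-per-micro-batch", "100000")
            .load("db.table");
  }
}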
SparkMicroBatchStream.java
@@ -30,6 +30,7 @@
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.Schema;
@@ -38,6 +39,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -48,6 +50,7 @@
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
@@ -59,10 +62,12 @@
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkMicroBatchStream implements MicroBatchStream {
public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
private static final Joiner SLASH = Joiner.on("/");
private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
@@ -80,6 +85,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
private final boolean skipDelete;
private final boolean skipOverwrite;
private final long fromTimestamp;
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;

SparkMicroBatchStream(
JavaSparkContext sparkContext,
@@ -97,6 +104,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
this.splitLookback = readConf.splitLookback();
this.splitOpenFileCost = readConf.splitOpenFileCost();
this.fromTimestamp = readConf.streamFromTimestamp();
this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();

InitialOffsetStore initialOffsetStore =
new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
@@ -118,16 +127,8 @@ public Offset latestOffset() {
}

Snapshot latestSnapshot = table.currentSnapshot();
long addedFilesCount =
PropertyUtil.propertyAsLong(latestSnapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
// if the latest snapshot summary doesn't contain SnapshotSummary.ADDED_FILES_PROP,
// iterate through addedDataFiles to compute addedFilesCount
addedFilesCount =
addedFilesCount == -1
? Iterables.size(latestSnapshot.addedDataFiles(table.io()))
: addedFilesCount;

return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false);

return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount(latestSnapshot), false);
}

@Override
@@ -204,34 +205,47 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse

StreamingOffset currentOffset = null;

// [(startOffset : startFileIndex), (endOffset : endFileIndex) )
do {
long endFileIndex;
if (currentOffset == null) {
currentOffset = batchStartOffset;
} else {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
// it may happen that we need to read this snapshot partially in case it's equal to
// endOffset.
if (currentOffset.snapshotId() != endOffset.snapshotId()) {
currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
} else {
currentOffset = endOffset;
}
}

Snapshot snapshot = table.snapshot(currentOffset.snapshotId());

if (snapshot == null) {
throw new IllegalStateException(
String.format(
"Cannot load current offset at snapshot %d, the snapshot was expired or removed",
currentOffset.snapshotId()));
}
validateCurrentSnapshotExists(snapshot, currentOffset);

if (!shouldProcess(snapshot)) {
LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
continue;
}

Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
if (currentOffset.snapshotId() == endOffset.snapshotId()) {
endFileIndex = endOffset.position();
} else {
endFileIndex = addedFilesCount(currentSnapshot);
}

MicroBatch latestMicroBatch =
MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
MicroBatches.from(currentSnapshot, table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
.generate(
currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
currentOffset.position(),
endFileIndex,
Long.MAX_VALUE,
currentOffset.shouldScanAllFiles());

fileScanTasks.addAll(latestMicroBatch.tasks());
} while (currentOffset.snapshotId() != endOffset.snapshotId());
@@ -295,6 +309,139 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
}
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
// calculate the end offset; get the snapshot id from the start offset
Preconditions.checkArgument(
startOffset instanceof StreamingOffset,
"Invalid start offset: %s is not a StreamingOffset",
startOffset);

table.refresh();
if (table.currentSnapshot() == null) {
return StreamingOffset.START_OFFSET;
}

if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
return StreamingOffset.START_OFFSET;
}

// end offset can expand to multiple snapshots
StreamingOffset startingOffset = (StreamingOffset) startOffset;

if (startOffset.equals(StreamingOffset.START_OFFSET)) {
startingOffset = determineStartingOffset(table, fromTimestamp);
}

Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
validateCurrentSnapshotExists(curSnapshot, startingOffset);

int startPosOfSnapOffset = (int) startingOffset.position();

boolean scanAllFiles = startingOffset.shouldScanAllFiles();

boolean shouldContinueReading = true;
int curFilesAdded = 0;
int curRecordCount = 0;
int curPos = 0;

// Note: the position in the offset we produce is exclusive (non-inclusive)
while (shouldContinueReading) {
// generate manifest index for the curSnapshot
List<Pair<ManifestFile, Integer>> indexedManifests =
MicroBatches.skippedManifestIndexesFromSnapshot(
table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
// this is under the assumption that we will be able to add at least 1 file in the new offset
for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
// be assured that curPos >= startFileIndex
curPos = indexedManifests.get(idx).second();
try (CloseableIterable<FileScanTask> taskIterable =
MicroBatches.openManifestFile(
table.io(),
table.specs(),
caseSensitive,
curSnapshot,
indexedManifests.get(idx).first(),
scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (curPos >= startPosOfSnapOffset) {
// TODO: use the readLimit provided in the function param; the read limits are derived
// from these 2 properties.
if ((curFilesAdded + 1) > maxFilesPerMicroBatch
|| (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
shouldContinueReading = false;
break;
}

Comment on lines +371 to +378

Contributor: Since this is a forward port, this should be fine for now, but it's probably worth creating an issue to track the TODO to use the provided ReadLimit.

Contributor Author: ACK, let me take this as an immediate follow-up of this PR and add a tracking issue as well in the meantime.

curFilesAdded += 1;
curRecordCount += task.file().recordCount();
}
++curPos;
}
} catch (IOException ioe) {
LOG.warn("Failed to close task iterable", ioe);
}
}
// if the current snapshot is also the most recent snapshot, then break
if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
break;
}

// if everything was OK and we consumed the complete snapshot, then move to the next snapshot
if (shouldContinueReading) {
startPosOfSnapOffset = -1;
curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
// once we move to the next snapshot, we should only scan its added files
scanAllFiles = false;
}
}

StreamingOffset latestStreamingOffset =
new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);

// if no new data arrived, then return null.
return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
}
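
// Hypothetical sketch of the follow-up discussed in the TODO and review thread above
// (not part of this change): deriving the file cap from the ReadLimit that Spark passes
// into latestOffset(start, limit) instead of re-reading the table options. ReadMaxFiles
// and CompositeReadLimit come from org.apache.spark.sql.connector.read.streaming; the
// helper name and the fallback argument are illustrative.
private static int filesLimit(ReadLimit limit, int fallback) {
  if (limit instanceof ReadMaxFiles) {
    return ((ReadMaxFiles) limit).maxFiles();
  } else if (limit instanceof CompositeReadLimit) {
    // A composite limit wraps several simple limits, e.g. maxFiles + maxRows.
    for (ReadLimit inner : ((CompositeReadLimit) limit).getReadLimits()) {
      if (inner instanceof ReadMaxFiles) {
        return ((ReadMaxFiles) inner).maxFiles();
      }
    }
  }
  // e.g. maxFilesPerMicroBatch; a row cap could be unpacked the same way from ReadMaxRows.
  return fallback;
}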

private long addedFilesCount(Snapshot snapshot) {
long addedFilesCount =
PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
// If the snapshot summary doesn't have SnapshotSummary.ADDED_FILES_PROP,
// iterate through the added data files to compute addedFilesCount.
return addedFilesCount == -1
? Iterables.size(snapshot.addedDataFiles(table.io()))
: addedFilesCount;
}

private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) {
if (snapshot == null) {
throw new IllegalStateException(
String.format(
"Cannot load current offset at snapshot %d, the snapshot was expired or removed",
currentOffset.snapshotId()));
}
}

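// Note on admission control: Spark calls getDefaultReadLimit() once when the stream
// starts and then hands the returned limit back to latestOffset(start, limit) for every
// micro-batch. For example (values assumed), with streaming-max-files-per-micro-batch=100
// and streaming-max-rows-per-micro-batch=50000 this reports
// ReadLimit.compositeLimit(new ReadLimit[] {ReadLimit.maxFiles(100), ReadLimit.maxRows(50000)});
// with neither option set it reports ReadLimit.allAvailable().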
@Override
public ReadLimit getDefaultReadLimit() {
if (maxFilesPerMicroBatch != Integer.MAX_VALUE
&& maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
ReadLimit[] readLimits = new ReadLimit[2];
readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
return ReadLimit.compositeLimit(readLimits);
} else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
return ReadLimit.maxFiles(maxFilesPerMicroBatch);
} else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
return ReadLimit.maxRows(maxRecordsPerMicroBatch);
} else {
return ReadLimit.allAvailable();
}
}

private static class InitialOffsetStore {
private final Table table;
private final FileIO io;