diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 3c2432d6a2f3..3f8b1bf0f84e 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -37,6 +37,7 @@
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -50,6 +51,7 @@
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.source.SparkScan.ReadTask;
 import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.iceberg.util.Tasks;
@@ -111,8 +113,12 @@ public Offset latestOffset() {
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();
-    return new StreamingOffset(
-        latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles(table.io())), false);
+    long addedFilesCount = PropertyUtil.propertyAsLong(latestSnapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
+    // If the snapshot summary doesn't include SnapshotSummary.ADDED_FILES_PROP, fall back to iterating the
+    // snapshot's added files to determine addedFilesCount.
+    addedFilesCount = addedFilesCount == -1 ? Iterables.size(latestSnapshot.addedFiles(table.io())) : addedFilesCount;
+
+    return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false);
   }
 
   @Override
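For context, here is a minimal standalone sketch (not part of the patch) of the summary lookup the new code relies on: `PropertyUtil.propertyAsLong` reads the `SnapshotSummary.ADDED_FILES_PROP` entry from a snapshot's summary map and returns the supplied default (`-1` here, used as a sentinel) when the key is absent. The summary maps below are hand-built for illustration only.

```java
import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.util.PropertyUtil;

public class AddedFilesCountSketch {
  public static void main(String[] args) {
    // A summary that carries the added-files count, as most snapshots do.
    Map<String, String> withCount =
        Collections.singletonMap(SnapshotSummary.ADDED_FILES_PROP, "42");
    // A summary missing the property.
    Map<String, String> withoutCount = Collections.emptyMap();

    // Prints 42: the count comes straight from the summary, no manifest reads needed.
    System.out.println(
        PropertyUtil.propertyAsLong(withCount, SnapshotSummary.ADDED_FILES_PROP, -1));

    // Prints -1: the sentinel that makes latestOffset() fall back to
    // counting the snapshot's added files by iteration.
    System.out.println(
        PropertyUtil.propertyAsLong(withoutCount, SnapshotSummary.ADDED_FILES_PROP, -1));
  }
}
```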