From a2c8bf8ec4bb12ebec140578318601aa2e218835 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 20 May 2021 19:20:49 -0700 Subject: [PATCH 1/2] minor refactoring in MicroBatches class to prep for Spark3 streaming change --- .../java/org/apache/iceberg/MicroBatches.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java index 846f20a07b7c..9247131e4251 100644 --- a/core/src/main/java/org/apache/iceberg/MicroBatches.java +++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java @@ -40,13 +40,13 @@ private MicroBatches() { public static class MicroBatch { private final long snapshotId; - private final int startFileIndex; - private final int endFileIndex; + private final long startFileIndex; + private final long endFileIndex; private final long sizeInBytes; private final List tasks; private final boolean lastIndexOfSnapshot; - private MicroBatch(long snapshotId, int startFileIndex, int endFileIndex, long sizeInBytes, + private MicroBatch(long snapshotId, long startFileIndex, long endFileIndex, long sizeInBytes, List tasks, boolean lastIndexOfSnapshot) { this.snapshotId = snapshotId; this.startFileIndex = startFileIndex; @@ -60,11 +60,11 @@ public long snapshotId() { return snapshotId; } - public int startFileIndex() { + public long startFileIndex() { return startFileIndex; } - public int endFileIndex() { + public long endFileIndex() { return endFileIndex; } @@ -109,18 +109,18 @@ public MicroBatchBuilder specsById(Map specs) { return this; } - public MicroBatch generate(int startFileIndex, long targetSizeInBytes, boolean isStarting) { + public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) { Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0"); Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0"); - List manifests = isStarting ? snapshot.dataManifests() : + List manifests = scanAllFiles ? snapshot.dataManifests() : snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId())) .collect(Collectors.toList()); List> manifestIndexes = indexManifests(manifests); List> skippedManifestIndexes = skipManifests(manifestIndexes, startFileIndex); - return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, isStarting); + return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles); } /** @@ -153,7 +153,7 @@ private static List> indexManifests(List> skipManifests(List> indexedManifests, - int startFileIndex) { + long startFileIndex) { if (startFileIndex == 0) { return indexedManifests; } @@ -177,11 +177,11 @@ private static List> skipManifests(List> indexedManifests, - int startFileIndex, long targetSizeInBytes, boolean isStarting) { + long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) { if (indexedManifests.isEmpty()) { return new MicroBatch(snapshot.snapshotId(), startFileIndex, startFileIndex + 1, 0L, Collections.emptyList(), true); @@ -195,7 +195,7 @@ private MicroBatch generateMicroBatch(List> indexedM for (int idx = 0; idx < indexedManifests.size(); idx++) { currentFileIndex = indexedManifests.get(idx).second(); - try (CloseableIterable taskIterable = open(indexedManifests.get(idx).first(), isStarting); + try (CloseableIterable taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles); CloseableIterator taskIter = taskIterable.iterator()) { while (taskIter.hasNext()) { FileScanTask task = taskIter.next(); @@ -238,11 +238,11 @@ private MicroBatch generateMicroBatch(List> indexedM tasks, isLastIndex); } - private CloseableIterable open(ManifestFile manifestFile, boolean isStarting) { + private CloseableIterable open(ManifestFile manifestFile, boolean scanAllFiles) { ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile)) .specsById(specsById) .caseSensitive(caseSensitive); - if (isStarting) { + if (scanAllFiles) { manifestGroup = manifestGroup .filterManifestEntries(entry -> entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED) From d7a8c43c158c3e01984c22bbb69efe151e22ce07 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 20 May 2021 21:58:40 -0700 Subject: [PATCH 2/2] fixing typo --- core/src/main/java/org/apache/iceberg/MicroBatches.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java index 9247131e4251..d97dc7fd811c 100644 --- a/core/src/main/java/org/apache/iceberg/MicroBatches.java +++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java @@ -177,7 +177,7 @@ private static List> skipManifests(List> indexedManifests,