Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ private MicroBatches() {

public static class MicroBatch {
private final long snapshotId;
private final int startFileIndex;
private final int endFileIndex;
private final long startFileIndex;
private final long endFileIndex;
private final long sizeInBytes;
private final List<FileScanTask> tasks;
private final boolean lastIndexOfSnapshot;

private MicroBatch(long snapshotId, int startFileIndex, int endFileIndex, long sizeInBytes,
private MicroBatch(long snapshotId, long startFileIndex, long endFileIndex, long sizeInBytes,
List<FileScanTask> tasks, boolean lastIndexOfSnapshot) {
this.snapshotId = snapshotId;
this.startFileIndex = startFileIndex;
Expand All @@ -60,11 +60,11 @@ public long snapshotId() {
return snapshotId;
}

public int startFileIndex() {
public long startFileIndex() {
return startFileIndex;
}

public int endFileIndex() {
public long endFileIndex() {
return endFileIndex;
}

Expand Down Expand Up @@ -109,18 +109,18 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
return this;
}

public MicroBatch generate(int startFileIndex, long targetSizeInBytes, boolean isStarting) {
public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");

List<ManifestFile> manifests = isStarting ? snapshot.dataManifests() :
List<ManifestFile> manifests = scanAllFiles ? snapshot.dataManifests() :
snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());

List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
List<Pair<ManifestFile, Integer>> skippedManifestIndexes = skipManifests(manifestIndexes, startFileIndex);

return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, isStarting);
return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles);
}

/**
Expand Down Expand Up @@ -153,7 +153,7 @@ private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFil
* startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests,
int startFileIndex) {
long startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}
Expand All @@ -177,11 +177,11 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<Manifes
* @param startFileIndex A startFileIndex used to skip processed files.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes must be smaller than
* this size.
* @param isStarting Used to check where all the data file should be processed, or only added files.
* @param scanAllFiles Used to check whether all the data files should be processed, or only added files.
* @return A MicroBatch.
*/
private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests,
int startFileIndex, long targetSizeInBytes, boolean isStarting) {
long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(snapshot.snapshotId(), startFileIndex, startFileIndex + 1, 0L,
Collections.emptyList(), true);
Expand All @@ -195,7 +195,7 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
for (int idx = 0; idx < indexedManifests.size(); idx++) {
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), isStarting);
try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
Expand Down Expand Up @@ -238,11 +238,11 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
tasks, isLastIndex);
}

private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean isStarting) {
private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (isStarting) {
if (scanAllFiles) {
manifestGroup = manifestGroup
.filterManifestEntries(entry ->
entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED)
Expand Down