Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 121 additions & 88 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
Expand All @@ -36,6 +37,96 @@
public class MicroBatches {
private MicroBatches() {}

/**
 * Indexes the data manifests of a snapshot and trims the leading manifests that precede {@code
 * startFileIndex}.
 *
 * @param io file IO handed to {@code snapshot.dataManifests}
 * @param snapshot snapshot whose data manifests are indexed
 * @param startFileIndex file index from which the caller wants to resume
 * @param scanAllFiles when true all data manifests of the snapshot are indexed; otherwise only
 *     the manifests whose snapshot id equals the id of {@code snapshot}
 * @return the indexed manifests remaining after the skip step
 */
public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
    FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
  List<ManifestFile> candidates = snapshot.dataManifests(io);
  if (!scanAllFiles) {
    // Keep only the manifests written by this very snapshot.
    candidates =
        candidates.stream()
            .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId()))
            .collect(Collectors.toList());
  }

  return skipManifests(indexManifests(candidates), startFileIndex);
}

/**
 * Plans the file scan tasks of a single manifest.
 *
 * @param io file IO used to build the manifest group
 * @param specsById partition specs keyed by spec id
 * @param caseSensitive case sensitivity passed to the manifest group
 * @param snapshot snapshot whose id is used to filter entries when {@code scanAllFiles} is false
 * @param manifestFile the manifest to open
 * @param scanAllFiles when false, only entries with status ADDED that carry the id of {@code
 *     snapshot} are planned and deleted entries are ignored
 * @return the planned tasks; the caller is responsible for closing the iterable
 */
public static CloseableIterable<FileScanTask> openManifestFile(
    FileIO io,
    Map<Integer, PartitionSpec> specsById,
    boolean caseSensitive,
    Snapshot snapshot,
    ManifestFile manifestFile,
    boolean scanAllFiles) {
  ManifestGroup wholeManifest =
      new ManifestGroup(io, ImmutableList.of(manifestFile))
          .specsById(specsById)
          .caseSensitive(caseSensitive);

  if (scanAllFiles) {
    return wholeManifest.planFiles();
  }

  // Restrict planning to the entries this snapshot added.
  ManifestGroup addedBySnapshot =
      wholeManifest
          .filterManifestEntries(
              entry ->
                  entry.snapshotId() == snapshot.snapshotId()
                      && entry.status() == ManifestEntry.Status.ADDED)
          .ignoreDeleted();

  return addedBySnapshot.planFiles();
}

/**
 * Assigns to every manifest the running index of its first data file. Given manifests m1, m2 and
 * m3 holding 3, 2 and 1 data files respectively, the result is (m1, 0), (m2, 3), (m3, 5).
 *
 * @param manifestFiles manifests to index, in the order they should be numbered.
 * @return one pair per input manifest, holding the manifest and the index of its first data file
 *     entry.
 */
private static List<Pair<ManifestFile, Integer>> indexManifests(
    List<ManifestFile> manifestFiles) {
  List<Pair<ManifestFile, Integer>> indexed = Lists.newArrayList();
  int nextStartIndex = 0;

  for (ManifestFile current : manifestFiles) {
    indexed.add(Pair.of(current, nextStartIndex));
    // Added and existing files both occupy index positions.
    int filesInManifest = current.addedFilesCount() + current.existingFilesCount();
    nextStartIndex += filesInManifest;
  }

  return indexed;
}

/**
 * Trims the leading manifests that start before the manifest covering {@code startFileIndex}. The
 * returned list begins with the last manifest whose first-file index is not greater than {@code
 * startFileIndex}; when every manifest starts after {@code startFileIndex} the whole list is
 * returned. For example, for (m1, 0), (m2, 3), (m3, 5) and a startFileIndex of 4 the result is
 * (m2, 3), (m3, 5).
 *
 * @param indexedManifests manifests paired with the index of their first file.
 * @param startFileIndex file index to resume from; 0 returns the input list unchanged.
 * @return a sub-list view of {@code indexedManifests} starting at the manifest described above.
 */
private static List<Pair<ManifestFile, Integer>> skipManifests(
    List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
  if (startFileIndex == 0) {
    return indexedManifests;
  }

  int total = indexedManifests.size();
  int firstBeyondStart = 0;
  while (firstBeyondStart < total
      && indexedManifests.get(firstBeyondStart).second() <= startFileIndex) {
    firstBeyondStart++;
  }

  // Step back one manifest so the one containing startFileIndex is kept; never go below 0.
  int fromIndex = firstBeyondStart > 0 ? firstBeyondStart - 1 : 0;
  return indexedManifests.subList(fromIndex, total);
}

public static class MicroBatch {
private final long snapshotId;
private final long startFileIndex;
Expand Down Expand Up @@ -113,73 +204,27 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
}

/**
 * Generates a MicroBatch whose end file index is the number of data files added by the snapshot.
 *
 * @param startFileIndex index of the first file to consider
 * @param targetSizeInBytes size target for the batch
 * @param scanAllFiles whether all data files or only added files are processed
 * @return the MicroBatch produced by the four-argument overload
 */
public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
  // NOTE(review): the bound counts added files only, even when scanAllFiles is true - confirm.
  long addedFileCount = Iterables.size(snapshot.addedDataFiles(io));
  return generate(startFileIndex, addedFileCount, targetSizeInBytes, scanAllFiles);
}

/**
 * Generates a MicroBatch from the files in the range [startFileIndex, endFileIndex).
 *
 * @param startFileIndex index of the first file to consider; must not be negative
 * @param endFileIndex end of the file range, exclusive; must not be negative
 * @param targetSizeInBytes size target for the batch; must be positive
 * @param scanAllFiles whether all data files or only added files are processed
 * @return the generated MicroBatch
 * @throws IllegalArgumentException if any of the numeric arguments is out of range
 */
public MicroBatch generate(
    long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
  Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
  Preconditions.checkArgument(
      startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
  Preconditions.checkArgument(
      targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");

  // Reuse the shared helper for manifest selection and forward endFileIndex so the bound that was
  // just validated is actually honoured when the batch is built.
  return generateMicroBatch(
      skippedManifestIndexesFromSnapshot(io, snapshot, startFileIndex, scanAllFiles),
      startFileIndex,
      endFileIndex,
      targetSizeInBytes,
      scanAllFiles);
}

/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data
* files, manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1,
* 0), (m2, 3), (m3, 5).
*
* @param manifestFiles List of input manifests used to index.
* @return a list pairing each manifest with the index of the first data file entry in that
* manifest (a running start offset, not the manifest's file count).
*/
private static List<Pair<ManifestFile, Integer>> indexManifests(
List<ManifestFile> manifestFiles) {
int currentFileIndex = 0;
List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();

for (ManifestFile manifest : manifestFiles) {
// Record the start offset first, then advance it by this manifest's added + existing files.
manifestIndexes.add(Pair.of(manifest, currentFileIndex));
currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
}

return manifestIndexes;
}

/**
* Method to skip the manifest file in which the index is smaller than startFileIndex. For
* example, if the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the
* returned manifest index list is: (m2, 3), (m3, 5).
*
* @param indexedManifests List of input manifests.
* @param startFileIndex Index used to skip the processed manifests.
* @return a sub-list of manifest file index which only contains the manifest indexes larger
* than the startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(
List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}

int manifestIndex = 0;
for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
if (manifest.second() > startFileIndex) {
break;
}

manifestIndex++;
}

return indexedManifests.subList(manifestIndex - 1, indexedManifests.size());
skippedManifestIndexesFromSnapshot(io, snapshot, startFileIndex, scanAllFiles),
startFileIndex,
endFileIndex,
targetSizeInBytes,
scanAllFiles);
}

/**
Expand All @@ -188,25 +233,23 @@ private static List<Pair<ManifestFile, Integer>> skipManifests(
*
* @param indexedManifests A list of indexed manifests to generate MicroBatch
* @param startFileIndex A startFileIndex used to skip processed files.
* @param endFileIndex An endFileIndex used to find files to include, exclusive.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes
* must be smaller than this size.
* @param scanAllFiles Used to check whether all the data files should be processed, or only
* added files.
* @return A MicroBatch.
*/
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private MicroBatch generateMicroBatch(
List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex,
long endFileIndex,
long targetSizeInBytes,
boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(
snapshot.snapshotId(),
startFileIndex,
startFileIndex + 1,
0L,
Collections.emptyList(),
true);
snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
}

long currentSizeInBytes = 0L;
Expand All @@ -218,11 +261,18 @@ private MicroBatch generateMicroBatch(
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable =
open(indexedManifests.get(idx).first(), scanAllFiles);
openManifestFile(
io,
specsById,
caseSensitive,
snapshot,
indexedManifests.get(idx).first(),
scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (currentFileIndex >= startFileIndex) {
// want to read [startFileIndex ... endFileIndex)
if (currentFileIndex >= startFileIndex && currentFileIndex < endFileIndex) {
// Make sure there's at least one task in each MicroBatch to avoid the job getting stuck:
// always add the task first.
Expand All @@ -231,7 +281,7 @@ private MicroBatch generateMicroBatch(
}

currentFileIndex++;
if (currentSizeInBytes >= targetSizeInBytes) {
if (currentSizeInBytes >= targetSizeInBytes || currentFileIndex >= endFileIndex) {
break;
}
}
Expand Down Expand Up @@ -259,6 +309,7 @@ private MicroBatch generateMicroBatch(
}
}

// [startFileIndex ....currentFileIndex)
return new MicroBatch(
snapshot.snapshotId(),
startFileIndex,
Expand All @@ -267,23 +318,5 @@ private MicroBatch generateMicroBatch(
tasks,
isLastIndex);
}

/**
 * Plans the file scan tasks of one manifest using the enclosing instance's {@code io}, {@code
 * specsById}, {@code caseSensitive} and {@code snapshot}.
 *
 * @param manifestFile the manifest to open
 * @param scanAllFiles when false, only entries added by {@code snapshot} are planned
 * @return the planned tasks; the caller is responsible for closing the iterable
 */
private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
  // Delegate to the static helper instead of keeping a second copy of the same filter logic.
  return openManifestFile(io, specsById, caseSensitive, snapshot, manifestFile, scanAllFiles);
}
}
}
20 changes: 10 additions & 10 deletions core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, Long.MAX_VALUE, true);
.generate(0, 6, Long.MAX_VALUE, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 5);
Expand All @@ -63,7 +63,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 15L, true);
.generate(0, 1, 15L, true);
Assert.assertEquals(batch1.endFileIndex(), 1);
Assert.assertEquals(batch1.sizeInBytes(), 10);
Assert.assertFalse(batch1.lastIndexOfSnapshot());
Expand All @@ -72,7 +72,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 30L, true);
.generate(batch1.endFileIndex(), 4, 30L, true);
Assert.assertEquals(batch2.endFileIndex(), 4);
Assert.assertEquals(batch2.sizeInBytes(), 30);
Assert.assertFalse(batch2.lastIndexOfSnapshot());
Expand All @@ -81,7 +81,7 @@ public void testGenerateMicroBatch() {
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 50L, true);
.generate(batch2.endFileIndex(), 5, 50L, true);
Assert.assertEquals(batch3.endFileIndex(), 5);
Assert.assertEquals(batch3.sizeInBytes(), 10);
Assert.assertTrue(batch3.lastIndexOfSnapshot());
Expand All @@ -95,7 +95,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 10L, true);
.generate(0, 1, 10L, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 1);
Expand All @@ -106,7 +106,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch.endFileIndex(), 5L, true);
.generate(batch.endFileIndex(), 2, 5L, true);
Assert.assertEquals(batch1.endFileIndex(), 2);
Assert.assertEquals(batch1.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
Expand All @@ -115,7 +115,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 10L, true);
.generate(batch1.endFileIndex(), 3, 10L, true);
Assert.assertEquals(batch2.endFileIndex(), 3);
Assert.assertEquals(batch2.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
Expand All @@ -124,7 +124,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 10L, true);
.generate(batch2.endFileIndex(), 4, 10L, true);
Assert.assertEquals(batch3.endFileIndex(), 4);
Assert.assertEquals(batch3.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
Expand All @@ -133,7 +133,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch4 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch3.endFileIndex(), 5L, true);
.generate(batch3.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch4.endFileIndex(), 5);
Assert.assertEquals(batch4.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
Expand All @@ -142,7 +142,7 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
MicroBatch batch5 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch4.endFileIndex(), 5L, true);
.generate(batch4.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch5.endFileIndex(), 5);
Assert.assertEquals(batch5.sizeInBytes(), 0);
Assert.assertTrue(Iterables.isEmpty(batch5.tasks()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,22 @@ public Long endTimestamp() {
return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional();
}

/**
 * Parses the {@code SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH} option as an integer
 * conf, with {@code Integer.MAX_VALUE} configured as the default value.
 *
 * @return the parsed value
 */
public Integer maxFilesPerMicroBatch() {
return confParser
.intConf()
.option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
}

/**
 * Parses the {@code SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH} option as an integer
 * conf, with {@code Integer.MAX_VALUE} configured as the default value. Note that the method
 * name says "records" while the option constant says "rows".
 *
 * @return the parsed value
 */
public Integer maxRecordsPerMicroBatch() {
return confParser
.intConf()
.option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
.defaultValue(Integer.MAX_VALUE)
.parse();
}

public boolean preserveDataGrouping() {
return confParser
.booleanConf()
Expand Down
Loading