diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f8e82ae618581..e53dd38891604 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -193,13 +193,25 @@ public String showLogFileRecords(
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS <===================");
       HoodieMergedLogRecordScanner scanner =
-          new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema,
-              client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
-              HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-              Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-              Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-              HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-              HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+          HoodieMergedLogRecordScanner.newBuilder()
+              .withFileSystem(fs)
+              .withBasePath(client.getBasePath())
+              .withLogFilePaths(logFilePaths)
+              .withReaderSchema(readerSchema)
+              .withLatestInstantTime(
+                  client.getActiveTimeline()
+                      .getCommitTimeline().lastInstant().get().getTimestamp())
+              .withReadBlocksLazily(
+                  Boolean.parseBoolean(
+                      HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+              .withReverseReader(
+                  Boolean.parseBoolean(
+                      HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+              .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+              .withMaxMemorySizeInBytes(
+                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+              .withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+              .build();
       for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
         Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
         if (allRecords.size() < limit) {
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 0c52220e03155..fbd2b92c2bb00 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -197,13 +197,23 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
     // get expected result of 10 records.
     List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
         .map(status -> status.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner =
-        new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema, INSTANT_TIME,
-            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-            HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-            HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(tablePath)
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(schema)
+        .withLatestInstantTime(INSTANT_TIME)
+        .withMaxMemorySizeInBytes(
+            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+        .withReadBlocksLazily(
+            Boolean.parseBoolean(
+                HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+        .withReverseReader(
+            Boolean.parseBoolean(
+                HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+        .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+        .withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+        .build();
     Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
     int num = 0;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 505eabb88baef..65cefc9b9923c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -129,10 +129,18 @@ private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteT
     List<String> logFiles = operation.getDeltaFileNames().stream().map(
         p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
         .collect(toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
-        readerSchema, maxInstantTime, maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(),
-        config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
-        config.getSpillableMapBasePath());
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(metaClient.getBasePath())
+        .withLogFilePaths(logFiles)
+        .withReaderSchema(readerSchema)
+        .withLatestInstantTime(maxInstantTime)
+        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(config.getCompactionReverseLogReadEnabled())
+        .withBufferSize(config.getMaxDFSStreamBufferSize())
+        .withSpillableMapBasePath(config.getSpillableMapBasePath())
+        .build();
     if (!scanner.iterator().hasNext()) {
       return new ArrayList<>();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 8d8ef56da7ab9..4ae709eda1de7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -105,7 +105,6 @@ public abstract class AbstractHoodieLogRecordScanner {
   // Progress
   private float progress = 0.0f;
 
-  // TODO (NA) - Change this to a builder, this constructor is too long
   public AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
       Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
     this.readerSchema = readerSchema;
@@ -358,4 +357,28 @@ public long getTotalRollbacks() {
   public long getTotalCorruptBlocks() {
     return totalCorruptBlocks.get();
   }
+
+  /**
+   * Builder used to build {@code AbstractHoodieLogRecordScanner}.
+   */
+  public abstract static class Builder {
+
+    public abstract Builder withFileSystem(FileSystem fs);
+
+    public abstract Builder withBasePath(String basePath);
+
+    public abstract Builder withLogFilePaths(List<String> logFilePaths);
+
+    public abstract Builder withReaderSchema(Schema schema);
+
+    public abstract Builder withLatestInstantTime(String latestInstantTime);
+
+    public abstract Builder withReadBlocksLazily(boolean readBlocksLazily);
+
+    public abstract Builder withReverseReader(boolean reverseReader);
+
+    public abstract Builder withBufferSize(int bufferSize);
+
+    public abstract AbstractHoodieLogRecordScanner build();
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 025ae91758df3..18f2167b7e346 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -105,6 +105,13 @@ public long getNumMergedRecordsInLog() {
     return numMergedRecordsInLog;
   }
 
+  /**
+   * Returns the builder for {@code HoodieMergedLogRecordScanner}.
+   */
+  public static HoodieMergedLogRecordScanner.Builder newBuilder() {
+    return new Builder();
+  }
+
   @Override
   protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
     String key = hoodieRecord.getRecordKey();
@@ -128,5 +135,79 @@ protected void processNextDeletedKey(HoodieKey hoodieKey) {
   public long getTotalTimeTakenToReadAndMergeBlocks() {
     return totalTimeTakenToReadAndMergeBlocks;
   }
+
+  /**
+   * Builder used to build {@code HoodieMergedLogRecordScanner}.
+   */
+  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+    private FileSystem fs;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
+    private String latestInstantTime;
+    private boolean readBlocksLazily;
+    private boolean reverseReader;
+    private int bufferSize;
+    // specific configurations
+    private Long maxMemorySizeInBytes;
+    private String spillableMapBasePath;
+
+    public Builder withFileSystem(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder withBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      this.logFilePaths = logFilePaths;
+      return this;
+    }
+
+    public Builder withReaderSchema(Schema schema) {
+      this.readerSchema = schema;
+      return this;
+    }
+
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      this.latestInstantTime = latestInstantTime;
+      return this;
+    }
+
+    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+      this.readBlocksLazily = readBlocksLazily;
+      return this;
+    }
+
+    public Builder withReverseReader(boolean reverseReader) {
+      this.reverseReader = reverseReader;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      return this;
+    }
+
+    public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+      this.spillableMapBasePath = spillableMapBasePath;
+      return this;
+    }
+
+    @Override
+    public HoodieMergedLogRecordScanner build() {
+      return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
+          latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
+          bufferSize, spillableMapBasePath);
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 9c9df12f0e787..1aac6330e06ba 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -41,6 +41,13 @@ public HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
     this.callback = callback;
   }
 
+  /**
+   * Returns the builder for {@code HoodieUnMergedLogRecordScanner}.
+   */
+  public static HoodieUnMergedLogRecordScanner.Builder newBuilder() {
+    return new Builder();
+  }
+
   @Override
   protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws Exception {
     // Just call callback without merging
@@ -60,4 +67,71 @@ public static interface LogRecordScannerCallback {
 
     public void apply(HoodieRecord<? extends HoodieRecordPayload> record) throws Exception;
   }
+
+  /**
+   * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
+   */
+  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+    private FileSystem fs;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
+    private String latestInstantTime;
+    private boolean readBlocksLazily;
+    private boolean reverseReader;
+    private int bufferSize;
+    // specific configurations
+    private LogRecordScannerCallback callback;
+
+    public Builder withFileSystem(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder withBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      this.logFilePaths = logFilePaths;
+      return this;
+    }
+
+    public Builder withReaderSchema(Schema schema) {
+      this.readerSchema = schema;
+      return this;
+    }
+
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      this.latestInstantTime = latestInstantTime;
+      return this;
+    }
+
+    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+      this.readBlocksLazily = readBlocksLazily;
+      return this;
+    }
+
+    public Builder withReverseReader(boolean reverseReader) {
+      this.reverseReader = reverseReader;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
+      this.callback = callback;
+      return this;
+    }
+
+    @Override
+    public HoodieUnMergedLogRecordScanner build() {
+      return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
+          latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback);
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index fa25e17755c1f..98ece7309ea7d 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -460,9 +460,20 @@ public void testBasicAppendAndScanMultipleFiles(boolean readBlocksLazily)
     writer.close();
 
     // scan all log blocks (across multiple log files)
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath,
-        logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()), schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream()
+                .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
 
     List<IndexedRecord> scannedRecords = new ArrayList<>();
     for (HoodieRecord record : scanner) {
@@ -601,8 +612,18 @@ public void testAvroLogRecordReaderBasic(boolean readBlocksLazily)
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -663,8 +684,18 @@ public void testAvroLogRecordReaderWithRollbackTombstone(boolean readBlocksLazil
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("102")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -742,8 +773,18 @@ public void testAvroLogRecordReaderWithRollbackPartialBlock()
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "103",
-        10240L, true, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("103")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(true)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -802,8 +843,18 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazil
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("102")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
     final List<String> readKeys = new ArrayList<>(200);
     final List<Boolean> emptyPayloads = new ArrayList<>();
@@ -833,8 +884,18 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazil
     writer.appendBlock(commandBlock);
 
     readKeys.clear();
-    scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L, readBlocksLazily,
-        false, bufferSize, BASE_OUTPUT_PATH);
+    scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
@@ -898,8 +959,18 @@ public void testAvroLogRecordReaderWithFailedRollbacks(boolean readBlocksLazily)
         .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
     // all data must be rolled back before merge
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");
 
     final List<String> readKeys = new ArrayList<>();
@@ -949,8 +1020,18 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(boolean readBlock
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
 
@@ -983,8 +1064,18 @@ public void testAvroLogRecordReaderWithInvalidRollback(boolean readBlocksLazily)
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
     final List<String> readKeys = new ArrayList<>(100);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -1036,8 +1127,18 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(boolean readBloc
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
 
@@ -1126,8 +1227,18 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean r
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
            .map(s -> s.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
 
@@ -1183,8 +1294,18 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
     List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
         HoodieLogFile.DELTA_EXTENSION, "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema,
-        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
 
     assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(),
         "We would read 100 records");
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index b710b599b0ba8..a139997cad974 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -63,17 +63,18 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
     // but can return records for completed commits > the commit we are trying to read (if using
    // readCommit() API)
-    return new HoodieMergedLogRecordScanner(
-        FSUtils.getFs(split.getPath().toString(), jobConf),
-        split.getBasePath(),
-        split.getDeltaLogPaths(),
-        usesCustomPayload ? getWriterSchema() : getReaderSchema(),
-        split.getMaxCommitTime(),
-        HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
-        Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false,
-        jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-        jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH));
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
+        .withBasePath(split.getBasePath())
+        .withLogFilePaths(split.getDeltaLogPaths())
+        .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())
+        .withLatestInstantTime(split.getMaxCommitTime())
+        .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
+        .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        .withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .build();
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 76de84bd9a52e..d209a5a388fe6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -77,15 +77,22 @@ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job,
         Option.empty(), x -> x, new DefaultSizeEstimator<>());
     // Consumer of this record reader
     this.iterator = this.executor.getQueue().iterator();
-    this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), this.jobConf),
-        split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
-        Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false, this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+    this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf))
+        .withBasePath(split.getBasePath())
+        .withLogFilePaths(split.getDeltaLogPaths())
+        .withReaderSchema(getReaderSchema())
+        .withLatestInstantTime(split.getMaxCommitTime())
+        .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withLogRecordScannerCallback(record -> {
           // convert Hoodie log record to Hadoop AvroWritable and buffer
           GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
           ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
           this.executor.getQueue().insertRecord(aWritable);
-        });
+        })
+        .build();
     // Start reading and buffering
     this.executor.startProducers();
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index cfe7991f43ee5..2bd507ca5a3cf 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -249,14 +249,21 @@ private Iterator<IndexedRecord> readParquetOrLogFiles(FileSlice fileSlice) throw
       return itr;
     } else {
       // If there is no data file, fall back to reading log files
-      HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(metaClient.getFs(),
-          metaClient.getBasePath(),
-          fileSlice.getLogFiles().map(l -> l.getPath().getName()).collect(Collectors.toList()),
-          new Schema.Parser().parse(schemaStr), metaClient.getActiveTimeline().getCommitsTimeline()
-              .filterCompletedInstants().lastInstant().get().getTimestamp(),
-          HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, true, false,
-          HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-          HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+      HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+          .withFileSystem(metaClient.getFs())
+          .withBasePath(metaClient.getBasePath())
+          .withLogFilePaths(
+              fileSlice.getLogFiles().map(l -> l.getPath().getName()).collect(Collectors.toList()))
+          .withReaderSchema(new Schema.Parser().parse(schemaStr))
+          .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline()
+              .filterCompletedInstants().lastInstant().get().getTimestamp())
+          .withMaxMemorySizeInBytes(
+              HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+          .withReadBlocksLazily(true)
+          .withReverseReader(false)
+          .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+          .withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+          .build();
       // readAvro log files
       Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
       Schema schema = new Schema.Parser().parse(schemaStr);
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 148102480c3c5..e8caa63912397 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -255,19 +255,24 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    new HoodieMergedLogRecordScanner(
-      fs,
-      split.tablePath,
-      split.logPaths.get.asJava,
-      logSchema,
-      split.latestCommit,
-      split.maxCompactionMemoryInBytes,
-      Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-        HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false),
-      false,
-      config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-        HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-      config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-        HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+    HoodieMergedLogRecordScanner.newBuilder()
+      .withFileSystem(fs)
+      .withBasePath(split.tablePath)
+      .withLogFilePaths(split.logPaths.get.asJava)
+      .withReaderSchema(logSchema)
+      .withLatestInstantTime(split.latestCommit)
+      .withReadBlocksLazily(
+        Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+          HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
+          .getOrElse(false))
+      .withReverseReader(false)
+      .withBufferSize(
+        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
+      .withSpillableMapBasePath(
+        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+      .build()
   }
 }
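
Usage note: every call site in this patch now constructs the scanner through the fluent builder instead of the positional constructor. As a reference for other callers migrating to the new API, here is a minimal sketch of the merged-scanner call shape; fs, basePath, logFilePaths, schema, and bufferSize are placeholders for values the caller already has in scope, and the literals mirror the test defaults used above:

    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
        .withFileSystem(fs)                    // Hadoop FileSystem the log files live on
        .withBasePath(basePath)                // Hudi table base path
        .withLogFilePaths(logFilePaths)        // List<String> of delta log files to scan
        .withReaderSchema(schema)              // Avro schema used to deserialize records
        .withLatestInstantTime("100")          // ignore blocks from instants after this one
        .withMaxMemorySizeInBytes(10240L)      // in-memory budget before spilling to disk
        .withReadBlocksLazily(true)            // defer block decoding until records are read
        .withReverseReader(false)              // scan log files in forward order
        .withBufferSize(bufferSize)            // DFS input stream buffer size
        .withSpillableMapBasePath("/tmp/")     // where the spillable map writes overflow
        .build();

The unmerged variant follows the same shape, except that it has no spillable-map options and instead takes a per-record callback via withLogRecordScannerCallback(...), as in RealtimeUnmergedRecordReader above.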