diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index d0ab73ab01552..882e1057c8043 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -59,18 +59,15 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader implements Iterable> { private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class); - + // A timer for calculating elapsed time in millis + public final HoodieTimer timer = new HoodieTimer(); // Final map of compacted/merged records protected final ExternalSpillableMap> records; - // count of merged records in log private long numMergedRecordsInLog; private long maxMemorySizeInBytes; - // Stores the total time taken to perform reading and merging of log blocks private long totalTimeTakenToReadAndMergeBlocks; - // A timer for calculating elapsed time in millis - public final HoodieTimer timer = new HoodieTimer(); @SuppressWarnings("unchecked") protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, @@ -143,9 +140,11 @@ protected void processNextRecord(HoodieRecord hoo HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue); - boolean choosePrev = combinedValue.equals(oldValue); - HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation(); - records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation)); + // If combinedValue is oldValue, no need rePut oldRecord + if (combinedValue != oldValue) { + HoodieOperation operation = hoodieRecord.getOperation(); + records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation)); + } } else { // Put the record as is records.put(key, hoodieRecord); @@ -187,11 +186,11 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder { protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue(); // incremental filtering protected Option instantRange = Option.empty(); + protected String partitionName; // auto scan default true private boolean autoScan = true; // operation field default false private boolean withOperationField = false; - protected String partitionName; @Override public Builder withFileSystem(FileSystem fs) {