Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,15 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {

private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);

// A timer for calculating elapsed time in millis
public final HoodieTimer timer = new HoodieTimer();
// Final map of compacted/merged records
protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;

// count of merged records in log
private long numMergedRecordsInLog;
private long maxMemorySizeInBytes;

// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
// A timer for calculating elapsed time in millis
public final HoodieTimer timer = new HoodieTimer();

@SuppressWarnings("unchecked")
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
Expand Down Expand Up @@ -143,9 +140,11 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
HoodieRecordPayload oldValue = oldRecord.getData();
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
boolean choosePrev = combinedValue.equals(oldValue);
HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation();
records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
// If combinedValue is oldValue, no need rePut oldRecord
if (combinedValue != oldValue) {
HoodieOperation operation = hoodieRecord.getOperation();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use == instead which is more efficient. Currently the payload clazz has no uniform #equals yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. please review again.

records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
}
} else {
// Put the record as is
records.put(key, hoodieRecord);
Expand Down Expand Up @@ -187,11 +186,11 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder {
protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
// incremental filtering
protected Option<InstantRange> instantRange = Option.empty();
protected String partitionName;
// auto scan default true
private boolean autoScan = true;
// operation field default false
private boolean withOperationField = false;
protected String partitionName;

@Override
public Builder withFileSystem(FileSystem fs) {
Expand Down