53 changes: 29 additions & 24 deletions hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -20,6 +20,7 @@

import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -29,7 +30,6 @@
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -58,7 +58,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit

private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);

private Map<String, HoodieRecord<T>> keyToNewRecords;
private Map<String, T> keyToNewRecords;
private Set<String> writtenRecordKeys;
private HoodieStorageWriter<IndexedRecord> storageWriter;
private Path newFilePath;
@@ -68,12 +68,15 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private long updatedRecordsWritten = 0;
private long insertRecordsWritten = 0;
private boolean useWriterSchema;
private String partitionPath;
private String fileId;

public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String fileId) {
super(config, commitTime, fileId, hoodieTable);
String partitionPath = init(fileId, recordItr);
init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
this.partitionPath = initSpillableMap(recordItr);
this.fileId = fileId;
init(hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
}

/**
@@ -82,10 +85,11 @@ public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTabl
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieBaseFile dataFileToBeMerged) {
super(config, commitTime, fileId, hoodieTable);
this.keyToNewRecords = keyToNewRecords;
this.keyToNewRecords = null; // hack to get it to compile. Compaction is now broken
this.useWriterSchema = true;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath(),
dataFileToBeMerged);
this.fileId = fileId;
this.partitionPath = keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath();
init(dataFileToBeMerged);
}

public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -154,7 +158,7 @@ protected GenericRecord rewriteRecord(GenericRecord record) {
/**
* Extract old file path, initialize StorageWriter and WriteStatus.
*/
private void init(String fileId, String partitionPath, HoodieBaseFile dataFileToBeMerged) {
private void init(HoodieBaseFile dataFileToBeMerged) {
LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
this.writtenRecordKeys = new HashSet<>();
writeStatus.setStat(new HoodieWriteStat());
@@ -197,32 +201,27 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
/**
* Load the new incoming records in a map and return partitionPath.
*/
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
private String initSpillableMap(Iterator<HoodieRecord<T>> newRecordsItr) {
try {
// Load the new records in a map
long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
this.keyToNewRecords = new ExternalSpillableMap<String, T>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new DefaultSizeEstimator<>());
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
String partitionPath = null;
while (newRecordsItr.hasNext()) {
HoodieRecord<T> record = newRecordsItr.next();
partitionPath = record.getPartitionPath();
// update the new location of the record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
record.seal();
// NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
keyToNewRecords.put(record.getRecordKey(), record);
keyToNewRecords.put(record.getRecordKey(), record.getData());
}
LOG.info("Number of entries in MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
+ "Total size in bytes of MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
+ " Total size in bytes of MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + " Number of entries in DiskBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + " Size of file spilled to disk => "
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
return partitionPath;
}
@@ -268,7 +267,11 @@ public void write(GenericRecord oldRecord) {
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(new HoodieKey(key, partitionPath), keyToNewRecords.get(key));
hoodieRecord.unseal();
hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
hoodieRecord.seal();

try {
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema);
@@ -311,10 +314,11 @@ public void write(GenericRecord oldRecord) {
public WriteStatus close() {
try {
// write out any pending records (this can happen when inserts are turned into updates)
Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
/**
Iterator<Map.Entry<String, T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.entrySet().iterator();
while (newRecordsItr.hasNext()) {
HoodieRecord<T> hoodieRecord = newRecordsItr.next();
T payload = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
@@ -324,6 +328,7 @@ public WriteStatus close() {
insertRecordsWritten++;
}
}
*/
keyToNewRecords.clear();
writtenRecordKeys.clear();

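The net effect of this file's changes: `keyToNewRecords` now maps record key to the payload `T` instead of a full `HoodieRecord<T>`, apparently to keep the spilled entries smaller, and the record is rebuilt from the key, the handle's `partitionPath`, and the payload at the point where `write(GenericRecord)` merges it. A minimal sketch of that reconstruction path, mirroring the diff; the helper name `rebuildRecord` is illustrative and not part of the patch:

```java
// Sketch only: how a HoodieRecord is reconstructed from a payload-keyed map
// entry at write time, mirroring the write(GenericRecord) change above.
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;

class MergeHandleSketch {

  static <T extends HoodieRecordPayload> HoodieRecord<T> rebuildRecord(
      String recordKey, String partitionPath, T payload, String instantTime, String fileId) {
    // The map now stores only the payload, so key and partition path are re-attached here.
    HoodieRecord<T> record = new HoodieRecord<>(new HoodieKey(recordKey, partitionPath), payload);
    // The new location is stamped on the rebuilt record, because only the payload
    // (not the located record) survived in the spillable map.
    record.unseal();
    record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
    record.seal();
    return record;
  }
}
```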
BaseAvroPayload.java
@@ -26,6 +26,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;

/**
* Base class for all AVRO record based payloads, that can be ordered based on a field.
@@ -39,7 +40,7 @@ public abstract class BaseAvroPayload implements Serializable {
/**
* For purposes of preCombining.
*/
protected final Comparable orderingVal;
public final Comparable orderingVal;

/**
* Instantiate {@link BaseAvroPayload}.
@@ -58,4 +59,28 @@ public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
throw new HoodieException("Ordering value is null for record: " + record);
}
}

public BaseAvroPayload(byte[] recordBytes, Comparable orderingVal) {
this.recordBytes = recordBytes;
this.orderingVal = orderingVal;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

BaseAvroPayload that = (BaseAvroPayload) o;
return Objects.deepEquals(this.recordBytes, that.recordBytes)
&& Objects.equals(this.orderingVal, that.orderingVal);
}

@Override
public int hashCode() {
return Objects.hash(this.recordBytes, this.orderingVal);
}
}
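With payloads now stored and compared directly rather than wrapped in `HoodieRecord`, the new byte-array constructor and `equals`/`hashCode` give `BaseAvroPayload` value semantics over its serialized Avro bytes. A small sketch of that equality, using the `OverwriteWithLatestAvroPayload` byte-array constructor added later in this diff; the literal bytes are placeholders and nothing here deserializes them:

```java
// Sketch only: two payloads built from distinct byte arrays with identical
// contents compare equal, because Objects.deepEquals compares byte[] by content.
// The byte values are placeholders; equals() never deserializes them.
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

class PayloadEqualitySketch {
  public static void main(String[] args) {
    byte[] bytesA = new byte[] {1, 2, 3};
    byte[] bytesB = new byte[] {1, 2, 3};
    OverwriteWithLatestAvroPayload p1 = new OverwriteWithLatestAvroPayload(bytesA, 0);
    OverwriteWithLatestAvroPayload p2 = new OverwriteWithLatestAvroPayload(bytesB, 0);
    System.out.println(p1.equals(p2)); // true: same bytes, same ordering value
  }
}
```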
HoodieRecord.java
@@ -66,6 +66,10 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
*/
private boolean sealed;

public HoodieRecord() {

}

public HoodieRecord(HoodieKey key, T data) {
this.key = key;
this.data = data;
HoodieRecordLocation.java
@@ -35,6 +35,10 @@ public HoodieRecordLocation(String instantTime, String fileId) {
this.fileId = fileId;
}

public HoodieRecordLocation() {
this(null, null);
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -54,11 +58,10 @@ public int hashCode() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieRecordLocation {");
sb.append("instantTime=").append(instantTime).append(", ");
sb.append("fileId=").append(fileId);
sb.append('}');
return sb.toString();
return "HoodieRecordLocation {"
+ "instantTime=" + instantTime + ", "
+ "fileId=" + fileId
+ '}';
}

public String getInstantTime() {
OverwriteWithLatestAvroPayload.java
@@ -33,16 +33,16 @@
* 1. preCombine - Picks the latest delta record for a key, based on an ordering field 2.
* combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record
*/
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

/**
*
*/
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
}

public OverwriteWithLatestAvroPayload(byte[] recordBytes, Comparable orderingVal) {
super(recordBytes, orderingVal);
}

public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
}
HoodieMergedLogRecordScanner.java
@@ -18,6 +18,7 @@

package org.apache.hudi.common.table.log;

import com.google.common.collect.Iterators;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -26,6 +27,7 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
@@ -94,7 +96,7 @@ public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String>

@Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
return records.iterator();
return Iterators.transform(records.iterator(), Pair::getRight);
}

public Map<String, HoodieRecord<? extends HoodieRecordPayload>> getRecords() {
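The backing `ExternalSpillableMap` iterator now yields key/record pairs, so the public `iterator()` projects the record side back out with Guava's `Iterators.transform`. A tiny sketch of that idiom with simplified element types (plain strings stand in for the real `HoodieRecord` values):

```java
// Sketch only: Iterators.transform lazily maps each Pair to its right element,
// which is how the scanner turns (key, record) pairs back into a record iterator.
// String values stand in for the real HoodieRecord payloads.
import com.google.common.collect.Iterators;
import org.apache.hudi.common.util.collection.Pair;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class IteratorTransformSketch {
  public static void main(String[] args) {
    List<Pair<String, String>> pairs =
        Arrays.asList(Pair.of("key1", "record1"), Pair.of("key2", "record2"));
    // The transform is applied lazily as the iterator is consumed.
    Iterator<String> records = Iterators.transform(pairs.iterator(), Pair::getRight);
    records.forEachRemaining(System.out::println); // record1, record2
  }
}
```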