From 24550fc6107c5d9f8f80e538c3638d87b3bd2368 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Fri, 21 Feb 2020 00:12:41 -0800 Subject: [PATCH] [WIP] [HUDI-625] Fixing performance issues around DiskBasedMap & kryo - This is a very rough cut of a few things I tried; just for sharing purposes - Kryo needs serializers and once we add them, the ser/deser is fast and writing finishes 10-20x faster - DiskBasedMap is tracking too many things redundantly and incurring its cost as well. - TODO : Need to break the kryo and map fix into different PRs - TODO : For map entry thinning, need to handle compaction, fix code structure, tests - TODO : For kryo, one more pass with good understanding of APIs, tests, null handling, cleanup --- .../org/apache/hudi/io/HoodieMergeHandle.java | 53 +++--- .../hudi/common/model/BaseAvroPayload.java | 27 +++- .../hudi/common/model/HoodieRecord.java | 4 + .../common/model/HoodieRecordLocation.java | 13 +- .../model/OverwriteWithLatestAvroPayload.java | 10 +- .../log/HoodieMergedLogRecordScanner.java | 4 +- .../hudi/common/util/SerializationUtils.java | 153 +++++++++++++++++- .../common/util/collection/DiskBasedMap.java | 7 +- .../util/collection/ExternalSpillableMap.java | 10 +- .../util/collection/LazyFileIterable.java | 16 +- .../util/collection/TestDiskBasedMap.java | 66 +++++++- .../collection/TestExternalSpillableMap.java | 7 +- 12 files changed, 307 insertions(+), 63 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index c3d726ccba2ce..e15b4f8c21239 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -20,6 +20,7 @@ import org.apache.hudi.WriteStatus; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePartitionMetadata; import 
org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -29,7 +30,6 @@ import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.HoodieAvroUtils; -import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; @@ -58,7 +58,7 @@ public class HoodieMergeHandle extends HoodieWrit private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class); - private Map> keyToNewRecords; + private Map keyToNewRecords; private Set writtenRecordKeys; private HoodieStorageWriter storageWriter; private Path newFilePath; @@ -68,12 +68,15 @@ public class HoodieMergeHandle extends HoodieWrit private long updatedRecordsWritten = 0; private long insertRecordsWritten = 0; private boolean useWriterSchema; + private String partitionPath; + private String fileId; public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, Iterator> recordItr, String fileId) { super(config, commitTime, fileId, hoodieTable); - String partitionPath = init(fileId, recordItr); - init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get()); + this.partitionPath = initSpillableMap(recordItr); + this.fileId = fileId; + init(hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get()); } /** @@ -82,10 +85,11 @@ public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTabl public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, Map> keyToNewRecords, String fileId, HoodieBaseFile dataFileToBeMerged) { super(config, commitTime, fileId, hoodieTable); - this.keyToNewRecords = keyToNewRecords; + this.keyToNewRecords = null; // hack to get it to compile. 
Compaction is now broken this.useWriterSchema = true; - init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath(), - dataFileToBeMerged); + this.fileId = fileId; + this.partitionPath = keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath(); + init(dataFileToBeMerged); } public static Schema createHoodieWriteSchema(Schema originalSchema) { @@ -154,7 +158,7 @@ protected GenericRecord rewriteRecord(GenericRecord record) { /** * Extract old file path, initialize StorageWriter and WriteStatus. */ - private void init(String fileId, String partitionPath, HoodieBaseFile dataFileToBeMerged) { + private void init(HoodieBaseFile dataFileToBeMerged) { LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId); this.writtenRecordKeys = new HashSet<>(); writeStatus.setStat(new HoodieWriteStat()); @@ -197,13 +201,13 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo /** * Load the new incoming records in a map and return partitionPath. 
*/ - private String init(String fileId, Iterator> newRecordsItr) { + private String initSpillableMap(Iterator> newRecordsItr) { try { // Load the new records in a map long memoryForMerge = config.getMaxMemoryPerPartitionMerge(); LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); - this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), - new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); + this.keyToNewRecords = new ExternalSpillableMap(memoryForMerge, config.getSpillableMapBasePath(), + new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } @@ -211,18 +215,13 @@ private String init(String fileId, Iterator> newRecordsItr) { while (newRecordsItr.hasNext()) { HoodieRecord record = newRecordsItr.next(); partitionPath = record.getPartitionPath(); - // update the new location of the record, so we know where to find it next - record.unseal(); - record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); - record.seal(); - // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist - keyToNewRecords.put(record.getRecordKey(), record); + keyToNewRecords.put(record.getRecordKey(), record.getData()); } LOG.info("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() - + "Total size in bytes of MemoryBasedMap => " - + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => " - + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => " + + " Total size in bytes of MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + " Number of entries in DiskBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + " Size of file 
spilled to disk => " + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); return partitionPath; } @@ -268,7 +267,11 @@ public void write(GenericRecord oldRecord) { if (keyToNewRecords.containsKey(key)) { // If we have duplicate records that we are updating, then the hoodie record will be deflated after // writing the first record. So make a copy of the record to be merged - HoodieRecord hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key)); + HoodieRecord hoodieRecord = new HoodieRecord<>(new HoodieKey(key, partitionPath), keyToNewRecords.get(key)); + hoodieRecord.unseal(); + hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); + hoodieRecord.seal(); + try { Option combinedAvroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema); @@ -311,10 +314,11 @@ public void write(GenericRecord oldRecord) { public WriteStatus close() { try { // write out any pending records (this can happen when inserts are turned into updates) - Iterator> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap) - ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator(); + /** + Iterator> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap) + ? 
((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.entrySet().iterator(); while (newRecordsItr.hasNext()) { - HoodieRecord hoodieRecord = newRecordsItr.next(); + T payload = newRecordsItr.next(); if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { if (useWriterSchema) { writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema)); @@ -324,6 +328,7 @@ public WriteStatus close() { insertRecordsWritten++; } } + */ keyToNewRecords.clear(); writtenRecordKeys.clear(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index 8ed925bd4de69..febc9d6cd87a3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Objects; /** * Base class for all AVRO record based payloads, that can be ordered based on a field. @@ -39,7 +40,7 @@ public abstract class BaseAvroPayload implements Serializable { /** * For purposes of preCombining. */ - protected final Comparable orderingVal; + public final Comparable orderingVal; /** * Instantiate {@link BaseAvroPayload}. 
@@ -58,4 +59,28 @@ public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { throw new HoodieException("Ordering value is null for record: " + record); } } + + public BaseAvroPayload(byte[] recordBytes, Comparable orderingVal) { + this.recordBytes = recordBytes; + this.orderingVal = orderingVal; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BaseAvroPayload that = (BaseAvroPayload) o; + return Objects.deepEquals(this.recordBytes, that.recordBytes) + && Objects.equals(this.orderingVal, that.orderingVal); + } + + @Override + public int hashCode() { + return Objects.hash(this.recordBytes, this.orderingVal); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 3f1e95ad8d333..e7d4c0e477ffe 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -66,6 +66,10 @@ public class HoodieRecord implements Serializable */ private boolean sealed; + public HoodieRecord() { + + } + public HoodieRecord(HoodieKey key, T data) { this.key = key; this.data = data; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java index 2c522d1660340..eecfd4ab47bca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java @@ -35,6 +35,10 @@ public HoodieRecordLocation(String instantTime, String fileId) { this.fileId = fileId; } + public HoodieRecordLocation() { + this(null, null); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -54,11 +58,10 @@ public int hashCode() { @Override 
public String toString() { - final StringBuilder sb = new StringBuilder("HoodieRecordLocation {"); - sb.append("instantTime=").append(instantTime).append(", "); - sb.append("fileId=").append(fileId); - sb.append('}'); - return sb.toString(); + return "HoodieRecordLocation {" + + "instantTime=" + instantTime + ", " + + "fileId=" + fileId + + '}'; } public String getInstantTime() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 88811f53b5dd2..32e4d942f4e4d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -33,16 +33,16 @@ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field 2. * combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record */ -public class OverwriteWithLatestAvroPayload extends BaseAvroPayload - implements HoodieRecordPayload { +public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload { - /** - * - */ public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } + public OverwriteWithLatestAvroPayload(byte[] recordBytes, Comparable orderingVal) { + super(recordBytes, orderingVal); + } + public OverwriteWithLatestAvroPayload(Option record) { this(record.isPresent() ? 
record.get() : null, (record1) -> 0); // natural order } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index e6246c48d8729..74e96d78d5b24 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.log; +import com.google.common.collect.Iterators; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -26,6 +27,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; @@ -94,7 +96,7 @@ public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List @Override public Iterator> iterator() { - return records.iterator(); + return Iterators.transform(records.iterator(), Pair::getRight); } public Map> getRecords() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java index 9096080bb0f6e..639d2eed67fe4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java @@ -18,6 +18,20 @@ package org.apache.hudi.common.util; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import 
org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.exception.HoodieSerializationException; import com.esotericsoftware.kryo.Kryo; @@ -33,6 +47,7 @@ import java.io.Serializable; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.nio.charset.StandardCharsets; /** * {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / deserializing objects. @@ -43,9 +58,6 @@ public class SerializationUtils { private static final ThreadLocal SERIALIZER_REF = ThreadLocal.withInitial(KryoSerializerInstance::new); - // Serialize - // ----------------------------------------------------------------------- - /** *

* Serializes an {@code Object} to a byte array for storage/serialization. @@ -86,6 +98,129 @@ public static T deserialize(final byte[] objectData) { return (T) SERIALIZER_REF.get().deserialize(objectData); } + public static class HoodieKeySerializer extends Serializer implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void write(Kryo kryo, Output output, HoodieKey hoodieKey) { + output.writeString(hoodieKey.getRecordKey()); + output.writeString(hoodieKey.getPartitionPath()); + } + + @Override + public HoodieKey read(Kryo kryo, Input input, Class type) { + return new HoodieKey(input.readString(), input.readString()); + } + } + + public static class OverwriteWithLatestPayloadSerializer extends Serializer implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void write(Kryo kryo, Output output, OverwriteWithLatestAvroPayload payload) { + output.writeInt(payload.recordBytes.length); + output.writeBytes(payload.recordBytes); + kryo.writeClassAndObject(output, payload.orderingVal); + } + + @Override + public OverwriteWithLatestAvroPayload read(Kryo kryo, Input input, Class type) { + int size = input.readInt(); + byte[] recordBytes = new byte[size]; + input.read(recordBytes); + Comparable orderingVal = (Comparable) kryo.readClassAndObject(input); + return new OverwriteWithLatestAvroPayload(recordBytes, orderingVal); + } + } + + public static class HoodieRecordLocationSerializer extends Serializer implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void write(Kryo kryo, Output output, HoodieRecordLocation location) { + output.writeString(location.getInstantTime()); + output.writeString(location.getFileId()); + } + + @Override + public HoodieRecordLocation read(Kryo kryo, Input input, Class type) { + return new HoodieRecordLocation(input.readString(), input.readString()); + } + } + + public static class HoodieRecordSerializer extends 
Serializer implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void write(Kryo kryo, Output output, HoodieRecord record) { + kryo.writeObject(output, record.getKey()); + kryo.writeClassAndObject(output, record.getData()); + kryo.writeObject(output, record.getNewLocation().get()); + kryo.writeObject(output, record.getCurrentLocation()); + } + + @Override + public HoodieRecord read(Kryo kryo, Input input, Class type) { + HoodieKey key = kryo.readObject(input, HoodieKey.class); + HoodieRecordPayload payload = (HoodieRecordPayload) kryo.readClassAndObject(input); + HoodieRecordLocation newLocation = kryo.readObject(input, HoodieRecordLocation.class); + HoodieRecordLocation currentLocation = kryo.readObject(input, HoodieRecordLocation.class); + + HoodieRecord record = new HoodieRecord(key, payload); + record.setNewLocation(newLocation); + record.setCurrentLocation(currentLocation); + return record; + } + } + + public static class GenericDataRecordSerializer extends Serializer implements Serializable { + private static final long serialVersionUID = 1L; + + private void serializeDatum(Output output, GenericRecord object) throws IOException { + + BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(output, null); + Schema schema = object.getSchema(); + + byte[] bytes = schema.toString().getBytes(StandardCharsets.UTF_8); + output.writeInt(bytes.length); + output.write(bytes); + + DatumWriter datumWriter = GenericData.get().createDatumWriter(schema); + datumWriter.write(object, binaryEncoder); + + binaryEncoder.flush(); + + } + + private GenericRecord deserializeDatum(Input input) throws IOException { + int length = input.readInt(); + Schema schema = new Schema.Parser().parse(new String(input.readBytes(length))); + + BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(input, null); + DatumReader datumReader = GenericData.get().createDatumReader(schema); + return datumReader.read(null, 
binaryDecoder); + + } + + @Override + public void write(Kryo kryo, Output output, GenericRecord object) { + try { + serializeDatum(output, object); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + @Override + public GenericRecord read(Kryo kryo, Input input, Class type) { + try { + return deserializeDatum(input); + } catch (IOException e) { + throw new RuntimeException(); + } + } + } + private static class KryoSerializerInstance implements Serializable { public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576; private final Kryo kryo; @@ -94,7 +229,15 @@ private static class KryoSerializerInstance implements Serializable { KryoSerializerInstance() { KryoInstantiator kryoInstantiator = new KryoInstantiator(); - kryo = kryoInstantiator.newKryo(); + // TODO: this kryo instantiation is much slower than a simple way below + //kryo = kryoInstantiator.newKryo(); + kryo = new Kryo(); + kryo.register(HoodieKey.class, new HoodieKeySerializer()); + kryo.register(GenericData.Record.class, new GenericDataRecordSerializer()); + kryo.register(HoodieRecord.class, new HoodieRecordSerializer()); + kryo.register(HoodieRecordLocationSerializer.class, new HoodieRecordLocationSerializer()); + kryo.register(OverwriteWithLatestAvroPayload.class, new OverwriteWithLatestPayloadSerializer()); + baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE); kryo.setRegistrationRequired(false); } @@ -123,7 +266,7 @@ public Kryo newKryo() { Kryo kryo = new KryoBase(); // ensure that kryo doesn't fail if classes are not registered with kryo. - kryo.setRegistrationRequired(false); + kryo.setRegistrationRequired(true); // This would be used for object initialization if nothing else works out. 
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy()); // Handle cases where we may have an odd classloader setup like with libjars diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java index 07ad5f27a1193..dc93acbf32f31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java @@ -53,7 +53,7 @@ * without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata. * 2) Current position in the file NOTE : Only String.class type supported for Key */ -public final class DiskBasedMap implements Map, Iterable { +public final class DiskBasedMap implements Map, Iterable> { public static int BUFFER_SIZE = 128 * 1024; // 128 KB private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class); @@ -167,8 +167,8 @@ private void flushToDisk() { * Custom iterator to iterate over values written to disk. 
*/ @Override - public Iterator iterator() { - return new LazyFileIterable(filePath, valueMetadataMap).iterator(); + public Iterator> iterator() { + return new LazyFileIterable(filePath, valueMetadataMap).iterator(); } /** @@ -204,6 +204,7 @@ public R get(Object key) { if (entry == null) { return null; } + R d = get(entry); return get(entry); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index 32c41f7ada09d..f6dbee10aaaa1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util.collection; +import com.google.common.collect.Iterators; import org.apache.hudi.common.util.ObjectSizeCalculator; import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.exception.HoodieIOException; @@ -88,6 +89,7 @@ public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, Si this.currentInMemoryMapSize = 0L; this.keySizeEstimator = keySizeEstimator; this.valueSizeEstimator = valueSizeEstimator; + LOG.info("MaximumSizeInBytes:" + maxInMemorySizeInBytes + ", spilling to :" + baseFilePath); } private DiskBasedMap getDiskBasedMap() { @@ -108,8 +110,10 @@ private DiskBasedMap getDiskBasedMap() { /** * A custom iterator to wrap over iterating in-memory + disk spilled data. 
*/ - public Iterator iterator() { - return new IteratorWrapper<>(inMemoryMap.values().iterator(), getDiskBasedMap().iterator()); + public Iterator> iterator() { + return new IteratorWrapper<>( + Iterators.transform(inMemoryMap.entrySet().iterator(), (entry) -> Pair.of(entry.getKey(), entry.getValue())), + getDiskBasedMap().iterator()); } /** @@ -177,7 +181,7 @@ public R put(T key, R value) { // At first, use the sizeEstimate of a record being inserted into the spillable map. // Note, the converter may over estimate the size of a record in the JVM this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value); - LOG.info("Estimated Payload size => " + estimatedPayloadSize); + LOG.info("Estimated Payload size => " + estimatedPayloadSize + " for " + key + "," + value); } else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) { // Re-estimate the size of a record by calculating the size of the entire map containing // N entries and then dividing by the number of entries present (N). This helps to get a diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java index 927c992a85b0c..a595fe74be93f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java @@ -22,6 +22,7 @@ import org.apache.hudi.exception.HoodieException; import java.io.IOException; +import java.util.Comparator; import java.util.Iterator; import java.util.Map; import java.util.function.Consumer; @@ -31,7 +32,7 @@ * Iterable to lazily fetch values spilled to disk. This class uses BufferedRandomAccessFile to randomly access the position of * the latest value for a key spilled to disk and returns the result. 
*/ -public class LazyFileIterable implements Iterable { +public class LazyFileIterable implements Iterable> { // Used to access the value written at a specific position in the file private final String filePath; @@ -44,7 +45,7 @@ public LazyFileIterable(String filePath, Map map) } @Override - public Iterator iterator() { + public Iterator> iterator() { try { return new LazyFileIterator<>(filePath, inMemoryMetadataOfSpilledData); } catch (IOException io) { @@ -55,7 +56,7 @@ public Iterator iterator() { /** * Iterator implementation for the iterable defined above. */ - public class LazyFileIterator implements Iterator { + public class LazyFileIterator implements Iterator> { private final String filePath; private BufferedRandomAccessFile readOnlyFileHandle; @@ -68,8 +69,7 @@ public LazyFileIterator(String filePath, Map map) // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction this.metadataIterator = map.entrySet().stream() - .sorted((Map.Entry o1, Map.Entry o2) -> o1 - .getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue())) + .sorted(Comparator.comparing((Map.Entry o) -> o.getValue().getOffsetOfValue())) .collect(Collectors.toList()).iterator(); this.addShutdownHook(); } @@ -84,12 +84,12 @@ public boolean hasNext() { } @Override - public R next() { + public Pair next() { if (!hasNext()) { throw new IllegalStateException("next() called on EOF'ed stream. 
File :" + filePath); } Map.Entry entry = this.metadataIterator.next(); - return DiskBasedMap.get(entry.getValue(), readOnlyFileHandle); + return Pair.of(entry.getKey(), DiskBasedMap.get(entry.getValue(), readOnlyFileHandle)); } @Override @@ -98,7 +98,7 @@ public void remove() { } @Override - public void forEachRemaining(Consumer action) { + public void forEachRemaining(Consumer> action) { action.accept(next()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java index 2cc726e131ea3..171feed47cab4 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java @@ -18,22 +18,26 @@ package org.apache.hudi.common.util.collection; +import org.apache.avro.generic.GenericData; import org.apache.hudi.common.HoodieCommonTestHarness; import org.apache.hudi.common.model.AvroBinaryTestPayload; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.util.HoodieAvroUtils; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SchemaTestUtil; +import org.apache.hudi.common.util.SerializationUtils; import org.apache.hudi.common.util.SpillableMapTestUtils; import org.apache.hudi.common.util.SpillableMapUtils; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; 
-import org.apache.avro.generic.IndexedRecord; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -42,12 +46,14 @@ import java.io.UncheckedIOException; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.hudi.common.util.SchemaTestUtil.getSimpleSchema; import static org.junit.Assert.assertEquals; @@ -144,6 +150,57 @@ public void testSimpleUpsert() throws IOException, URISyntaxException { } } + @Test + public void testDiskBasedMapSerialization() throws Exception { + Schema longSchema = Schema.createRecord("hudi_schema","","", false, + Arrays.asList(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null))); + + List records = IntStream.range(0, 1000000) + .mapToObj(k -> { + GenericRecord gr = new GenericData.Record(longSchema); + gr.put("id", (long) k); + OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(gr, k); + HoodieRecord record = new HoodieRecord<>(new HoodieKey(k + "", "default"), payload); + record.unseal(); + record.setCurrentLocation(new HoodieRecordLocation("20200402101048", UUID.randomUUID().toString())); + record.setNewLocation(new HoodieRecordLocation("2019375493949", UUID.randomUUID().toString())); + return record; + }).collect(Collectors.toList()); + + HoodieKey key = new HoodieKey("key", "path"); + HoodieKey diskKey = SerializationUtils.deserialize(SerializationUtils.serialize(key)); + assertEquals(key, diskKey); + + HoodieRecordLocation location = new HoodieRecordLocation("383883", "f24"); + HoodieRecordLocation diskLocation = SerializationUtils.deserialize(SerializationUtils.serialize(location)); + assertEquals(location, diskLocation); + + GenericRecord gr = new GenericData.Record(longSchema); + gr.put("id", (long) 127373); + 
OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(gr, 373737); + OverwriteWithLatestAvroPayload diskPayload = SerializationUtils.deserialize(SerializationUtils.serialize(payload)); + assertEquals(payload, diskPayload); + + HoodieRecord record1 = new HoodieRecord(key, payload); + record1.setCurrentLocation(location); + record1.setNewLocation(location); + HoodieRecord diskRecord = SerializationUtils.deserialize(SerializationUtils.serialize(record1)); + assertEquals(record1, diskRecord); + + DiskBasedMap diskBasedMap = new DiskBasedMap<>("/tmp/diskmap"); + long writeStartMs = System.currentTimeMillis(); + for (HoodieRecord record : records) { + diskBasedMap.put(record.getRecordKey(), record); + } + System.err.println(">>> write took : " + (System.currentTimeMillis() - writeStartMs)); + + long readStartMs = System.currentTimeMillis(); + for (HoodieRecord record : records) { + assertEquals(record, diskBasedMap.get(record.getRecordKey())); + } + System.err.println(">>> read took : " + (System.currentTimeMillis() - readStartMs)); + } + @Test public void testSizeEstimator() throws IOException, URISyntaxException { Schema schema = SchemaTestUtil.getSimpleSchema(); @@ -198,7 +255,6 @@ public void testSizeEstimatorPerformance() throws IOException, URISyntaxExceptio long startTime = System.currentTimeMillis(); SpillableMapUtils.computePayloadSize(record, sizeEstimator); long timeTaken = System.currentTimeMillis() - startTime; - System.out.println("Time taken :" + timeTaken); assertTrue(timeTaken < 100); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java index 6e3de9e555e14..06630f3fad434 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java @@ 
-18,6 +18,7 @@ package org.apache.hudi.common.util.collection; +import com.google.common.collect.Iterators; import org.apache.hudi.common.HoodieCommonTestHarness; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieKey; @@ -74,7 +75,7 @@ public void simpleInsertTest() throws IOException, URISyntaxException { List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); assert (recordKeys.size() == 100); - Iterator> itr = records.iterator(); + Iterator> itr = Iterators.transform(records.iterator(), Pair::getRight); List oRecords = new ArrayList<>(); while (itr.hasNext()) { HoodieRecord rec = itr.next(); @@ -94,7 +95,7 @@ public void testSimpleUpsert() throws IOException, URISyntaxException { List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); assert (recordKeys.size() == 100); - Iterator> itr = records.iterator(); + Iterator> itr = Iterators.transform(records.iterator(), Pair::getRight); while (itr.hasNext()) { HoodieRecord rec = itr.next(); assert recordKeys.contains(rec.getRecordKey()); @@ -185,7 +186,7 @@ public void simpleTestWithException() throws IOException, URISyntaxException { List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); assert (recordKeys.size() == 100); - Iterator> itr = records.iterator(); + Iterator> itr = Iterators.transform(records.iterator(), Pair::getRight); while (itr.hasNext()) { throw new IOException("Testing failures..."); }