@@ -35,6 +35,7 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -52,6 +53,7 @@
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -283,6 +285,11 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue("false")
.withDocumentation("Allow duplicates with inserts while merging with existing records");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
.withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map");

public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty
.key("hoodie.client.heartbeat.interval_in_ms")
.defaultValue(60 * 1000)
@@ -554,6 +561,10 @@ public boolean allowDuplicateInserts() {
return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS);
}

public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() {
return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT));
}

public EngineType getEngineType() {
return engineType;
}
@@ -1504,6 +1515,11 @@ public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles)
return this;
}

public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
writeConfig.setValue(SPILLABLE_DISK_MAP_TYPE, diskMapType.name());
return this;
}
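As a usage sketch, the new disk map type can be selected either through this builder or as a raw property. HoodieWriteConfig.newBuilder(), withPath(...) and build() are assumed from the existing builder API; only withSpillableDiskMapType is introduced by this change, and the base path is hypothetical:

// Sketch: route spilled records to RocksDB instead of the default BITCASK map.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table")                                 // hypothetical base path
    .withSpillableDiskMapType(ExternalSpillableMap.DiskMapType.ROCKS_DB)  // new in this change
    .build();

// Equivalent via raw properties, using the key defined above:
Properties props = new Properties();
props.setProperty("hoodie.spillable.diskmap.type", "ROCKS_DB");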

public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs));
return this;
@@ -199,7 +199,8 @@ protected void initializeIncomingRecordsMap() {
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema));
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema),
config.getSpillableDiskMapType());
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
@@ -231,7 +232,7 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
LOG.info("Number of entries in MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
+ "Total size in bytes of MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in BitCaskDiskMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
}
@@ -100,7 +100,7 @@ protected void performScan() {
LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
LOG.info(
"Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
LOG.info("Number of entries in BitCaskDiskMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
}

@@ -22,7 +22,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.DiskBasedMap.FileEntry;
import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry;
import org.apache.hudi.exception.HoodieCorruptedDataException;

import org.apache.avro.generic.GenericRecord;
@@ -52,11 +52,13 @@
* This class provides a disk-spillable-only map implementation. All of the data is currently written to one file,
* without any rollover support. It uses the following: 1) an in-memory map that tracks the key -> latest ValueMetadata;
* 2) the current position in the file. NOTE: only the String.class type is supported for keys.
*
* Inspired by https://github.com/basho/bitcask
*/
public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {

public static final int BUFFER_SIZE = 128 * 1024; // 128 KB
private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap;
// Write only file
@@ -76,7 +78,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>

private transient Thread shutdownThread = null;

public DiskBasedMap(String baseFilePath) throws IOException {
public BitCaskDiskMap(String baseFilePath) throws IOException {
this.valueMetadataMap = new ConcurrentHashMap<>();
this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString());
this.filePath = writeOnlyFile.getPath();
@@ -136,7 +138,7 @@ private void flushToDisk() {
try {
writeOnlyFileHandle.flush();
} catch (IOException e) {
throw new HoodieIOException("Failed to flush to DiskBasedMap file", e);
throw new HoodieIOException("Failed to flush to BitCaskDiskMap file", e);
}
}

@@ -151,6 +153,7 @@ public Iterator<R> iterator() {
/**
* Number of bytes spilled to disk.
*/
@Override
public long sizeOfFileOnDiskInBytes() {
return filePosition.get();
}
@@ -203,7 +206,7 @@ private synchronized R put(T key, R value, boolean flush) {
Integer valueSize = val.length;
Long timestamp = System.currentTimeMillis();
this.valueMetadataMap.put(key,
new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
byte[] serializedKey = SerializationUtils.serialize(key);
filePosition
.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val),
@@ -287,6 +290,7 @@ public Collection<R> values() {
throw new HoodieException("Unsupported Operation Exception");
}

@Override
public Stream<R> valueStream() {
final BufferedRandomAccessFile file = getRandomAccessFile();
return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file));
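To make the class javadoc above concrete, here is a conceptual sketch of the bitcask layout (illustration only, not Hudi code): values are appended to a single log file while an in-memory index maps each key to the offset and size of its latest value, so a lookup costs one seek plus one read. The class and field names below are invented for the example:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class BitcaskSketch {
  // Plays the role of BitCaskDiskMap.ValueMetadata: where the latest value lives.
  private static final class Meta {
    final long offset;
    final int size;
    Meta(long offset, int size) { this.offset = offset; this.size = size; }
  }

  private final RandomAccessFile log;                       // single append-only file
  private final Map<String, Meta> index = new HashMap<>();  // key -> latest metadata

  public BitcaskSketch(String path) throws IOException {
    this.log = new RandomAccessFile(path, "rw");
  }

  public void put(String key, String value) throws IOException {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    long offset = log.length();
    log.seek(offset);
    log.write(bytes);                               // append-only, no rollover
    index.put(key, new Meta(offset, bytes.length)); // latest write wins
  }

  public String get(String key) throws IOException {
    Meta meta = index.get(key);
    if (meta == null) {
      return null;
    }
    byte[] bytes = new byte[meta.size];
    log.seek(meta.offset);                          // one seek ...
    log.readFully(bytes);                           // ... one read
    return new String(bytes, StandardCharsets.UTF_8);
  }
}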
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util.collection;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.Stream;

/**
* A map interface for storing records on disk after they spill over from
* memory. Used by {@link ExternalSpillableMap}.
*
* @param <T> The generic type of the keys
* @param <R> The generic type of the values
*/
public interface DiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> {

/**
* @return a stream of the values stored on disk.
*/
Stream<R> valueStream();

/**
* Number of bytes spilled to disk.
*/
long sizeOfFileOnDiskInBytes();

/**
* Cleanup.
*/
void close();

}
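As a minimal sketch of this contract (assuming a writable spill directory; the path and values are made up), callers can program against DiskMap and stay agnostic of the backing store:

import org.apache.hudi.common.util.collection.BitCaskDiskMap;
import org.apache.hudi.common.util.collection.DiskMap;

public class DiskMapSketch {
  public static void main(String[] args) throws Exception {
    // BITCASK implementation shown; a RocksDB-backed DiskMap would be a drop-in swap.
    DiskMap<String, String> diskMap = new BitCaskDiskMap<>("/tmp/hoodie-spill");
    diskMap.put("key-1", "value-1");
    diskMap.put("key-2", "value-2");
    System.out.println("Bytes spilled: " + diskMap.sizeOfFileOnDiskInBytes());
    diskMap.valueStream().forEach(System.out::println); // lazy scan of spilled values
    diskMap.close();                                    // cleanup of on-disk state
  }
}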
@@ -61,15 +61,17 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
private final long maxInMemorySizeInBytes;
// Map to store key-values in memory until it hits maxInMemorySizeInBytes
private final Map<T, R> inMemoryMap;
// Map to store key-valuemetadata important to find the values spilled to disk
private transient volatile DiskBasedMap<T, R> diskBasedMap;
// Map to store key-values on disk (or in an embedded db) after they spill over from memory
private transient volatile DiskMap<T, R> diskBasedMap;
// TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and
// incorrect payload estimation
private final Double sizingFactorForInMemoryMap = 0.8;
// Size Estimator for key type
private final SizeEstimator<T> keySizeEstimator;
// Size Estimator for value type
private final SizeEstimator<R> valueSizeEstimator;
// Type of the disk map
private final DiskMapType diskMapType;
// current space occupied by this map in-memory
private Long currentInMemoryMapSize;
// An estimate of the size of each payload written to this map
@@ -80,22 +82,34 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
private final String baseFilePath;

public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator) throws IOException {
SizeEstimator<R> valueSizeEstimator) throws IOException {
this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, valueSizeEstimator, DiskMapType.BITCASK);
}

public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType) throws IOException {
this.inMemoryMap = new HashMap<>();
this.baseFilePath = baseFilePath;
this.diskBasedMap = new DiskBasedMap<>(baseFilePath);
this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
this.currentInMemoryMapSize = 0L;
this.keySizeEstimator = keySizeEstimator;
this.valueSizeEstimator = valueSizeEstimator;
this.diskMapType = diskMapType;
}

private DiskBasedMap<T, R> getDiskBasedMap() {
private DiskMap<T, R> getDiskBasedMap() {
if (null == diskBasedMap) {
synchronized (this) {
if (null == diskBasedMap) {
try {
diskBasedMap = new DiskBasedMap<>(baseFilePath);
switch (diskMapType) {
case ROCKS_DB:
diskBasedMap = new RocksDbDiskMap<>(baseFilePath);
break;
case BITCASK:
default:
diskBasedMap = new BitCaskDiskMap<>(baseFilePath);
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
Expand All @@ -113,7 +127,7 @@ public Iterator<R> iterator() {
}

/**
* Number of entries in DiskBasedMap.
* Number of entries in BitCaskDiskMap.
*/
public int getDiskBasedMapNumEntries() {
return getDiskBasedMap().size();
@@ -160,6 +174,14 @@ public boolean containsValue(Object value) {
return inMemoryMap.containsValue(value) || getDiskBasedMap().containsValue(value);
}

public boolean inMemoryContainsKey(Object key) {
return inMemoryMap.containsKey(key);
}

public boolean inDiskContainsKey(Object key) {
return getDiskBasedMap().containsKey(key);
}

@Override
public R get(Object key) {
if (inMemoryMap.containsKey(key)) {
@@ -259,14 +281,24 @@ public Set<Entry<T, R>> entrySet() {
return entrySet;
}

/**
* The type of map used to store key/value pairs on disk after they spill
* from memory in the {@link ExternalSpillableMap}.
*/
public enum DiskMapType {
BITCASK,
ROCKS_DB,
UNKNOWN
}

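A construction sketch using the new five-argument overload (the 16 MB budget and path are arbitrary, and DefaultSizeEstimator stands in for real estimators):

import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;

// Sketch: entries spill to RocksDB once roughly 80% of the 16 MB budget
// (sizingFactorForInMemoryMap) is consumed by estimated key/value sizes.
ExternalSpillableMap<String, String> spillableMap =
    new ExternalSpillableMap<>(
        16 * 1024 * 1024L,                       // maxInMemorySizeInBytes (arbitrary)
        "/tmp/hoodie-spill",                     // hypothetical base file path
        new DefaultSizeEstimator<>(),            // key size estimator
        new DefaultSizeEstimator<>(),            // value size estimator
        ExternalSpillableMap.DiskMapType.ROCKS_DB);
spillableMap.put("record-key", "payload");       // spills transparently once over budget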
/**
* Iterator that wraps iterating over all the values of this map: 1) inMemoryIterator - iterates over all the data
* in the in-memory map; 2) diskLazyFileIterator - iterates over all the data spilled to disk.
*/
private class IteratorWrapper<R> implements Iterator<R> {

private Iterator<R> inMemoryIterator;
private Iterator<R> diskLazyFileIterator;
private final Iterator<R> inMemoryIterator;
private final Iterator<R> diskLazyFileIterator;

public IteratorWrapper(Iterator<R> inMemoryIterator, Iterator<R> diskLazyFileIterator) {
this.inMemoryIterator = inMemoryIterator;
@@ -36,11 +36,11 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
// Used to access the value written at a specific position in the file
private final String filePath;
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, DiskBasedMap.ValueMetadata> inMemoryMetadataOfSpilledData;
private final Map<T, BitCaskDiskMap.ValueMetadata> inMemoryMetadataOfSpilledData;

private transient Thread shutdownThread = null;

public LazyFileIterable(String filePath, Map<T, DiskBasedMap.ValueMetadata> map) {
public LazyFileIterable(String filePath, Map<T, BitCaskDiskMap.ValueMetadata> map) {
this.filePath = filePath;
this.inMemoryMetadataOfSpilledData = map;
}
@@ -61,16 +61,16 @@ public class LazyFileIterator<T, R> implements Iterator<R> {

private final String filePath;
private BufferedRandomAccessFile readOnlyFileHandle;
private final Iterator<Map.Entry<T, DiskBasedMap.ValueMetadata>> metadataIterator;
private final Iterator<Map.Entry<T, BitCaskDiskMap.ValueMetadata>> metadataIterator;

public LazyFileIterator(String filePath, Map<T, DiskBasedMap.ValueMetadata> map) throws IOException {
public LazyFileIterator(String filePath, Map<T, BitCaskDiskMap.ValueMetadata> map) throws IOException {
this.filePath = filePath;
this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", DiskBasedMap.BUFFER_SIZE);
this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", BitCaskDiskMap.BUFFER_SIZE);
readOnlyFileHandle.seek(0);

// sort the map in increasing order of offset of value so disk seek is only in one(forward) direction
this.metadataIterator = map.entrySet().stream()
.sorted((Map.Entry<T, DiskBasedMap.ValueMetadata> o1, Map.Entry<T, DiskBasedMap.ValueMetadata> o2) -> o1
.sorted((Map.Entry<T, BitCaskDiskMap.ValueMetadata> o1, Map.Entry<T, BitCaskDiskMap.ValueMetadata> o2) -> o1
.getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue()))
.collect(Collectors.toList()).iterator();
this.addShutdownHook();
@@ -90,8 +90,8 @@ public R next() {
if (!hasNext()) {
throw new IllegalStateException("next() called on EOF'ed stream. File :" + filePath);
}
Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
return DiskBasedMap.get(entry.getValue(), readOnlyFileHandle);
Map.Entry<T, BitCaskDiskMap.ValueMetadata> entry = this.metadataIterator.next();
return BitCaskDiskMap.get(entry.getValue(), readOnlyFileHandle);
}

@Override