Closed

51 commits
29a53c6
Implement RockDbBasedMap as an alternate to DiskBasedMap in SpillableMap
Jun 19, 2021
e41f13f
[MINOR] Put Azure cache tasks first (#3118)
xushiyan Jun 20, 2021
429e9fb
[HUDI-1248] Increase timeout for deltaStreamerTestRunner in TestHoodi…
codope Jun 21, 2021
adf1679
[HUDI-2049] StreamWriteFunction should wait for the next inflight ins…
danny0405 Jun 21, 2021
f8d9242
[HUDI-2050] Support rollback inflight compaction instances for batch …
swuferhong Jun 21, 2021
4fd8a88
[HUDI-1776] Support AlterCommand For Hoodie (#3086)
Jun 21, 2021
93dad05
Fix iterator for rocks db
Jun 21, 2021
cb5cd35
[HUDI-2043] HoodieDefaultTimeline$filterPendingCompactionTImeline() m…
swuferhong Jun 22, 2021
7bd517a
[HUDI-2031] JVM occasionally crashes during compaction when spark spe…
marin-ma Jun 22, 2021
5db37c2
[HUDI-2047] Ignore FileNotFoundException in WriteProfiles #getWritePa…
yuzhaojing Jun 22, 2021
69c0d9e
[HUDI-1883] Support Truncate Table For Hoodie (#3098)
Jun 22, 2021
062d5ba
[HUDI-2013] Removed option to fallback to file listing when Metadata …
prashantwason Jun 22, 2021
11e64b2
[HUDI-1717] Metadata Reader should merge all the un-synced but comple…
prashantwason Jun 22, 2021
3fb59dd
[HUDI-1988] FinalizeWrite() been executed twice in AbstractHoodieWrit…
swuferhong Jun 23, 2021
2687eab
[HUDI-2054] Remove the duplicate name for flink write pipeline (#3135)
danny0405 Jun 23, 2021
43b9c1f
[HUDI-1826] Add ORC support in HoodieSnapshotExporter (#3130)
vaibhav-sinha Jun 23, 2021
380518e
[HUDI-2038] Support rollback inflight compaction instances for Compac…
yuzhaojing Jun 23, 2021
dd248a3
Fix checkstyle issues
Jun 23, 2021
e039e0f
[HUDI-2064] Fix TestHoodieBackedMetadata#testOnlyValidPartitionsAdded…
leesf Jun 23, 2021
7e50f9a
[HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137)
Jun 24, 2021
84dd3ca
[HUDI-2053] Insert Static Partition With DateType Return Incorrect P…
Jun 24, 2021
b328555
[HUDI-2069] Fix KafkaAvroSchemaDeserializer to not rely on reflection…
sbernauer Jun 24, 2021
218f2a6
[HUDI-2062] Catch FileNotFoundException in WriteProfiles #getCommitMe…
yuzhaojing Jun 25, 2021
e64fe55
[HUDI-2068] Skip the assign state for SmallFileAssign when the state …
danny0405 Jun 25, 2021
0fb8556
Add ability to provide multi-region (global) data consistency across …
s-sanjay Jun 25, 2021
23dbc09
[MINOR] Removing un-used files and references (#3150)
n3nash Jun 25, 2021
ed1a5da
[HUDI-2060] Added tests for KafkaOffsetGen (#3136)
veenaypatil Jun 25, 2021
f73bedd
[MINOR] Remove unused methods (#3152)
wangxianghu Jun 26, 2021
e99a6b0
[HUDI-2073] Fix the bug of hoodieClusteringJob never quit (#3157)
zhangyue19921010 Jun 27, 2021
d24341d
[HUDI-2074] Use while loop instead of recursive call in MergeOnReadIn…
danny0405 Jun 28, 2021
9e61dad
[MINOR] Drop duplicate keygenerator class configuration setting (#3167)
wangxianghu Jun 28, 2021
34fc8a8
[HUDI-2067] Sync FlinkOptions config to FlinkStreamerConfig (#3151)
veenaypatil Jun 28, 2021
039aeb6
[HUDI-1910] Commit Offset to Kafka after successful Hudi commit (#3092)
veenaypatil Jun 28, 2021
37b7c65
[HUDI-2084] Resend the uncommitted write metadata when start up (#3168)
yuzhaojing Jun 29, 2021
0749cc8
[HUDI-2081] Move schema util tests out from TestHiveSyncTool (#3166)
xushiyan Jun 29, 2021
b8a8f57
[HUDI-2094] Supports hive style partitioning for flink writer (#3178)
danny0405 Jun 29, 2021
5a7d1b3
[HUDI-2097] Fix Flink unable to read commit metadata error (#3180)
swuferhong Jun 29, 2021
f665db0
[HUDI-2085] Support specify compaction paralleism and compaction targ…
swuferhong Jun 29, 2021
6d4b556
Address reviewer comments
Jun 29, 2021
72a3594
Address reviewer comments
Jun 30, 2021
202887b
[HUDI-2092] Fix NPE caused by FlinkStreamerConfig#writePartitionUrlEn…
wangxianghu Jun 30, 2021
5564c7e
[HUDI-2006] Adding more yaml templates to test suite (#3073)
nsivabalan Jun 30, 2021
1cbf43b
[HUDI-2103] Add rebalance before index bootstrap (#3185)
yuzhaojing Jun 30, 2021
94f0f40
[HUDI-1944] Support Hudi to read from committed offset (#3175)
veenaypatil Jun 30, 2021
07e93de
[HUDI-2052] Support load logFile in BootstrapFunction (#3134)
yuzhaojing Jun 30, 2021
91427f3
Implement RockDbBasedMap as an alternate to DiskBasedMap in SpillableMap
Jun 19, 2021
26ed037
Fix iterator for rocks db
Jun 21, 2021
6ca64ba
Fix checkstyle issues
Jun 23, 2021
4eb25d0
Address reviewer comments
Jun 29, 2021
e45577b
Address reviewer comments
Jun 30, 2021
231b679
Merge branch 'rm_rocks_db' of https://github.com/rmahindra123/hudi in…
Jun 30, 2021
HoodieWriteConfig.java
@@ -35,6 +35,7 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -153,6 +154,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = "hoodie.merge.allow.duplicate.on.inserts";
private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = "false";

// Enable usage of RocksDb for External Spillable Map
public static final String DEFAULT_SPILLABLE_DISK_MAP_TYPE = ExternalSpillableMap.DiskMapType.ROCK_DB.name();
public static final String SPILLABLE_DISK_MAP_TYPE = "hoodie.spillable.diskmap.type";

public static final String CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = "hoodie.client.heartbeat.interval_in_ms";
public static final Integer DEFAULT_CLIENT_HEARTBEAT_INTERVAL_IN_MS = 60 * 1000;

@@ -410,6 +415,10 @@ public boolean allowDuplicateInserts() {
return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS));
}

public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() {
return ExternalSpillableMap.DiskMapType.fromValue(props.getProperty(SPILLABLE_DISK_MAP_TYPE));
}

public EngineType getEngineType() {
return engineType;
}
@@ -1355,6 +1364,11 @@ public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles)
return this;
}

public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
props.setProperty(SPILLABLE_DISK_MAP_TYPE, diskMapType.value());
return this;
}

public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
props.setProperty(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs));
return this;
@@ -1433,6 +1447,8 @@ protected void setDefaults() {
MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_ON_INSERTS),
MERGE_ALLOW_DUPLICATE_ON_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS);
setDefaultOnCondition(props, !props.containsKey(SPILLABLE_DISK_MAP_TYPE),
SPILLABLE_DISK_MAP_TYPE, DEFAULT_SPILLABLE_DISK_MAP_TYPE);
setDefaultOnCondition(props, !props.containsKey(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP),
CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(DEFAULT_CLIENT_HEARTBEAT_INTERVAL_IN_MS));
setDefaultOnCondition(props, !props.containsKey(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP),
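For reference, a minimal sketch of how a job could opt into the RocksDB-backed spillable map through the new builder option; the table path and the rest of the builder chain are illustrative, not part of this change.

// Illustrative only: enable the RocksDB-backed disk map via the new builder option.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample_table")                         // placeholder base path
    .withSpillableDiskMapType(ExternalSpillableMap.DiskMapType.ROCK_DB)
    .build();
// Equivalent property form for SPILLABLE_DISK_MAP_TYPE:
//   hoodie.spillable.diskmap.type=rock_db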
@@ -199,7 +199,8 @@ protected void initializeIncomingRecordsMap() {
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema));
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema),
config.getSpillableDiskMapType());
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
DiskBasedMap.java
@@ -53,7 +53,7 @@
* without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata.
* 2) Current position in the file NOTE : Only String.class type supported for Key
*/
public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements SpillableDiskMap<T, R> {

public static final int BUFFER_SIZE = 128 * 1024; // 128 KB
private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
@@ -151,6 +151,7 @@ public Iterator<R> iterator() {
/**
* Number of bytes spilled to disk.
*/
@Override
public long sizeOfFileOnDiskInBytes() {
return filePosition.get();
}
@@ -287,6 +288,7 @@ public Collection<R> values() {
throw new HoodieException("Unsupported Operation Exception");
}

@Override
public Stream<R> valueStream() {
final BufferedRandomAccessFile file = getRandomAccessFile();
return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file));
ExternalSpillableMap.java
@@ -20,6 +20,7 @@

import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.log4j.LogManager;
@@ -33,6 +34,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
@@ -61,15 +63,17 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
private final long maxInMemorySizeInBytes;
// Map to store key-values in memory until it hits maxInMemorySizeInBytes
private final Map<T, R> inMemoryMap;
// Map to store key-valuemetadata important to find the values spilled to disk
private transient volatile DiskBasedMap<T, R> diskBasedMap;
// Map to store key-values on disk or in an external DB after they spill over from memory
private transient volatile SpillableDiskMap<T, R> diskBasedMap;
// TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and
// incorrect payload estimation
private final Double sizingFactorForInMemoryMap = 0.8;
// Size Estimator for key type
private final SizeEstimator<T> keySizeEstimator;
// Size Estimator for value type
private final SizeEstimator<R> valueSizeEstimator;
// Type of the disk map
private final DiskMapType diskMapType;
// current space occupied by this map in-memory
private Long currentInMemoryMapSize;
// An estimate of the size of each payload written to this map
@@ -80,22 +84,35 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
private final String baseFilePath;

public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator) throws IOException {
SizeEstimator<R> valueSizeEstimator) throws IOException {
this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator,
valueSizeEstimator, DiskMapType.DISK_MAP);
}

public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType) throws IOException {
this.inMemoryMap = new HashMap<>();
this.baseFilePath = baseFilePath;
this.diskBasedMap = new DiskBasedMap<>(baseFilePath);
this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
this.currentInMemoryMapSize = 0L;
this.keySizeEstimator = keySizeEstimator;
this.valueSizeEstimator = valueSizeEstimator;
this.diskMapType = diskMapType;
}

private DiskBasedMap<T, R> getDiskBasedMap() {
private SpillableDiskMap<T, R> getDiskBasedMap() {
if (null == diskBasedMap) {
synchronized (this) {
if (null == diskBasedMap) {
try {
diskBasedMap = new DiskBasedMap<>(baseFilePath);
switch (diskMapType) {
case ROCK_DB:
diskBasedMap = new SpillableRocksDBBasedMap<>(baseFilePath);
break;
case DISK_MAP:
default:
diskBasedMap = new DiskBasedMap<>(baseFilePath);
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
@@ -160,6 +177,14 @@ public boolean containsValue(Object value) {
return inMemoryMap.containsValue(value) || getDiskBasedMap().containsValue(value);
}

public boolean inMemoryContainsKey(Object key) {
return inMemoryMap.containsKey(key);
}

public boolean inDiskContainsKey(Object key) {
return getDiskBasedMap().containsKey(key);
}

@Override
public R get(Object key) {
if (inMemoryMap.containsKey(key)) {
@@ -259,6 +284,40 @@ public Set<Entry<T, R>> entrySet() {
return entrySet;
}

public enum DiskMapType {
DISK_MAP("disk_map"),
ROCK_DB("rock_db"),
UNKNOWN("unknown");

private final String value;

DiskMapType(String value) {
this.value = value;
}

/**
* Getter for spillable disk map type.
* @return the string value of this disk map type
*/
public String value() {
return value;
}

/**
* Convert string value to {@link DiskMapType}.
*/
public static DiskMapType fromValue(String value) {
switch (value.toLowerCase(Locale.ROOT)) {
case "disk_map":
return DISK_MAP;
case "rock_db":
return ROCK_DB;
default:
throw new HoodieException("Invalid value of Type.");
}
}
}

/**
* Iterator that wraps iterating over all the values for this map 1) inMemoryIterator - Iterates over all the data
* in-memory map 2) diskLazyFileIterator - Iterates over all the data spilled to disk.
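As an illustration of the new constructor overload and the enum conversion, a hedged sketch; the key/value types, byte budget, and base path below are placeholders, and the constructor still declares IOException.

// Illustrative only: spill to RocksDB instead of the file-backed DiskBasedMap.
ExternalSpillableMap<String, String> spillableMap = new ExternalSpillableMap<>(
    16 * 1024 * 1024L,                              // in-memory budget in bytes (placeholder)
    "/tmp/spillable",                               // base path for spilled data (placeholder)
    new DefaultSizeEstimator(),                     // key size estimator
    new DefaultSizeEstimator(),                     // value size estimator
    ExternalSpillableMap.DiskMapType.ROCK_DB);      // new parameter added in this change
spillableMap.put("key-1", "value-1");

// Converting a configured string back to the enum:
ExternalSpillableMap.DiskMapType type = ExternalSpillableMap.DiskMapType.fromValue("rock_db");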
RocksDBDAO.java
@@ -45,10 +45,12 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -64,11 +66,13 @@ public class RocksDBDAO {
private transient RocksDB rocksDB;
private boolean closed = false;
private final String rocksDBBasePath;
private long totalBytesWritten;

public RocksDBDAO(String basePath, String rocksDBBasePath) {
this.rocksDBBasePath =
String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID().toString());
init();
totalBytesWritten = 0L;
}

/**
@@ -169,7 +173,7 @@ public void writeBatch(BatchHandler handler) {
*/
public <T extends Serializable> void putInBatch(WriteBatch batch, String columnFamilyName, String key, T value) {
try {
byte[] payload = SerializationUtils.serialize(value);
byte[] payload = serializePayload(value);
batch.put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload);
} catch (Exception e) {
throw new HoodieException(e);
@@ -189,7 +193,7 @@ public <K extends Serializable, T extends Serializable> void putInBatch(WriteBat
K key, T value) {
try {
byte[] keyBytes = SerializationUtils.serialize(key);
byte[] payload = SerializationUtils.serialize(value);
byte[] payload = serializePayload(value);
batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload);
} catch (Exception e) {
throw new HoodieException(e);
@@ -206,7 +210,7 @@ public <K extends Serializable, T extends Serializable> void putInBatch(WriteBat
*/
public <T extends Serializable> void put(String columnFamilyName, String key, T value) {
try {
byte[] payload = SerializationUtils.serialize(value);
byte[] payload = serializePayload(value);
getRocksDB().put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload);
} catch (Exception e) {
throw new HoodieException(e);
@@ -223,7 +227,7 @@ public <T extends Serializable> void put(String columnFamilyName, String key, T
*/
public <K extends Serializable, T extends Serializable> void put(String columnFamilyName, K key, T value) {
try {
byte[] payload = SerializationUtils.serialize(value);
byte[] payload = serializePayload(value);
getRocksDB().put(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key), payload);
} catch (Exception e) {
throw new HoodieException(e);
@@ -351,6 +355,10 @@ public <T extends Serializable> Stream<Pair<String, T>> prefixSearch(String colu
return results.stream();
}

public <T extends Serializable> Iterator<T> iterator(String columnFamilyName) {
return new IteratorWrapper<>(getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)));
}

/**
* Perform a prefix delete and return stream of key-value pairs retrieved.
*
@@ -448,10 +456,53 @@ public synchronized void close() {
}
}

public long getTotalBytesWritten() {
return totalBytesWritten;
}

private <T extends Serializable> byte[] serializePayload(T value) throws IOException {
byte[] payload = SerializationUtils.serialize(value);
totalBytesWritten += payload.length;
return payload;
}

String getRocksDBBasePath() {
return rocksDBBasePath;
}

/**
* {@link Iterator} wrapper for RocksDb Iterator {@link RocksIterator}.
*/
private static class IteratorWrapper<R> implements Iterator<R> {

private final RocksIterator iterator;

public IteratorWrapper(final RocksIterator iterator) {
this.iterator = iterator;
iterator.seekToFirst();
}

@Override
public boolean hasNext() {
return iterator.isValid();
}

@Override
public R next() {
if (!hasNext()) {
throw new IllegalStateException("next() called on rocksDB with no more valid entries");
}
R val = SerializationUtils.deserialize(iterator.value());
iterator.next();
return val;
}

@Override
public void forEachRemaining(Consumer<? super R> action) {
  while (hasNext()) {
    action.accept(next());
  }
}
}

/**
* Functional interface for stacking operation to Write batch.
*/
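A sketch of how the new column-family iterator and byte counter might be exercised; the column family name, keys, and values are placeholders, and addColumnFamily is assumed to be the pre-existing RocksDBDAO helper that registers the family.

// Illustrative only: iterate a column family through the new IteratorWrapper.
RocksDBDAO dao = new RocksDBDAO("sample_table", "/tmp/rocksdb_base");
dao.addColumnFamily("records");                       // assumed existing helper
dao.put("records", "key-1", "value-1");
dao.put("records", "key-2", "value-2");

Iterator<String> values = dao.iterator("records");    // new accessor added above
while (values.hasNext()) {
  System.out.println(values.next());
}

long bytesWritten = dao.getTotalBytesWritten();       // bytes serialized via serializePayload()
dao.close();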
SpillableDiskMap.java (new file)
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util.collection;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.Stream;

/**
* The map interface for storing records on disk or in an external store after they
* spill over from memory. Used by {@link ExternalSpillableMap}.
* @param <T> type of the key
* @param <R> type of the value
*/
public interface SpillableDiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> {
Stream<R> valueStream();

long sizeOfFileOnDiskInBytes();

void close();
}
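For illustration, calling code can stay agnostic of the backing store once both implementations sit behind this interface; the flag, path, and types below are placeholders, and both constructors declare IOException.

// Illustrative only: pick either implementation behind the common interface.
boolean useRocksDb = true;                            // placeholder decision
SpillableDiskMap<String, String> diskMap = useRocksDb
    ? new SpillableRocksDBBasedMap<>("/tmp/spill")    // RocksDB-backed map added in this PR
    : new DiskBasedMap<>("/tmp/spill");               // original file-backed map

diskMap.put("key-1", "value-1");
diskMap.valueStream().forEach(System.out::println);
long bytesOnDisk = diskMap.sizeOfFileOnDiskInBytes();
diskMap.close();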