Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
527ade2
need more test and tuning
Apr 22, 2022
2b096e8
checked code style&& still need more java docs and more config and en…
Apr 22, 2022
19ce784
config and need more test
Apr 23, 2022
1db7936
need more consuming tuning
Apr 23, 2022
725f5c3
improved && add benchmark
Apr 23, 2022
71a9f13
adjust bulk_insert in benchmark
Apr 23, 2022
b5843f0
adjust bulk_insert in benchmark
Apr 23, 2022
4a1f01c
modify benchmark
Apr 24, 2022
4b94523
adjust
Apr 24, 2022
69904cc
tuning
Apr 24, 2022
204abe2
add executor bench mark
Apr 24, 2022
45979df
add recordsNumber to control
Apr 24, 2022
075cab1
code style
Apr 24, 2022
e87bfbc
modify benchmark
Apr 24, 2022
79dcebe
add bench mark result
Apr 24, 2022
e85c961
ready to review
Apr 24, 2022
b838e1f
merge from master
Apr 24, 2022
e00a109
code style
Apr 24, 2022
74068f6
code style
Apr 24, 2022
03b328e
code style
Apr 24, 2022
3778cf4
code style
Apr 24, 2022
8041a02
merge from master
Jun 6, 2022
a04be3c
merge from master
Jun 21, 2022
1b86864
code review
Jun 22, 2022
6114ee2
merge from master
Oct 5, 2022
67d8cd3
merge from master
Oct 5, 2022
b369ccb
address comments
Oct 6, 2022
92760db
address comments
Oct 6, 2022
3e44fb2
address comments
Oct 6, 2022
672fdc6
address comments
Oct 7, 2022
c0c2274
address comments
Oct 7, 2022
4587303
address comments
Oct 7, 2022
abd8b27
address comments
Oct 7, 2022
4ba91d4
address comments
Oct 7, 2022
70ca2c7
add mmore tests
Oct 8, 2022
447cb45
address comments
Oct 8, 2022
f0b7218
merge from master
Oct 13, 2022
eaf0ae0
address comments
Oct 14, 2022
270d811
address comments
Oct 14, 2022
0fc24d3
address comments
Oct 14, 2022
4797166
address comments
Oct 14, 2022
1fa9ac7
address comments
Oct 14, 2022
e230fbb
address comments
Oct 14, 2022
298f66d
address comments
Oct 17, 2022
8ded3e8
address comments
Oct 17, 2022
794e30b
address comments
Oct 17, 2022
25fcd5d
address comments
Oct 17, 2022
6e56bd1
address comments
Oct 17, 2022
d615f1f
address comments
Oct 17, 2022
c748320
address comments
Oct 17, 2022
3f3a41a
address comments
Oct 17, 2022
9fe6bda
merge from master
Oct 19, 2022
a81ffdf
merge from master
Oct 19, 2022
fd19907
address comments
Oct 21, 2022
e5c17f0
address comments
Oct 24, 2022
c13fc35
merge from master
Oct 25, 2022
d1970a3
merge from master
Oct 25, 2022
988e14a
address comments
Oct 31, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
Expand Down Expand Up @@ -84,12 +85,14 @@
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;

/**
Expand Down Expand Up @@ -132,6 +135,14 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
+ "extract a key out of incoming records.");

public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
.key("hoodie.write.executor.type")
.defaultValue(BOUNDED_IN_MEMORY.name())
.withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
+ "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue."
+ "Also users could use DISRUPTOR, which use disruptor as a lock free message queue "
+ "to gain better writing performance if lock was the bottleneck. Although DISRUPTOR_EXECUTOR is still an experimental feature.");

public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
.key("hoodie.datasource.write.keygenerator.type")
.defaultValue(KeyGeneratorType.SIMPLE.name())
Expand Down Expand Up @@ -233,6 +244,19 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue(String.valueOf(4 * 1024 * 1024))
.withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");

public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty
.key("hoodie.write.executor.disruptor.buffer.size")
.defaultValue(String.valueOf(1024))
.withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");

public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = ConfigProperty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

.key("hoodie.write.executor.disruptor.wait.strategy")
.defaultValue("BLOCKING_WAIT")
.withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor. Other options are "
+ "SLEEPING_WAIT, it attempts to be conservative with CPU usage by using a simple busy wait loop"
+ "YIELDING_WAIT, it is designed for cases where there is the option to burn CPU cycles with the goal of improving latency"
+ "BUSY_SPIN_WAIT, it can be used in low-latency systems, but puts the highest constraints on the deployment environment");

public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
.key("hoodie.combine.before.insert")
.defaultValue("false")
Expand Down Expand Up @@ -975,6 +999,10 @@ public String getKeyGeneratorClass() {
return getString(KEYGENERATOR_CLASS_NAME);
}

public ExecutorType getExecutorType() {
return ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
}

public boolean isCDCEnabled() {
return getBooleanOrDefault(
HoodieTableConfig.CDC_ENABLED, HoodieTableConfig.CDC_ENABLED.defaultValue());
Expand Down Expand Up @@ -1040,6 +1068,14 @@ public int getWriteBufferLimitBytes() {
return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
}

public Option<String> getWriteExecutorWaitStrategy() {
return Option.of(getString(WRITE_WAIT_STRATEGY));
}

public Option<Integer> getDisruptorWriteBufferSize() {
return Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE)));
}

public boolean shouldCombineBeforeInsert() {
return getBoolean(COMBINE_BEFORE_INSERT);
}
Expand Down Expand Up @@ -2287,6 +2323,11 @@ public Builder withKeyGenerator(String keyGeneratorClass) {
return this;
}

public Builder withExecutorType(String executorClass) {
writeConfig.setValue(EXECUTOR_TYPE, executorClass);
return this;
}

public Builder withTimelineLayoutVersion(int version) {
writeConfig.setValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(version));
return this;
Expand Down Expand Up @@ -2333,6 +2374,16 @@ public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
return this;
}

public Builder withWriteWaitStrategy(String waitStrategy) {
writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
return this;
}

public Builder withWriteBufferSize(int size) {
writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
return this;
}

public Builder combineInput(boolean onInsert, boolean onUpsert) {
writeConfig.setValue(COMBINE_BEFORE_INSERT, String.valueOf(onInsert));
writeConfig.setValue(COMBINE_BEFORE_UPSERT, String.valueOf(onUpsert));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.hudi.io.HoodieWriteHandle;
Expand All @@ -38,7 +38,7 @@
* Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
*/
public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
extends BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
extends IteratorBasedQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {

private HoodieWriteConfig config;
private String instantTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieBootstrapHandle;

Expand All @@ -29,7 +29,7 @@
/**
* Consumer that dequeues records from queue and sends to Merge Handle for writing.
*/
public class BootstrapRecordConsumer extends BoundedInMemoryQueueConsumer<HoodieRecord, Void> {
public class BootstrapRecordConsumer extends IteratorBasedQueueConsumer<HoodieRecord, Void> {

private final HoodieBootstrapHandle bootstrapHandle;

Expand All @@ -38,7 +38,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {
}

@Override
protected void consumeOneRecord(HoodieRecord record) {
public void consumeOneRecord(HoodieRecord record) {
try {
bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
.getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
Expand All @@ -48,7 +48,7 @@ protected void consumeOneRecord(HoodieRecord record) {
}

@Override
protected void finish() {}
public void finish() {}

@Override
protected Void getResult() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
Expand Down Expand Up @@ -109,7 +109,7 @@ protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> tab
/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
protected static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
protected static class UpdateHandler extends IteratorBasedQueueConsumer<GenericRecord, Void> {

private final HoodieMergeHandle upsertHandle;

Expand All @@ -118,12 +118,12 @@ protected UpdateHandler(HoodieMergeHandle upsertHandle) {
}

@Override
protected void consumeOneRecord(GenericRecord record) {
public void consumeOneRecord(GenericRecord record) {
upsertHandle.write(record);
}

@Override
protected void finish() {}
public void finish() {}

@Override
protected Void getResult() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.io.HoodieWriteHandle;

import java.util.ArrayList;
Expand All @@ -31,7 +31,7 @@
* Consumes stream of hoodie records from in-memory queue and writes to one explicit create handle.
*/
public class ExplicitWriteHandler<T extends HoodieRecordPayload>
extends BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
extends IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {

private final List<WriteStatus> statuses = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.hudi.util.QueueBasedExecutorFactory;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -77,16 +78,16 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
HoodieExecutor<?, ?, List<WriteStatus>> bufferedIteratorExecutor = null;
try {
Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
if (useWriterSchema) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
bufferedIteratorExecutor =
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());

bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());

final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util;

import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;

import java.util.Iterator;
import java.util.function.Function;

public class QueueBasedExecutorFactory {

/**
* Create a new hoodie executor instance on demand.
*/
public static <I, O, E> HoodieExecutor create(HoodieWriteConfig hoodieConfig, Iterator<I> inputItr, IteratorBasedQueueConsumer<O, E> consumer,
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
ExecutorType executorType = hoodieConfig.getExecutorType();

switch (executorType) {
case BOUNDED_IN_MEMORY:
return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
transformFunction, preExecuteRunnable);
case DISRUPTOR:
return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer,
transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable);
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}
}
}
Loading