Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
527ade2
need more test and tuning
Apr 22, 2022
2b096e8
checked code style&& still need more java docs and more config and en…
Apr 22, 2022
19ce784
config and need more test
Apr 23, 2022
1db7936
need more consuming tuning
Apr 23, 2022
725f5c3
improved && add benchmark
Apr 23, 2022
71a9f13
adjust bulk_insert in benchmark
Apr 23, 2022
b5843f0
adjust bulk_insert in benchmark
Apr 23, 2022
4a1f01c
modify benchmark
Apr 24, 2022
4b94523
adjust
Apr 24, 2022
69904cc
tuning
Apr 24, 2022
204abe2
add executor bench mark
Apr 24, 2022
45979df
add recordsNumber to control
Apr 24, 2022
075cab1
code style
Apr 24, 2022
e87bfbc
modify benchmark
Apr 24, 2022
79dcebe
add bench mark result
Apr 24, 2022
e85c961
ready to review
Apr 24, 2022
b838e1f
merge from master
Apr 24, 2022
e00a109
code style
Apr 24, 2022
74068f6
code style
Apr 24, 2022
03b328e
code style
Apr 24, 2022
3778cf4
code style
Apr 24, 2022
8041a02
merge from master
Jun 6, 2022
a04be3c
merge from master
Jun 21, 2022
1b86864
code review
Jun 22, 2022
6114ee2
merge from master
Oct 5, 2022
67d8cd3
merge from master
Oct 5, 2022
b369ccb
address comments
Oct 6, 2022
92760db
address comments
Oct 6, 2022
3e44fb2
address comments
Oct 6, 2022
672fdc6
address comments
Oct 7, 2022
c0c2274
address comments
Oct 7, 2022
4587303
address comments
Oct 7, 2022
abd8b27
address comments
Oct 7, 2022
4ba91d4
address comments
Oct 7, 2022
70ca2c7
add mmore tests
Oct 8, 2022
447cb45
address comments
Oct 8, 2022
f0b7218
merge from master
Oct 13, 2022
eaf0ae0
address comments
Oct 14, 2022
270d811
address comments
Oct 14, 2022
0fc24d3
address comments
Oct 14, 2022
4797166
address comments
Oct 14, 2022
1fa9ac7
address comments
Oct 14, 2022
e230fbb
address comments
Oct 14, 2022
298f66d
address comments
Oct 17, 2022
8ded3e8
address comments
Oct 17, 2022
794e30b
address comments
Oct 17, 2022
25fcd5d
address comments
Oct 17, 2022
6e56bd1
address comments
Oct 17, 2022
d615f1f
address comments
Oct 17, 2022
c748320
address comments
Oct 17, 2022
3f3a41a
address comments
Oct 17, 2022
9fe6bda
merge from master
Oct 19, 2022
a81ffdf
merge from master
Oct 19, 2022
fd19907
address comments
Oct 21, 2022
e5c17f0
address comments
Oct 24, 2022
c13fc35
merge from master
Oct 25, 2022
d1970a3
merge from master
Oct 25, 2022
988e14a
address comments
Oct 31, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
+ "extract a key out of incoming records.");

public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
.key("hoodie.write.executor.type")
.defaultValue("BOUNDED_IN_MEMORY_EXECUTOR")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we introduce enum for this. also, we can remove "EXECUTOR" suffix from it. for eg, in case of Key gen, enum is named as KeyGeneratorType and value is just "SIMPLE". we don't call it as "SIMPOLE_KEY_GEN" as its repetitive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing. Changed!

.withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
+ "default value is BOUNDED_IN_MEMORY_EXECUTOR which use a bounded in-memory queue using LinkedBlockingQueue."
+ "Also users could use DISRUPTOR_EXECUTOR, which use disruptor as a lock free message queue "
+ "to gain better writing performance. Although DISRUPTOR_EXECUTOR is still an experimental feature.");

public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
.key("hoodie.datasource.write.keygenerator.type")
.defaultValue(KeyGeneratorType.SIMPLE.name())
Expand Down Expand Up @@ -235,6 +243,16 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue(String.valueOf(4 * 1024 * 1024))
.withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");

public static final ConfigProperty<Integer> WRITE_BUFFER_SIZE = ConfigProperty
.key("hoodie.write.buffer.size")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would suggest to sub-scope this to (for ex, hoodie.write.executor.disruptor.buffer.size)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

.defaultValue(1024)
.withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");

public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = ConfigProperty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

.key("hoodie.write.wait.strategy")
.defaultValue("BlockingWaitStrategy")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, may be an enum for this too. and you can remove "Strategy" suffix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not too strong on the suffix removal. feel free to take a call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, just remove the "Strategy" suffix :)

.withDocumentation("Strategy employed for making DisruptorExecutor wait on a cursor.");

public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
.key("hoodie.combine.before.insert")
.defaultValue("false")
Expand Down Expand Up @@ -975,6 +993,10 @@ public String getKeyGeneratorClass() {
return getString(KEYGENERATOR_CLASS_NAME);
}

public String getExecutorType() {
return getString(EXECUTOR_TYPE);
}

public boolean isConsistentLogicalTimestampEnabled() {
return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
}
Expand Down Expand Up @@ -1035,6 +1057,14 @@ public int getWriteBufferLimitBytes() {
return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
}

public String getWriteWaitStrategy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • nit: getWriteExecutorWaitStrategy
  • Let's make it return Option (since config is optional)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing. Changed.

return getString(WRITE_WAIT_STRATEGY);
}

public int getWriteBufferSize() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to above one (this one will actually cause NPE, since we're returning primitive type)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

return getInt(WRITE_BUFFER_SIZE);
}

public boolean shouldCombineBeforeInsert() {
return getBoolean(COMBINE_BEFORE_INSERT);
}
Expand Down Expand Up @@ -2205,6 +2235,11 @@ public Builder withKeyGenerator(String keyGeneratorClass) {
return this;
}

public Builder withExecutorName(String executorClass) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be withExecutorType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, changed.

writeConfig.setValue(EXECUTOR_TYPE, executorClass);
return this;
}

public Builder withTimelineLayoutVersion(int version) {
writeConfig.setValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(version));
return this;
Expand Down Expand Up @@ -2251,6 +2286,16 @@ public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
return this;
}

public Builder withWriteWaitStrategy(String waitStrategy) {
writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
return this;
}

public Builder withWriteBufferSize(int size) {
writeConfig.setValue(WRITE_BUFFER_SIZE, String.valueOf(size));
return this;
}

public Builder combineInput(boolean onInsert, boolean onUpsert) {
writeConfig.setValue(COMBINE_BEFORE_INSERT, String.valueOf(onInsert));
writeConfig.setValue(COMBINE_BEFORE_UPSERT, String.valueOf(onUpsert));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.WriteHandleFactory;
Expand All @@ -33,6 +36,7 @@

import java.util.Iterator;
import java.util.List;
import java.util.Locale;

public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {

Expand Down Expand Up @@ -77,16 +81,35 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
HoodieExecutor<?, ?, List<WriteStatus>> bufferedIteratorExecutor = null;
try {
Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
if (useWriterSchema) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
bufferedIteratorExecutor =
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),

String executorType = hoodieConfig.getExecutorType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cloud put executorType -> executorTypeEnum conversion logic into the hooideConfig.getExecutorType(), to have more clear logic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing. Changed.

ExecutorType executorTypeEnum;

try {
executorTypeEnum = ExecutorType.valueOf(executorType.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new HoodieException("Unsupported Executor Type " + executorType);
}

switch (executorTypeEnum) {
case BOUNDED_IN_MEMORY_EXECUTOR:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have just started reviewing. not sure if we have similar code elsewhere. but can we move this to a util method so that code here is leaner and can be reused at other places.

bufferedIteratorExecutor = MessageQueueUtil/MessageQueueFactory.getMessageQueueExecutor(executorType size, itr, consumer, transformFn, preExecutorRunnable).  

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.
Currently only parkLazyInsertIterable.java will create this bufferedIteratorExecutor, but we really should move this to a util method.

bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
break;
case DISRUPTOR_EXECUTOR:
bufferedIteratorExecutor = new DisruptorExecutor<>(hoodieConfig.getWriteBufferSize(), inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig), hoodieConfig.getWriteWaitStrategy(), hoodieTable.getPreExecuteRunnable());
break;
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}

final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.execution;

import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.WaitStrategyFactory;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import scala.Tuple2;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {

private final String instantTime = HoodieActiveTimeline.createNewInstantTime();

@BeforeEach
public void setUp() throws Exception {
initTestDataGenerator();
initExecutorServiceWithFixedThreadPool(2);
}

@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}

private Runnable getPreExecuteRunnable() {
final TaskContext taskContext = TaskContext.get();
return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
}

@Test
public void testExecutor() {

final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);

HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {

private int count = 0;

@Override
protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
count++;
}

@Override
protected void finish() {
}

@Override
protected Integer getResult() {
return count;
}
};
DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;

try {
exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
int result = exec.execute();
// It should buffer and write 100 records
assertEquals(128, result);
// There should be no remaining records in the buffer
assertFalse(exec.isRemaining());
} finally {
if (exec != null) {
exec.shutdownNow();
}
}
}

@Test
public void testInterruptExecutor() {
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
ExecutorService pool = Executors.newSingleThreadExecutor();

HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(1024);
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {

@Override
protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
try {
while (true) {
Thread.sleep(1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid using sleeps in tests (it's a time-bomb). Instead just park the thread on a lock/latch/etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

}
} catch (InterruptedException ie) {
return;
}
}

@Override
protected void finish() {
}

@Override
protected Integer getResult() {
return 0;
}
};

DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
AtomicReference<Exception> actualException = new AtomicReference<>();
try {
executor = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for another var, just drop = null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.


Future<?> future = pool.submit(() -> {
try {
finalExecutor.execute();
} catch (Exception e) {
actualException.set(e);
}

});
future.cancel(true);
future.get();
assertTrue(actualException.get() instanceof HoodieException);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of that please use assertThrows utility

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

} catch (Exception e) {
// ignore here
} finally {
if (executor != null) {
executor.shutdownNow();
}
}
}
}
Loading