@@ -54,6 +54,8 @@
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.queue.DisruptorWaitStrategyType;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -97,8 +99,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR;
import static org.apache.hudi.common.util.queue.ExecutorType.SIMPLE;
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
import static org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy;

@@ -158,10 +159,10 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
+ "extract a key out of incoming records.");

public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
public static final ConfigProperty<String> WRITE_EXECUTOR_TYPE = ConfigProperty
.key("hoodie.write.executor.type")
.defaultValue(BOUNDED_IN_MEMORY.name())
.withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name())
.defaultValue(SIMPLE.name())
.withValidValues(Arrays.stream(ExecutorType.values()).map(Enum::name).toArray(String[]::new))
.sinceVersion("0.13.0")
.withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
+ "BOUNDED_IN_MEMORY(default): Use LinkedBlockingQueue as a bounded in-memory queue, this queue will use extra lock to balance producers and consumer"
@@ -271,15 +272,15 @@
.defaultValue(String.valueOf(4 * 1024 * 1024))
.withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");

public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty
public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty
Review comment (Contributor): (Let's take this in a separate PR) What is the unit of this config (B, KB, or MB)? We should mention the unit in the config naming.
Reply (Contributor Author): Discussed offline: will follow-up w/ separate PR addressing this

.key("hoodie.write.executor.disruptor.buffer.size")
.defaultValue(String.valueOf(1024))
.sinceVersion("0.13.0")
.withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");

public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = ConfigProperty
public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY = ConfigProperty
.key("hoodie.write.executor.disruptor.wait.strategy")
.defaultValue("BLOCKING_WAIT")
.defaultValue(DisruptorWaitStrategyType.BLOCKING_WAIT.name())
.sinceVersion("0.13.0")
.withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor. Other options are "
+ "SLEEPING_WAIT, it attempts to be conservative with CPU usage by using a simple busy wait loop"
@@ -1107,7 +1108,7 @@ public String getKeyGeneratorClass() {
}

public ExecutorType getExecutorType() {
return ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
return ExecutorType.valueOf(getStringOrDefault(WRITE_EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
}

public boolean isCDCEnabled() {
@@ -1175,12 +1176,12 @@ public int getWriteBufferLimitBytes() {
return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
}

public Option<String> getWriteExecutorWaitStrategy() {
return Option.of(getString(WRITE_WAIT_STRATEGY));
public String getWriteExecutorDisruptorWaitStrategy() {
return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY);
}

public Option<Integer> getDisruptorWriteBufferSize() {
return Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE)));
public Integer getWriteExecutorDisruptorWriteBufferSize() {
return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE));
}

public boolean shouldCombineBeforeInsert() {
@@ -1987,7 +1988,7 @@ public ApiSite getDatadogApiSite() {
public String getDatadogApiKey() {
if (props.containsKey(HoodieMetricsDatadogConfig.API_KEY.key())) {
return getString(HoodieMetricsDatadogConfig.API_KEY);

} else {
Supplier<String> apiKeySupplier = ReflectionUtils.loadClass(
getString(HoodieMetricsDatadogConfig.API_KEY_SUPPLIER));
@@ -2481,7 +2482,7 @@ public Builder withKeyGenerator(String keyGeneratorClass) {
}

public Builder withExecutorType(String executorClass) {
writeConfig.setValue(EXECUTOR_TYPE, executorClass);
writeConfig.setValue(WRITE_EXECUTOR_TYPE, executorClass);
return this;
}

@@ -2536,13 +2537,13 @@ public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
return this;
}

public Builder withWriteWaitStrategy(String waitStrategy) {
writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
public Builder withWriteExecutorDisruptorWaitStrategy(String waitStrategy) {
writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY, String.valueOf(waitStrategy));
return this;
}

public Builder withWriteBufferSize(int size) {
writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
public Builder withWriteExecutorDisruptorWriteBufferSize(long size) {
writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
return this;
}

@@ -2970,8 +2971,15 @@ private void validate() {
}

public HoodieWriteConfig build() {
return build(true);
}

@VisibleForTesting
public HoodieWriteConfig build(boolean shouldValidate) {
Review comment (Contributor): should this method use the default scope/visibility instead of public?
Reply (Contributor Author): Problem is that it's being used in tests in different packages
Reply (Contributor): OK. We should think about how to hide this, so it is not used by users accidentally.

setDefaults();
validate();
if (shouldValidate) {
validate();
}
// Build WriteConfig at the end
return new HoodieWriteConfig(engineType, writeConfig.getProps());
}
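The zero-argument build() delegates to build(true), so production callers keep the defaults-plus-validation behavior, while build(false) exists for tests that cannot satisfy validation. A minimal sketch mirroring the test usage further below:

// Defaults are still applied via setDefaults(); only validate() is skipped.
HoodieWriteConfig minimalTestConfig = HoodieWriteConfig.newBuilder()
    .withWriteBufferLimitBytes(1024)
    .build(false);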
@@ -21,9 +21,9 @@
import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
@@ -90,23 +90,25 @@ public R getResult() {
}
}

static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema,
HoodieWriteConfig config) {
return getCloningTransformerInternal(schema, config.getProps());
static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema,
HoodieWriteConfig writeConfig) {
return getTransformerInternal(schema, writeConfig);
}

static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) {
return getCloningTransformerInternal(schema, new TypedProperties());
}
private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema,
HoodieWriteConfig writeConfig) {
// NOTE: Whether record have to be cloned here is determined based on the executor type used
// for writing: executors relying on an inner queue, will be keeping references to the records
// and therefore in the environments where underlying buffer holding the record could be
// reused (for ex, Spark) we need to make sure that we get a clean copy of
// it since these records will be subsequently buffered (w/in the in-memory queue);
// Only case when we don't need to make a copy is when using [[SimpleExecutor]] which
// is guaranteed to not hold on to references to any records
boolean shouldClone = writeConfig.getExecutorType() != ExecutorType.SIMPLE;

private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema schema,
TypedProperties props) {
return record -> {
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be subsequently buffered (w/in the in-memory queue)
HoodieRecord<T> clonedRecord = record.copy();
return new HoodieInsertValueGenResult(clonedRecord, schema, props);
HoodieRecord<T> clonedRecord = shouldClone ? record.copy() : record;
return new HoodieInsertValueGenResult(clonedRecord, schema, writeConfig.getProps());
};
}
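The comment above is the crux of the change: queue-based executors buffer records while the producer may keep reusing the underlying buffer (as Spark's row iterators can), so they need a defensive copy, whereas the simple executor consumes each record before the next one is produced. A self-contained toy illustration of that hazard in plain Java (not Hudi API):

import java.util.ArrayDeque;
import java.util.Queue;

// Toy demo: a producer that recycles one mutable object must hand copies to any
// consumer that buffers asynchronously, otherwise every buffered element "becomes"
// the last value written into the shared object.
public class BufferReuseDemo {
  static final class Reused { int value; }

  public static void main(String[] args) {
    Reused shared = new Reused();
    Queue<Reused> noCopy = new ArrayDeque<>();
    Queue<Reused> withCopy = new ArrayDeque<>();

    for (int i = 0; i < 3; i++) {
      shared.value = i;          // producer overwrites the same buffer
      noCopy.add(shared);        // buffered without cloning: all entries alias 'shared'
      Reused copy = new Reused();
      copy.value = shared.value; // cloning path: snapshot taken before buffering
      withCopy.add(copy);
    }
    noCopy.forEach(r -> System.out.print(r.value + " "));   // prints: 2 2 2
    System.out.println();
    withCopy.forEach(r -> System.out.print(r.value + " ")); // prints: 0 1 2
  }
}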

@@ -20,11 +20,11 @@

import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.common.util.queue.SimpleExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;

@@ -34,28 +34,28 @@
public class ExecutorFactory {

public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction) {
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction) {
return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
}

public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction,
Runnable preExecuteRunnable) {
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction,
Runnable preExecuteRunnable) {
ExecutorType executorType = hoodieConfig.getExecutorType();

switch (executorType) {
case BOUNDED_IN_MEMORY:
return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
transformFunction, preExecuteRunnable);
case DISRUPTOR:
return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer,
transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable);
return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
case SIMPLE:
return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction);
return new SimpleExecutor<>(inputItr, consumer, transformFunction);
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}
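For reference, the insert paths below drive the factory roughly like this hedged sketch (consumer and write-config construction elided; names as they appear elsewhere in this diff):

HoodieExecutor<List<WriteStatus>> executor = ExecutorFactory.create(
    writeConfig,                          // its executor type picks SIMPLE, BOUNDED_IN_MEMORY, or DISRUPTOR
    records.iterator(),                   // Iterator<HoodieRecord>
    insertHandler,                        // HoodieConsumer producing List<WriteStatus>
    getTransformer(schema, writeConfig)); // clones records only for the queue-based executors
List<WriteStatus> statuses = executor.execute();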
@@ -60,7 +60,7 @@ protected List<WriteStatus> computeNext() {
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(),
getCloningTransformer(schema, hoodieConfig));
getTransformer(schema, hoodieConfig));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
checkState(result != null && !result.isEmpty());
return result;
@@ -64,7 +64,7 @@ protected List<WriteStatus> computeNext() {
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema));
ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
checkState(result != null && !result.isEmpty());
return result;
@@ -87,7 +87,7 @@ protected List<WriteStatus> computeNext() {
}

bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
getTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());

final List<WriteStatus> result = bufferedIteratorExecutor.execute();
checkState(result != null && !result.isEmpty());
@@ -23,6 +23,7 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -41,18 +42,21 @@

import scala.Tuple2;

import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness {

private final String instantTime = HoodieActiveTimeline.createNewInstantTime();

private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name())
.withWriteBufferLimitBytes(1024)
.build(false);

@BeforeEach
public void setUp() throws Exception {
initTestDataGenerator();
@@ -74,8 +78,6 @@ public void testExecutor() {
final int recordNumber = 100;
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, recordNumber);

HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {

@@ -94,8 +96,8 @@ public Integer finish() {

BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try {
executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable());
int result = executor.execute();

assertEquals(100, result);
@@ -113,8 +115,6 @@ public Integer finish() {
public void testInterruptExecutor() {
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);

HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {

@@ -136,8 +136,8 @@ public Integer finish() {
};

BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable());

// Interrupt the current thread (therefore triggering executor to throw as soon as it
// invokes [[get]] on the [[CompletableFuture]])
@@ -154,8 +154,6 @@ public Integer finish() {

@Test
public void testExecutorTermination() {
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() {
@Override
public boolean hasNext() {
@@ -181,8 +179,8 @@ public Integer finish() {
};

BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
consumer, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
getPreExecuteRunnable());
executor.shutdownNow();
boolean terminatedGracefully = executor.awaitTermination();