diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 2f36aa725e338..b9d7c800250a0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -54,6 +54,8 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.queue.DisruptorWaitStrategyType; import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; @@ -97,8 +99,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY; -import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR; +import static org.apache.hudi.common.util.queue.ExecutorType.SIMPLE; import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY; import static org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy; @@ -158,10 +159,10 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` " + "extract a key out of incoming records."); - public static final ConfigProperty EXECUTOR_TYPE = ConfigProperty + public static final ConfigProperty WRITE_EXECUTOR_TYPE = ConfigProperty .key("hoodie.write.executor.type") - .defaultValue(BOUNDED_IN_MEMORY.name()) - .withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name()) + .defaultValue(SIMPLE.name()) + .withValidValues(Arrays.stream(ExecutorType.values()).map(Enum::name).toArray(String[]::new)) .sinceVersion("0.13.0") .withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue." + "BOUNDED_IN_MEMORY(default): Use LinkedBlockingQueue as a bounded in-memory queue, this queue will use extra lock to balance producers and consumer" @@ -271,15 +272,15 @@ public class HoodieWriteConfig extends HoodieConfig { .defaultValue(String.valueOf(4 * 1024 * 1024)) .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes."); - public static final ConfigProperty WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty + public static final ConfigProperty WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty .key("hoodie.write.executor.disruptor.buffer.size") .defaultValue(String.valueOf(1024)) .sinceVersion("0.13.0") .withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2"); - public static final ConfigProperty WRITE_WAIT_STRATEGY = ConfigProperty + public static final ConfigProperty WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY = ConfigProperty .key("hoodie.write.executor.disruptor.wait.strategy") - .defaultValue("BLOCKING_WAIT") + .defaultValue(DisruptorWaitStrategyType.BLOCKING_WAIT.name()) .sinceVersion("0.13.0") .withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor. Other options are " + "SLEEPING_WAIT, it attempts to be conservative with CPU usage by using a simple busy wait loop" @@ -1107,7 +1108,7 @@ public String getKeyGeneratorClass() { } public ExecutorType getExecutorType() { - return ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT)); + return ExecutorType.valueOf(getStringOrDefault(WRITE_EXECUTOR_TYPE).toUpperCase(Locale.ROOT)); } public boolean isCDCEnabled() { @@ -1175,12 +1176,12 @@ public int getWriteBufferLimitBytes() { return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE)); } - public Option getWriteExecutorWaitStrategy() { - return Option.of(getString(WRITE_WAIT_STRATEGY)); + public String getWriteExecutorDisruptorWaitStrategy() { + return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY); } - public Option getDisruptorWriteBufferSize() { - return Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE))); + public Integer getWriteExecutorDisruptorWriteBufferSize() { + return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE)); } public boolean shouldCombineBeforeInsert() { @@ -1987,7 +1988,7 @@ public ApiSite getDatadogApiSite() { public String getDatadogApiKey() { if (props.containsKey(HoodieMetricsDatadogConfig.API_KEY.key())) { return getString(HoodieMetricsDatadogConfig.API_KEY); - + } else { Supplier apiKeySupplier = ReflectionUtils.loadClass( getString(HoodieMetricsDatadogConfig.API_KEY_SUPPLIER)); @@ -2481,7 +2482,7 @@ public Builder withKeyGenerator(String keyGeneratorClass) { } public Builder withExecutorType(String executorClass) { - writeConfig.setValue(EXECUTOR_TYPE, executorClass); + writeConfig.setValue(WRITE_EXECUTOR_TYPE, executorClass); return this; } @@ -2536,13 +2537,13 @@ public Builder withWriteBufferLimitBytes(int writeBufferLimit) { return this; } - public Builder withWriteWaitStrategy(String waitStrategy) { - writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy)); + public Builder withWriteExecutorDisruptorWaitStrategy(String waitStrategy) { + writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY, String.valueOf(waitStrategy)); return this; } - public Builder withWriteBufferSize(int size) { - writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size)); + public Builder withWriteExecutorDisruptorWriteBufferSize(long size) { + writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size)); return this; } @@ -2970,8 +2971,15 @@ private void validate() { } public HoodieWriteConfig build() { + return build(true); + } + + @VisibleForTesting + public HoodieWriteConfig build(boolean shouldValidate) { setDefaults(); - validate(); + if (shouldValidate) { + validate(); + } // Build WriteConfig at the end return new HoodieWriteConfig(engineType, writeConfig.getProps()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java index 20f75f63c5234..991e52982cf74 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java @@ -21,9 +21,9 @@ import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.LazyIterableIterator; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.WriteHandleFactory; @@ -90,23 +90,25 @@ public R getResult() { } } - static Function, HoodieInsertValueGenResult> getCloningTransformer(Schema schema, - HoodieWriteConfig config) { - return getCloningTransformerInternal(schema, config.getProps()); + static Function, HoodieInsertValueGenResult> getTransformer(Schema schema, + HoodieWriteConfig writeConfig) { + return getTransformerInternal(schema, writeConfig); } - static Function, HoodieInsertValueGenResult> getCloningTransformer(Schema schema) { - return getCloningTransformerInternal(schema, new TypedProperties()); - } + private static Function, HoodieInsertValueGenResult> getTransformerInternal(Schema schema, + HoodieWriteConfig writeConfig) { + // NOTE: Whether record have to be cloned here is determined based on the executor type used + // for writing: executors relying on an inner queue, will be keeping references to the records + // and therefore in the environments where underlying buffer holding the record could be + // reused (for ex, Spark) we need to make sure that we get a clean copy of + // it since these records will be subsequently buffered (w/in the in-memory queue); + // Only case when we don't need to make a copy is when using [[SimpleExecutor]] which + // is guaranteed to not hold on to references to any records + boolean shouldClone = writeConfig.getExecutorType() != ExecutorType.SIMPLE; - private static Function, HoodieInsertValueGenResult> getCloningTransformerInternal(Schema schema, - TypedProperties props) { return record -> { - // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific - // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of - // it since these records will be subsequently buffered (w/in the in-memory queue) - HoodieRecord clonedRecord = record.copy(); - return new HoodieInsertValueGenResult(clonedRecord, schema, props); + HoodieRecord clonedRecord = shouldClone ? record.copy() : record; + return new HoodieInsertValueGenResult(clonedRecord, schema, writeConfig.getProps()); }; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java index b9e7f06f84859..bd192f649dbe8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java @@ -20,11 +20,11 @@ import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.SimpleHoodieExecutor; import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.ExecutorType; -import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.common.util.queue.HoodieConsumer; +import org.apache.hudi.common.util.queue.HoodieExecutor; +import org.apache.hudi.common.util.queue.SimpleExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -34,17 +34,17 @@ public class ExecutorFactory { public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, - Iterator inputItr, - HoodieConsumer consumer, - Function transformFunction) { + Iterator inputItr, + HoodieConsumer consumer, + Function transformFunction) { return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop()); } public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, - Iterator inputItr, - HoodieConsumer consumer, - Function transformFunction, - Runnable preExecuteRunnable) { + Iterator inputItr, + HoodieConsumer consumer, + Function transformFunction, + Runnable preExecuteRunnable) { ExecutorType executorType = hoodieConfig.getExecutorType(); switch (executorType) { @@ -52,10 +52,10 @@ public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer, transformFunction, preExecuteRunnable); case DISRUPTOR: - return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer, - transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable); + return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer, + transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable); case SIMPLE: - return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction); + return new SimpleExecutor<>(inputItr, consumer, transformFunction); default: throw new HoodieException("Unsupported Executor Type " + executorType); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index 6979716624910..6e573ec9432b6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -60,7 +60,7 @@ protected List computeNext() { try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), - getCloningTransformer(schema, hoodieConfig)); + getTransformer(schema, hoodieConfig)); final List result = bufferedIteratorExecutor.execute(); checkState(result != null && !result.isEmpty()); return result; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java index 5113b3406801e..d2e813a506bc8 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -64,7 +64,7 @@ protected List computeNext() { try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = - ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema)); + ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig)); final List result = bufferedIteratorExecutor.execute(); checkState(result != null && !result.isEmpty()); return result; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index 7cb2b27e17115..147b5cf6b3333 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -87,7 +87,7 @@ protected List computeNext() { } bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), - getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); + getTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); final List result = bufferedIteratorExecutor.execute(); checkState(result != null && !result.isEmpty()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index 3ccdf1ec10690..eb61cb433120d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -41,18 +42,21 @@ import scala.Tuple2; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name()) + .withWriteBufferLimitBytes(1024) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -74,8 +78,6 @@ public void testExecutor() { final int recordNumber = 100; final List hoodieRecords = dataGen.generateInserts(instantTime, recordNumber); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); HoodieConsumer, Integer> consumer = new HoodieConsumer, Integer>() { @@ -94,8 +96,8 @@ public Integer finish() { BoundedInMemoryExecutor>, Integer> executor = null; try { - executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); int result = executor.execute(); assertEquals(100, result); @@ -113,8 +115,6 @@ public Integer finish() { public void testInterruptExecutor() { final List hoodieRecords = dataGen.generateInserts(instantTime, 100); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); HoodieConsumer, Integer> consumer = new HoodieConsumer, Integer>() { @@ -136,8 +136,8 @@ public Integer finish() { }; BoundedInMemoryExecutor>, Integer> executor = - new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); // Interrupt the current thread (therefore triggering executor to throw as soon as it // invokes [[get]] on the [[CompletableFuture]]) @@ -154,8 +154,6 @@ public Integer finish() { @Test public void testExecutorTermination() { - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); Iterator unboundedRecordIter = new Iterator() { @Override public boolean hasNext() { @@ -181,8 +179,8 @@ public Integer finish() { }; BoundedInMemoryExecutor>, Integer> executor = - new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter, - consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), + new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter, + consumer, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); executor.shutdownNow(); boolean terminatedGracefully = executor.awaitTermination(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index c9be18c9da3da..50ba44c5688c8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -27,9 +27,11 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.common.util.queue.BoundedInMemoryQueue; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; @@ -54,7 +56,7 @@ import scala.Tuple2; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -65,6 +67,11 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name()) + .withWriteBufferLimitBytes(1024) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -85,7 +92,7 @@ public void testRecordReading() throws Exception { final int numRecords = 128; final List hoodieRecords = dataGen.generateInserts(instantTime, numRecords); final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce Future resFuture = executorService.submit(() -> { new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue); @@ -125,7 +132,7 @@ public void testCompositeProducerRecordReading() throws Exception { final List> recs = new ArrayList<>(); final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Record Key to Map> keyToProducerAndIndexMap = new HashMap<>(); @@ -220,11 +227,11 @@ public void testMemoryLimitForBuffering() throws Exception { final int recordLimit = 5; final SizeEstimator sizeEstimator = new DefaultSizeEstimator<>(); HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0)); + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(genResult); final long memoryLimitInBytes = recordLimit * objSize; final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce executorService.submit(() -> { @@ -269,7 +276,7 @@ public void testException() throws Exception { final SizeEstimator>> sizeEstimator = new DefaultSizeEstimator<>(); // queue memory limit HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0)); + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties()))); final long memoryLimitInBytes = 4 * objSize; @@ -277,7 +284,7 @@ public void testException() throws Exception { // stops and throws // correct exception back. BoundedInMemoryQueue>> queue1 = - new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce Future resFuture = executorService.submit(() -> { @@ -305,7 +312,7 @@ public void testException() throws Exception { when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); BoundedInMemoryQueue>> queue2 = - new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce Future res = executorService.submit(() -> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java index 674aca7415395..55c2325b137a6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java @@ -18,13 +18,11 @@ package org.apache.hudi.execution; -import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.DisruptorExecutor; +import org.apache.hudi.common.util.queue.ExecutorType; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.WaitStrategyFactory; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -38,21 +36,23 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.Function; -import scala.Tuple2; - -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.DISRUPTOR.name()) + .withWriteExecutorDisruptorWriteBufferSize(8) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -75,16 +75,14 @@ public void testExecutor() { final List hoodieRecords = dataGen.generateInserts(instantTime, 128); final List consumedRecords = new ArrayList<>(); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8)); - HoodieConsumer, Integer> consumer = - new HoodieConsumer, Integer>() { + HoodieConsumer consumer = + new HoodieConsumer() { private int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { - consumedRecords.add(record.getResult()); + public void consume(HoodieRecord record) { + consumedRecords.add(record); count++; } @@ -93,11 +91,11 @@ public Integer finish() { return count; } }; - DisruptorExecutor>, Integer> exec = null; + DisruptorExecutor exec = null; try { - exec = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, + Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); int result = exec.execute(); // It should buffer and write 100 records assertEquals(128, result); @@ -123,13 +121,11 @@ public Integer finish() { public void testInterruptExecutor() { final List hoodieRecords = dataGen.generateInserts(instantTime, 100); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(1024)); - HoodieConsumer, Integer> consumer = - new HoodieConsumer, Integer>() { + HoodieConsumer consumer = + new HoodieConsumer() { @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consume(HoodieRecord record) { try { Thread.currentThread().wait(); } catch (InterruptedException ie) { @@ -143,9 +139,9 @@ public Integer finish() { } }; - DisruptorExecutor>, Integer> - executor = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + DisruptorExecutor + executor = new DisruptorExecutor<>(1024, hoodieRecords.iterator(), consumer, + Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); try { Thread.currentThread().interrupt(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java index 2ff9d02926b01..7344ccd89fd16 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.DisruptorMessageQueue; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.HoodieProducer; @@ -56,17 +57,20 @@ import java.util.stream.IntStream; import static org.apache.hudi.exception.ExceptionUtil.getRootCause; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestDisruptorMessageQueue extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.DISRUPTOR.name()) + .withWriteExecutorDisruptorWriteBufferSize(16) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -108,8 +112,6 @@ public void testRecordReading() throws Exception { } }); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16)); HoodieConsumer, Integer> consumer = new HoodieConsumer, Integer>() { @@ -137,8 +139,8 @@ public Integer finish() { DisruptorExecutor>, Integer> exec = null; try { - exec = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); int result = exec.execute(); // It should buffer and write 100 records assertEquals(100, result); @@ -167,8 +169,8 @@ public void testCompositeProducerRecordReading() throws Exception { final List> recs = new ArrayList<>(); final DisruptorMessageQueue queue = - new DisruptorMessageQueue(Option.of(1024), getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), - Option.of("BLOCKING_WAIT"), numProducers, new Runnable() { + new DisruptorMessageQueue(1024, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + "BLOCKING_WAIT", numProducers, new Runnable() { @Override public void run() { // do nothing. @@ -282,8 +284,8 @@ public void testException() throws Exception { final int numProducers = 40; final DisruptorMessageQueue queue = - new DisruptorMessageQueue(Option.of(1024), getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), - Option.of("BLOCKING_WAIT"), numProducers, new Runnable() { + new DisruptorMessageQueue(1024, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + "BLOCKING_WAIT", numProducers, new Runnable() { @Override public void run() { // do nothing. @@ -324,9 +326,9 @@ public Integer finish() { } }; - DisruptorExecutor>, Integer> exec = new DisruptorExecutor(Option.of(1024), - producers, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), - Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + DisruptorExecutor>, Integer> exec = new DisruptorExecutor(1024, + producers, consumer, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); final Throwable thrown = assertThrows(HoodieException.class, exec::execute, "exception is expected"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java index bbc85efd376a6..830577463b299 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java @@ -18,8 +18,6 @@ package org.apache.hudi.execution; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; - import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieRecord; @@ -27,8 +25,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.HoodieConsumer; -import org.apache.hudi.common.util.queue.SimpleHoodieExecutor; -import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.common.util.queue.SimpleExecutor; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.junit.jupiter.api.AfterEach; @@ -41,16 +38,13 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; - -import scala.Tuple2; +import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { +public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); @@ -71,15 +65,13 @@ public void testExecutor() { final List hoodieRecords = dataGen.generateInserts(instantTime, 128); final List consumedRecords = new ArrayList<>(); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8)); - HoodieConsumer, Integer> consumer = - new HoodieConsumer, Integer>() { + HoodieConsumer consumer = + new HoodieConsumer() { private int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) throws Exception { - consumedRecords.add(record.getResult()); + public void consume(HoodieRecord record) throws Exception { + consumedRecords.add(record); count++; } @@ -88,10 +80,10 @@ public Integer finish() { return count; } }; - SimpleHoodieExecutor>, Integer> exec = null; + SimpleExecutor exec = null; try { - exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, Function.identity()); int result = exec.execute(); // It should buffer and write 128 records @@ -133,16 +125,16 @@ public void testRecordReading() { } }); - HoodieConsumer, Integer> consumer = - new HoodieConsumer, Integer>() { + HoodieConsumer consumer = + new HoodieConsumer() { private int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) throws Exception { + public void consume(HoodieRecord record) throws Exception { count++; - afterRecord.add((HoodieAvroRecord) record.getResult()); + afterRecord.add((HoodieAvroRecord) record); try { - IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.getResult()) + IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record) .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get(); afterIndexedRecord.add(indexedRecord); } catch (IOException e) { @@ -156,10 +148,10 @@ public Integer finish() { } }; - SimpleHoodieExecutor>, Integer> exec = null; + SimpleExecutor exec = null; try { - exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, Function.identity()); int result = exec.execute(); assertEquals(100, result); @@ -186,14 +178,13 @@ public void testException() { List pRecs = dataGen.generateInserts(instantTime, numRecords); InnerIterator iterator = new InnerIterator(pRecs.iterator(), errorMessage, numRecords / 10); - HoodieConsumer, Integer> consumer = - new HoodieConsumer, Integer>() { + HoodieConsumer consumer = + new HoodieConsumer() { int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) throws Exception { + public void consume(HoodieRecord payload) throws Exception { // Read recs and ensure we have covered all producer recs. - final HoodieRecord rec = payload.getResult(); count++; } @@ -203,8 +194,8 @@ public Integer finish() { } }; - SimpleHoodieExecutor>, Integer> exec = - new SimpleHoodieExecutor(iterator, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + SimpleExecutor exec = + new SimpleExecutor<>(iterator, consumer, Function.identity()); final Throwable thrown = assertThrows(HoodieException.class, exec::execute, "exception is expected"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java index c379a7abcc0d9..20e5af1a91cf9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java @@ -61,7 +61,7 @@ protected void doConsume(HoodieMessageQueue queue, HoodieConsumer co } LOG.info("All records from the queue have been consumed"); } catch (Exception e) { - LOG.error("Error consuming records", e); + LOG.error("Failed consuming records", e); queue.markAsFailed(e); throw new HoodieException(e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java index 4ebbd6e528b9a..056c45a0bf40a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java @@ -32,16 +32,23 @@ */ public class DisruptorExecutor extends BaseHoodieQueueBasedExecutor { - public DisruptorExecutor(final Option bufferSize, final Iterator inputItr, - HoodieConsumer consumer, Function transformFunction, Option waitStrategy, Runnable preExecuteRunnable) { + public DisruptorExecutor(Integer bufferSize, + Iterator inputItr, + HoodieConsumer consumer, + Function transformFunction, + String waitStrategy, + Runnable preExecuteRunnable) { this(bufferSize, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)), consumer, transformFunction, waitStrategy, preExecuteRunnable); } - public DisruptorExecutor(final Option bufferSize, List> producers, - HoodieConsumer consumer, final Function transformFunction, - final Option waitStrategy, Runnable preExecuteRunnable) { - super(producers, Option.of(consumer), new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable), preExecuteRunnable); + public DisruptorExecutor(int bufferSize, + List> producers, + HoodieConsumer consumer, + Function transformFunction, + String waitStrategyId, + Runnable preExecuteRunnable) { + super(producers, Option.of(consumer), new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategyId, producers.size(), preExecuteRunnable), preExecuteRunnable); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java index 19b0a04b5e845..de4ecff9ae45a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java @@ -27,6 +27,8 @@ import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.function.Function; @@ -38,6 +40,8 @@ */ public class DisruptorMessageQueue implements HoodieMessageQueue { + private static final Logger LOG = LogManager.getLogger(DisruptorMessageQueue.class); + private final Disruptor queue; private final Function transformFunction; private final RingBuffer ringBuffer; @@ -45,11 +49,11 @@ public class DisruptorMessageQueue implements HoodieMessageQueue { private boolean isShutdown = false; private boolean isStarted = false; - public DisruptorMessageQueue(Option bufferSize, Function transformFunction, Option waitStrategyName, int totalProducers, Runnable preExecuteRunnable) { - WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName); + public DisruptorMessageQueue(int bufferSize, Function transformFunction, String waitStrategyId, int totalProducers, Runnable preExecuteRunnable) { + WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyId); CustomizedThreadFactory threadFactory = new CustomizedThreadFactory("disruptor", true, preExecuteRunnable); - this.queue = new Disruptor<>(HoodieDisruptorEvent::new, bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy); + this.queue = new Disruptor<>(HoodieDisruptorEvent::new, bufferSize, threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy); this.ringBuffer = queue.getRingBuffer(); this.transformFunction = transformFunction; } @@ -103,9 +107,13 @@ public void close() { } } - protected void setHandlers(HoodieConsumer consumer) { + protected void setHandlers(HoodieConsumer consumer) { queue.handleEventsWith((event, sequence, endOfBatch) -> { - consumer.consume(event.get()); + try { + consumer.consume(event.get()); + } catch (Exception e) { + LOG.error("Failed consuming records", e); + } }); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java index 43b2f06103097..770041734239a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java @@ -18,12 +18,6 @@ package org.apache.hudi.common.util.queue; -import org.apache.hudi.keygen.constant.KeyGeneratorType; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - /** * Types of {@link org.apache.hudi.common.util.queue.HoodieExecutor}. */ @@ -46,12 +40,5 @@ public enum ExecutorType { * The disadvantage is that the executor is a single-write-single-read model, cannot support functions such as speed limit * and can not de-coupe the network read (shuffle read) and network write (writing objects/files to storage) anymore. */ - SIMPLE; - - public static List getNames() { - List names = new ArrayList<>(ExecutorType.values().length); - Arrays.stream(KeyGeneratorType.values()) - .forEach(x -> names.add(x.name())); - return names; - } + SIMPLE } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java similarity index 53% rename from hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java index 8d9fcc892bebb..e175c22d93070 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java @@ -18,9 +18,6 @@ package org.apache.hudi.common.util.queue; -import static org.apache.hudi.common.util.ValidationUtils.checkState; - -import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -29,31 +26,31 @@ import java.util.function.Function; /** - * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock. - * Consuming and writing records from iterator directly. + * Simple implementation of the {@link HoodieExecutor} interface assuming single-writer/single-reader + * mode allowing it to consume from the input {@link Iterator} directly avoiding the need for + * any internal materialization (ie queueing). * - * Compared with queue based Executor - * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. - * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. + *

+ * Such executor is aimed primarily at allowing + * the production-consumption chain to run w/ as little overhead as possible, at the expense of + * limited parallelism and therefore throughput, which is not an issue for execution environments + * such as Spark, where it's used primarily in a parallelism constraint environment (on executors) */ -public class SimpleHoodieExecutor implements HoodieExecutor { +public class SimpleExecutor implements HoodieExecutor { - private static final Logger LOG = LogManager.getLogger(SimpleHoodieExecutor.class); + private static final Logger LOG = LogManager.getLogger(SimpleExecutor.class); + // Record iterator (producer) + private final Iterator itr; // Consumer - protected final Option> consumer; - // records iterator - protected final Iterator it; - private final Function transformFunction; + private final HoodieConsumer consumer; - public SimpleHoodieExecutor(final Iterator inputItr, HoodieConsumer consumer, - Function transformFunction) { - this(inputItr, Option.of(consumer), transformFunction); - } + private final Function transformFunction; - public SimpleHoodieExecutor(final Iterator inputItr, Option> consumer, - Function transformFunction) { - this.it = inputItr; + public SimpleExecutor(Iterator inputItr, + HoodieConsumer consumer, + Function transformFunction) { + this.itr = inputItr; this.consumer = consumer; this.transformFunction = transformFunction; } @@ -63,18 +60,16 @@ public SimpleHoodieExecutor(final Iterator inputItr, Option name) { - DisruptorWaitStrategyType strategyType = name.isPresent() ? DisruptorWaitStrategyType.valueOf(name.get().toUpperCase()) : BLOCKING_WAIT; + public static WaitStrategy build(String name) { + DisruptorWaitStrategyType strategyType = DisruptorWaitStrategyType.valueOf(name); switch (strategyType) { case BLOCKING_WAIT: