diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 6bbcec370eb7b..13921fc70f403 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -157,9 +157,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(BOUNDED_IN_MEMORY.name())
       .withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name())
       .withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
-          + "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue."
-          + "Also users could use DISRUPTOR, which use disruptor as a lock free message queue "
-          + "to gain better writing performance if lock was the bottleneck. Although DISRUPTOR_EXECUTOR is still an experimental feature.");
+          + " BOUNDED_IN_MEMORY(default): Uses a LinkedBlockingQueue as a bounded in-memory queue; this queue relies on an extra lock to balance producers and consumers. "
+          + "DISRUPTOR: Uses the Disruptor, a lock-free message queue, as the inner queue and may achieve better write performance when the lock is the bottleneck. "
+          + "SIMPLE: Executor with no inner message queue and no inner lock, consuming and writing records directly from the input iterator. Compared with BOUNDED_IN_MEMORY and DISRUPTOR, "
+          + "it needs no additional memory or CPU resources for locking or multi-threading, but it also loses features such as rate limiting. "
+          + "Note that DISRUPTOR and SIMPLE are still experimental.");

   public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.type")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 2caa20c69c5c8..de94f2eff0af8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -42,7 +42,7 @@
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.util.QueueBasedExecutorFactory;
+import org.apache.hudi.util.ExecutorFactory;

 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
@@ -108,7 +108,7 @@ public void runMerge(HoodieTable table,
         || !isPureProjection
         || baseFile.getBootstrapBaseFile().isPresent();

-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    HoodieExecutor<Void> wrapper = null;

     try {
       Iterator<GenericRecord> recordIterator;
@@ -135,7 +135,7 @@ public void runMerge(HoodieTable table,
         recordSchema = isPureProjection ? writerSchema : readerSchema;
       }

-      wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
+      wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
         //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
         //       it since these records will be put into queue of QueueBasedExecutorFactory.
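For context on the new config value, here is a minimal sketch of how a job could opt into the SIMPLE executor. The option key `hoodie.write.executor.type` and the value `SIMPLE` come from this patch (the benchmark below uses the same key); the table name, key fields, and target path are illustrative placeholders, not part of the patch.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class SimpleExecutorWriteExample {
  // `df` is an assumed input DataFrame; everything except the executor-type
  // option mirrors common Hudi write options.
  public static void write(Dataset<Row> df) {
    df.write().format("hudi")
        .option("hoodie.table.name", "executor_demo")              // placeholder table name
        .option("hoodie.datasource.write.recordkey.field", "uuid") // placeholder key column
        .option("hoodie.datasource.write.operation", "bulk_insert")
        // New in this patch: BOUNDED_IN_MEMORY (default), DISRUPTOR, or SIMPLE
        .option("hoodie.write.executor.type", "SIMPLE")
        .mode(SaveMode.Overwrite)
        .save("/tmp/executor_demo");                               // placeholder path
  }
}
```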
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
similarity index 88%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
index 349f1e1164a79..b9e7f06f84859 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
@@ -20,6 +20,7 @@
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
 import org.apache.hudi.common.util.queue.DisruptorExecutor;
 import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
@@ -30,16 +31,16 @@
 import java.util.Iterator;
 import java.util.function.Function;

-public class QueueBasedExecutorFactory {
+public class ExecutorFactory {

-  public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
                                                          Iterator<I> inputItr,
                                                          HoodieConsumer<O, E> consumer,
                                                          Function<I, O> transformFunction) {
     return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
   }

-  public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
                                                          Iterator<I> inputItr,
                                                          HoodieConsumer<O, E> consumer,
                                                          Function<I, O> transformFunction,
@@ -53,6 +54,8 @@ public static HoodieExecutor create(HoodieWriteConfig hoodieC
       case DISRUPTOR:
         return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer, transformFunction,
             hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable);
+      case SIMPLE:
+        return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction);
       default:
         throw new HoodieException("Unsupported Executor Type " + executorType);
     }
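To make the factory dispatch concrete, here is a hedged sketch of programmatic use. The String-to-Integer pipeline is a toy stand-in for record transformation and writing, and it assumes a `HoodieWriteConfig` whose `hoodie.write.executor.type` resolves to one of the three cases above (SIMPLE needs no buffer-size settings); the generic shapes follow the signatures reconstructed in this diff.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.util.ExecutorFactory;

public class ExecutorFactoryExample {
  // Sums the lengths of a few strings through whichever executor the config selects.
  public static int countLengths(HoodieWriteConfig writeConfig) throws Exception {
    Iterator<String> input = Arrays.asList("a", "bb", "ccc").iterator();
    Function<String, Integer> transform = String::length;
    HoodieConsumer<Integer, Integer> consumer = new HoodieConsumer<Integer, Integer>() {
      private int total = 0;

      @Override
      public void consume(Integer len) {
        total += len; // accumulate transformed values
      }

      @Override
      public Integer finish() {
        return total;
      }
    };
    HoodieExecutor<Integer> executor = ExecutorFactory.create(writeConfig, input, consumer, transform);
    return executor.execute(); // 6, regardless of executor type
  }
}
```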
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 8a0eb8beca75f..6979716624910 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -28,7 +28,7 @@
 import org.apache.hudi.io.ExplicitWriteHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.util.QueueBasedExecutorFactory;
+import org.apache.hudi.util.ExecutorFactory;

 import java.util.Iterator;
 import java.util.List;
@@ -56,10 +56,11 @@ public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
-    HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
+    HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
-      bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), getCloningTransformer(schema, hoodieConfig));
+      bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(),
+          getCloningTransformer(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       checkState(result != null && !result.isEmpty());
       return result;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index b5b131e64ed7a..5113b3406801e 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -27,7 +27,7 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.util.QueueBasedExecutorFactory;
+import org.apache.hudi.util.ExecutorFactory;

 import java.util.Iterator;
 import java.util.List;
@@ -59,12 +59,12 @@ public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
-    HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
+    HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor =
         null;
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema));
+          ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       checkState(result != null && !result.isEmpty());
       return result;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index fec382a6a611d..7cb2b27e17115 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -29,7 +29,7 @@
 import org.apache.hudi.table.HoodieTable;

 import org.apache.avro.Schema;
-import org.apache.hudi.util.QueueBasedExecutorFactory;
+import org.apache.hudi.util.ExecutorFactory;

 import java.util.Iterator;
 import java.util.List;
@@ -79,14 +79,14 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
-    HoodieExecutor<HoodieRecord, HoodieInsertValueGenResult, List<WriteStatus>> bufferedIteratorExecutor = null;
+    HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       if (useWriterSchema) {
         schema = HoodieAvroUtils.addMetadataFields(schema);
       }
-      bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
+      bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
           getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());

       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
new file mode 100644
index 0000000000000..bbc85efd376a6
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestSimpleExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+    final List<HoodieRecord> consumedRecords = new ArrayList<>();
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8));
+    HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+          private int count = 0;
+
+          @Override
+          public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception {
+            consumedRecords.add(record.getResult());
+            count++;
+          }
+
+          @Override
+          public Integer finish() {
+            return count;
+          }
+        };
+    SimpleHoodieExecutor<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> exec = null;
+
+    try {
+      exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+
+      int result = exec.execute();
+      // It should consume and write all 128 records
+      assertEquals(128, result);
+
+      // collect all records and assert that consumed records are identical to produced ones
+      // assert there's no tampering, and that the ordering is preserved
+      assertEquals(hoodieRecords, consumedRecords);
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  // Test to ensure that we are reading all records from the input iterator in the same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieAvroRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        // ignore exception here.
+      }
+    });
+
+    HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+          private int count = 0;
+
+          @Override
+          public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception {
+            count++;
+            afterRecord.add((HoodieAvroRecord) record.getResult());
+            try {
+              IndexedRecord indexedRecord = (IndexedRecord) ((HoodieAvroRecord) record.getResult())
+                  .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+              afterIndexedRecord.add(indexedRecord);
+            } catch (IOException e) {
+              // ignore exception here.
+            }
+          }
+
+          @Override
+          public Integer finish() {
+            return count;
+          }
+        };
+
+    SimpleHoodieExecutor<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> exec = null;
+
+    try {
+      exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+      int result = exec.execute();
+      assertEquals(100, result);
+
+      assertEquals(beforeRecord, afterRecord);
+      assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Test to ensure that when an exception happens in the iterator, the simple ingestion is stopped.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testException() {
+    final int numRecords = 1000;
+    final String errorMessage = "Exception when iterating records!!!";
+
+    List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, numRecords);
+    InnerIterator iterator = new InnerIterator(pRecs.iterator(), errorMessage, numRecords / 10);
+
+    HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+          int count = 0;
+
+          @Override
+          public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) throws Exception {
+            // Read records and ensure we have covered all producer records.
+            final HoodieRecord rec = payload.getResult();
+            count++;
+          }
+
+          @Override
+          public Integer finish() {
+            return count;
+          }
+        };
+
+    SimpleHoodieExecutor<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> exec =
+        new SimpleHoodieExecutor(iterator, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+
+    final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
+        "exception is expected");
+    assertTrue(thrown.getMessage().contains(errorMessage));
+  }
+
+  class InnerIterator implements Iterator<HoodieRecord> {
+
+    private Iterator<HoodieRecord> iterator;
+    private AtomicInteger count = new AtomicInteger(0);
+    private String errorMessage;
+    private int errorMessageCount;
+
+    public InnerIterator(Iterator<HoodieRecord> iterator, String errorMessage, int errorMessageCount) {
+      this.iterator = iterator;
+      this.errorMessage = errorMessage;
+      this.errorMessageCount = errorMessageCount;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public HoodieRecord next() {
+      if (count.get() == errorMessageCount) {
+        throw new HoodieException(errorMessage);
+      }
+
+      count.incrementAndGet();
+      return iterator.next();
+    }
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java
index a72b24e99e61c..fb3924b21d9d9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java
@@ -49,7 +49,7 @@
  * is ingested from multiple sources (ie producers) into a singular sink (ie consumer), using
  * an internal queue to stage the records ingested from producers before these are consumed
  */
-public abstract class BaseHoodieQueueBasedExecutor<I, O, E> implements HoodieExecutor<I, O, E> {
+public abstract class BaseHoodieQueueBasedExecutor<I, O, E> implements HoodieExecutor<E> {

   private static final long TERMINATE_WAITING_TIME_SECS = 60L;

@@ -74,7 +74,7 @@ public BaseHoodieQueueBasedExecutor(List<HoodieProducer<I>> producers,
     this.producers = producers;
     this.consumer = consumer;
     // Ensure fixed thread for each producer thread
-    this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable));
+    this.producerExecutorService = Executors.newFixedThreadPool(Math.max(1, producers.size()), new CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable));
     // Ensure single thread for consumer
     this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("executor-queue-consumer", preExecuteRunnable));
   }
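The `Math.max(1, ...)` guard above matters because `java.util.concurrent.Executors.newFixedThreadPool` rejects a non-positive pool size, so an empty producers list would previously have thrown. A small self-contained illustration (not part of the patch):

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolSizeGuard {
  public static void main(String[] args) {
    List<Runnable> producers = Collections.emptyList(); // e.g. no producers registered
    try {
      Executors.newFixedThreadPool(producers.size());   // 0 -> IllegalArgumentException
    } catch (IllegalArgumentException e) {
      System.out.println("newFixedThreadPool(0) throws: " + e);
    }
    // The guarded form from the patch always yields a valid pool size.
    ExecutorService ok = Executors.newFixedThreadPool(Math.max(1, producers.size()));
    ok.shutdown();
  }
}
```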
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
index 05ecb1746c28d..43b2f06103097 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
@@ -38,7 +38,15 @@ public enum ExecutorType {
    * Executor which orchestrates concurrent producers and consumers communicating through disruptor as a lock free message queue
    * to gain better writing performance. Although DisruptorExecutor is still an experimental feature.
    */
-  DISRUPTOR;
+  DISRUPTOR,
+
+  /**
+   * Executor with no inner message queue and no inner lock, consuming and writing records directly from the input iterator.
+   * The advantage is that no additional memory or CPU resources are needed for locking or multi-threading.
+   * The disadvantage is that it is a single-writer-single-reader model that cannot support features such as rate limiting,
+   * and it can no longer decouple network reads (shuffle reads) from network writes (writing objects/files to storage).
+   */
+  SIMPLE;

   public static List<String> getNames() {
     List<String> names = new ArrayList<>(ExecutorType.values().length);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
index bb010610b35f3..cb93e4f940dcd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
@@ -19,11 +19,9 @@
 package org.apache.hudi.common.util.queue;

 /**
- * HoodieExecutor which orchestrates concurrent producers and consumers communicating through a bounded in message queue.
- *
- * TODO cleanup unused generics
+ * HoodieExecutor which orchestrates the writing of records; implementations may coordinate concurrent producers and consumers, or consume an input iterator directly.
  */
-public interface HoodieExecutor<I, O, E> {
+public interface HoodieExecutor<E> {

   /**
    * Main API to
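Since the interface now carries only the result type `E`, a custom executor reduces to the three methods visible in this diff. A toy sketch, assuming `execute()`, `shutdownNow()`, and `awaitTermination()` are the only abstract members (if the interface has more, they would need implementing too):

```java
import java.util.Iterator;

import org.apache.hudi.common.util.queue.HoodieExecutor;

// Counts the elements of an iterator synchronously: no queue, no lock,
// no background threads -- the same shape the SIMPLE executor takes.
public class CountingExecutor implements HoodieExecutor<Long> {

  private final Iterator<?> input;

  public CountingExecutor(Iterator<?> input) {
    this.input = input;
  }

  @Override
  public Long execute() {
    long count = 0;
    while (input.hasNext()) {
      input.next();
      count++; // consume directly on the calling thread
    }
    return count;
  }

  @Override
  public void shutdownNow() {
    // nothing to shut down: no background threads were started
  }

  @Override
  public boolean awaitTermination() {
    return true; // synchronous execution has already finished
  }
}
```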
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java
new file mode 100644
index 0000000000000..8d9fcc892bebb
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+/**
+ * Single-writer-single-reader mode. This SimpleHoodieExecutor has no inner message queue and no inner lock;
+ * it consumes and writes records directly from the input iterator.
+ *
+ * Compared with the queue-based executors:
+ * Advantages: no additional memory or CPU resources are needed for locking or multi-threading.
+ * Disadvantages: loses features such as rate limiting, and may have lower throughput.
+ */
+public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleHoodieExecutor.class);
+
+  // Consumer
+  protected final Option<HoodieConsumer<O, E>> consumer;
+  // records iterator
+  protected final Iterator<I> it;
+  private final Function<I, O> transformFunction;
+
+  public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> consumer,
+                              Function<I, O> transformFunction) {
+    this(inputItr, Option.of(consumer), transformFunction);
+  }
+
+  public SimpleHoodieExecutor(final Iterator<I> inputItr, Option<HoodieConsumer<O, E>> consumer,
+                              Function<I, O> transformFunction) {
+    this.it = inputItr;
+    this.consumer = consumer;
+    this.transformFunction = transformFunction;
+  }
+
+  /**
+   * Consumes records from the input iterator directly, without any producers or inner message queue.
+   */
+  @Override
+  public E execute() {
+    checkState(this.consumer.isPresent());
+
+    try {
+      LOG.info("Starting consumer, consuming records from the records iterator directly");
+      while (it.hasNext()) {
+        O payload = transformFunction.apply(it.next());
+        consumer.get().consume(payload);
+      }
+
+      return consumer.get().finish();
+    } catch (Exception e) {
+      LOG.error("Error consuming records in SimpleHoodieExecutor", e);
+      throw new HoodieException(e);
+    }
+  }
+
+  @Override
+  public void shutdownNow() {
+    // no-op
+  }
+
+  @Override
+  public boolean awaitTermination() {
+    // no-op
+    return true;
+  }
+}
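A hedged usage sketch of the new executor, assuming the `<I, O, E>` type-parameter order reconstructed above (input, transformed, result); the string pipeline is illustrative only, standing in for record cloning and write handles:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;

public class SimpleHoodieExecutorExample {
  public static void main(String[] args) {
    // Toy pipeline: upper-case each string, then count the records consumed.
    Iterator<String> input = Arrays.asList("hoodie", "simple", "executor").iterator();
    Function<String, String> transform = String::toUpperCase;

    HoodieConsumer<String, Integer> consumer = new HoodieConsumer<String, Integer>() {
      private int consumed = 0;

      @Override
      public void consume(String record) {
        consumed++; // a real consumer would write the record to storage here
      }

      @Override
      public Integer finish() {
        return consumed;
      }
    };

    // Records flow iterator -> transformFunction -> consumer on the calling thread.
    SimpleHoodieExecutor<String, String, Integer> executor =
        new SimpleHoodieExecutor<>(input, consumer, transform);
    System.out.println("records consumed: " + executor.execute()); // prints 3
  }
}
```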
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
index b1d2517374c7e..8ba983246e600 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
@@ -32,7 +32,7 @@ object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {

   protected val spark: SparkSession = getSparkSession

-  val recordNumber = 1000000
+  val recordNumber = 10000000

   def getSparkSession: SparkSession = SparkSession.builder()
     .master("local[*]")
@@ -66,18 +66,20 @@ object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
   }

   /**
-   * OpenJDK 64-Bit Server VM 1.8.0_161-b14 on Linux 3.10.0-693.21.1.el7.x86_64
+   * This benchmark has been run with unconstrained parallelism, which benefits Disruptor more than it benefits Simple.
+   *
+   * OpenJDK 64-Bit Server VM 1.8.0_342-b07 on Linux 5.10.62-55.141.amzn2.x86_64
    * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
    * COW Ingestion:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    * ------------------------------------------------------------------------------------------------------------------------
-   * BoundInMemory Executor                             5629           5765         192          0.2        5628.9       1.0X
-   * Disruptor Executor                                 2772           2862         127          0.4        2772.2       2.0X
-   *
+   * BoundInMemory Executor                            34661          35143         292          0.3        3466.1       1.0X
+   * Simple Executor                                   17347          17796         681          0.6        1734.7       2.0X
+   * Disruptor Executor                                15803          16535         936          0.6        1580.3       2.2X
    */
   private def cowTableDisruptorExecutorBenchmark(tableName: String = "executorBenchmark"): Unit = {
     val df = createDataFrame(recordNumber)
     withTempDir {f =>
-      val benchmark = new HoodieBenchmark("COW Ingestion", recordNumber)
+      val benchmark = new HoodieBenchmark("COW Ingestion", recordNumber, 5)
       benchmark.addCase("BoundInMemory Executor") { _ =>
         val finalTableName = tableName + Random.nextInt(10000)
         df.write.format("hudi")
@@ -91,11 +93,34 @@ object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
           .option("hoodie.insert.shuffle.parallelism", "2")
           .option("hoodie.datasource.write.operation", "bulk_insert")
           .option("hoodie.datasource.write.row.writer.enable", "false")
-          .option("hoodie.bulkinsert.shuffle.parallelism", "1")
+          .option("hoodie.bulkinsert.shuffle.parallelism", "4")
+          .option("hoodie.upsert.shuffle.parallelism", "2")
+          .option("hoodie.delete.shuffle.parallelism", "2")
+          .option("hoodie.populate.meta.fields", "false")
+          .option("hoodie.table.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
+          .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString)
+      }
+
+      benchmark.addCase("Simple Executor") { _ =>
+        val finalTableName = tableName + Random.nextInt(10000)
+        df.write.format("hudi")
+          .mode(SaveMode.Overwrite)
+          .option("hoodie.datasource.write.recordkey.field", "c1")
+          .option("hoodie.datasource.write.partitionpath.field", "c2")
+          .option("hoodie.table.name", finalTableName)
+          .option("hoodie.metadata.enable", "false")
+          .option("hoodie.clean.automatic", "false")
+          .option("hoodie.bulkinsert.sort.mode", "NONE")
+          .option("hoodie.insert.shuffle.parallelism", "2")
+          .option("hoodie.datasource.write.operation", "bulk_insert")
+          .option("hoodie.datasource.write.row.writer.enable", "false")
+          .option("hoodie.bulkinsert.shuffle.parallelism", "4")
           .option("hoodie.upsert.shuffle.parallelism", "2")
           .option("hoodie.delete.shuffle.parallelism", "2")
+          .option("hoodie.write.executor.type", "SIMPLE")
           .option("hoodie.populate.meta.fields", "false")
           .option("hoodie.table.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
+
           .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString)
       }

@@ -112,7 +137,7 @@ object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
           .option("hoodie.insert.shuffle.parallelism", "2")
           .option("hoodie.datasource.write.operation", "bulk_insert")
           .option("hoodie.datasource.write.row.writer.enable", "false")
-          .option("hoodie.bulkinsert.shuffle.parallelism", "1")
+          .option("hoodie.bulkinsert.shuffle.parallelism", "4")
           .option("hoodie.upsert.shuffle.parallelism", "2")
           .option("hoodie.delete.shuffle.parallelism", "2")
           .option("hoodie.write.executor.type", "DISRUPTOR")
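For readers decoding the benchmark table, the derived columns follow directly from Best Time and recordNumber. A small self-contained check (not part of the patch) that reproduces them within rounding:

```java
public class BenchmarkTableCheck {
  public static void main(String[] args) {
    final long records = 10_000_000L; // recordNumber in the benchmark above
    final String[] names = {"BoundInMemory", "Simple", "Disruptor"};
    final long[] bestMs = {34_661L, 17_347L, 15_803L}; // Best Time(ms) from the table
    final double baseline = bestMs[0]; // Relative is measured against the slowest case
    for (int i = 0; i < names.length; i++) {
      double perRowNs = bestMs[i] * 1e6 / records;        // e.g. 34661 ms -> 3466.1 ns/row
      double rateMps = records / (bestMs[i] / 1e3) / 1e6; // e.g. ~0.3 million rows/s
      double relative = baseline / bestMs[i];             // e.g. ~2.2X for Disruptor
      System.out.printf("%-13s %8.1f ns/row %5.1f M/s %5.1fX%n",
          names[i], perRowNs, rateMps, relative);
    }
  }
}
```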