diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java index abb1122289453..fffb5e0e775c1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult; import org.apache.hudi.io.HoodieWriteHandle; @@ -34,25 +34,27 @@ import java.util.List; import java.util.Map; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + /** * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles. */ public class CopyOnWriteInsertHandler - extends IteratorBasedQueueConsumer, List> { + implements HoodieConsumer, List> { - private HoodieWriteConfig config; - private String instantTime; - private boolean areRecordsSorted; - private HoodieTable hoodieTable; - private String idPrefix; - private TaskContextSupplier taskContextSupplier; - private WriteHandleFactory writeHandleFactory; + private final HoodieWriteConfig config; + private final String instantTime; + private final boolean areRecordsSorted; + private final HoodieTable hoodieTable; + private final String idPrefix; + private final TaskContextSupplier taskContextSupplier; + private final WriteHandleFactory writeHandleFactory; private final List statuses = new ArrayList<>(); // Stores the open HoodieWriteHandle for each table partition path // If the records are consumed in order, there should be only one open handle in this mapping. // Otherwise, there may be multiple handles. 
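Note (not part of the patch): the interface swap above, from extending IteratorBasedQueueConsumer to implementing HoodieConsumer, is the pattern repeated for every consumer this diff touches: consumeOneRecord becomes consume, and the old finish()/getResult() pair collapses into a single result-returning finish(). A minimal sketch of the new contract, modeled on the anonymous consumers in the updated tests:

```java
import org.apache.hudi.common.util.queue.HoodieConsumer;

// Illustrative consumer against the consolidated two-method contract
public class CountingConsumer<T> implements HoodieConsumer<T, Integer> {
  private int count = 0;

  @Override
  public void consume(T record) {
    // per-record callback; replaces consumeOneRecord(...)
    count++;
  }

  @Override
  public Integer finish() {
    // terminal callback; replaces the former finish() + getResult() pair
    return count;
  }
}
```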
- private Map handles = new HashMap<>(); + private final Map handles = new HashMap<>(); public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime, boolean areRecordsSorted, HoodieTable hoodieTable, String idPrefix, @@ -68,7 +70,7 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime, } @Override - public void consumeOneRecord(HoodieInsertValueGenResult payload) { + public void consume(HoodieInsertValueGenResult payload) { final HoodieRecord insertPayload = payload.record; String partitionPath = insertPayload.getPartitionPath(); HoodieWriteHandle handle = handles.get(partitionPath); @@ -97,13 +99,9 @@ public void consumeOneRecord(HoodieInsertValueGenResult payload) { } @Override - public void finish() { + public List finish() { closeOpenHandles(); - assert statuses.size() > 0; - } - - @Override - public List getResult() { + checkState(statuses.size() > 0); return statuses; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 210e24f561cc8..18e7824c5bbc3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -305,7 +305,7 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option hoodieRecord, Option indexedRecord, boolean isDelete) { + private boolean writeRecord(HoodieRecord hoodieRecord, Option indexedRecord, boolean isDelete) { Option recordMetadata = hoodieRecord.getData().getMetadata(); if (!partitionPath.equals(hoodieRecord.getPartitionPath())) { HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java index 76968c6108d96..fbdb941365b76 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java @@ -20,7 +20,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.HoodieBootstrapHandle; @@ -29,7 +29,7 @@ /** * Consumer that dequeues records from queue and sends to Merge Handle for writing. 
*/ -public class BootstrapRecordConsumer extends IteratorBasedQueueConsumer { +public class BootstrapRecordConsumer implements HoodieConsumer { private final HoodieBootstrapHandle bootstrapHandle; @@ -38,7 +38,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) { } @Override - public void consumeOneRecord(HoodieRecord record) { + public void consume(HoodieRecord record) { try { bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData()) .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields())); @@ -48,10 +48,7 @@ public void consumeOneRecord(HoodieRecord record) { } @Override - public void finish() {} - - @Override - protected Void getResult() { + public Void finish() { return null; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 95e3dadf85c67..8508e18ad66c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.utils.MergingIterator; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -65,7 +65,7 @@ protected Iterator getMergingIterator(HoodieTable tab /** * Consumer that dequeues records from queue and sends to Merge Handle. */ - protected static class UpdateHandler extends IteratorBasedQueueConsumer { + protected static class UpdateHandler implements HoodieConsumer { private final HoodieMergeHandle upsertHandle; @@ -74,15 +74,12 @@ protected UpdateHandler(HoodieMergeHandle upsertHandle) { } @Override - public void consumeOneRecord(GenericRecord record) { + public void consume(GenericRecord record) { upsertHandle.write(record); } @Override - public void finish() {} - - @Override - protected Void getResult() { + public Void finish() { return null; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java index ba8ddbd1ec1f1..349f1e1164a79 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java @@ -18,11 +18,12 @@ package org.apache.hudi.util; +import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.common.util.queue.HoodieExecutor; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -31,11 +32,18 @@ public class QueueBasedExecutorFactory { - /** - * Create a new hoodie executor instance on demand. 
- */ - public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, Iterator inputItr, IteratorBasedQueueConsumer consumer, - Function transformFunction, Runnable preExecuteRunnable) { + public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, + Iterator inputItr, + HoodieConsumer consumer, + Function transformFunction) { + return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop()); + } + + public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, + Iterator inputItr, + HoodieConsumer consumer, + Function transformFunction, + Runnable preExecuteRunnable) { ExecutorType executorType = hoodieConfig.getExecutorType(); switch (executorType) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java index 9dab2170e09c1..86f38c6f5ff65 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java @@ -21,17 +21,19 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.io.HoodieWriteHandle; import java.util.ArrayList; import java.util.List; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + /** * Consumes stream of hoodie records from in-memory queue and writes to one explicit create handle. */ public class ExplicitWriteHandler - extends IteratorBasedQueueConsumer, List> { + implements HoodieConsumer, List> { private final List statuses = new ArrayList<>(); @@ -42,19 +44,15 @@ public ExplicitWriteHandler(HoodieWriteHandle handle) { } @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { final HoodieRecord insertPayload = payload.record; handle.write(insertPayload, payload.insertValue, payload.exception); } @Override - public void finish() { + public List finish() { closeOpenHandle(); - assert statuses.size() > 0; - } - - @Override - public List getResult() { + checkState(statuses.size() > 0); return statuses; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index d0ec4e5ae6b03..d99a4e8bf7e16 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -18,24 +18,24 @@ package org.apache.hudi.execution; +import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import 
org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.ExplicitWriteHandleFactory; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; +import org.apache.hudi.util.QueueBasedExecutorFactory; import java.util.Iterator; import java.util.List; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + /** * Flink lazy iterable that supports explicit write handler. * @@ -57,14 +57,13 @@ public FlinkLazyInsertIterable(Iterator> recordItr, @Override protected List computeNext() { // Executor service used for launching writer thread. - BoundedInMemoryExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = - null; + HoodieExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = null; try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); - bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), - Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig)); + bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), + getTransformFunction(schema, hoodieConfig)); final List result = bufferedIteratorExecutor.execute(); - assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); + checkState(result != null && !result.isEmpty()); return result; } catch (Exception e) { throw new HoodieException(e); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java index 9821aedc875cd..bc07513450936 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -18,23 +18,23 @@ package org.apache.hudi.execution; +import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; +import org.apache.hudi.util.QueueBasedExecutorFactory; import java.util.Iterator; import java.util.List; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + public class JavaLazyInsertIterable extends HoodieLazyInsertIterable { public JavaLazyInsertIterable(Iterator> recordItr, boolean areRecordsSorted, @@ -60,14 +60,14 @@ public JavaLazyInsertIterable(Iterator> recordItr, @Override protected List computeNext() { // Executor service used for launching writer thread. 
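Note (not part of the patch): the hunk below, like the Flink hunk above and the Spark one further down, swaps direct BoundedInMemoryExecutor construction for the QueueBasedExecutorFactory overloads added earlier in this diff, which dispatch on hoodieConfig.getExecutorType() (bounded in-memory vs. Disruptor). A sketch of the resulting call shape; the generic parameters, elided by this rendering of the patch, are reconstructed on a best-effort basis:

```java
// Sketch of the call shape the LazyInsertIterable implementations converge on;
// generics here are assumptions reconstructed from context, not verbatim from the patch.
HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> executor =
    QueueBasedExecutorFactory.create(
        hoodieConfig,                  // chooses the executor type and buffer sizing
        inputItr,                      // record iterator, wrapped into an IteratorBasedQueueProducer
        getInsertHandler(),            // HoodieConsumer yielding List<WriteStatus>
        getTransformFunction(schema)); // applied to each record on enqueue
List<WriteStatus> result = executor.execute(); // runs producers and the consumer to completion
checkState(result != null && !result.isEmpty());
```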
- BoundedInMemoryExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = + HoodieExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = null; try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = - new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema)); + QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema)); final List result = bufferedIteratorExecutor.execute(); - assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); + checkState(result != null && !result.isEmpty()); return result; } catch (Exception e) { throw new HoodieException(e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index d2555f95980b7..acb9a3169c642 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -35,6 +35,8 @@ import java.util.Iterator; import java.util.List; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + public class SparkLazyInsertIterable extends HoodieLazyInsertIterable { private boolean useWriterSchema; @@ -89,7 +91,7 @@ protected List computeNext() { getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); final List result = bufferedIteratorExecutor.execute(); - assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); + checkState(result != null && !result.isEmpty()); return result; } catch (Exception e) { throw new HoodieException(e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index 040634da4c335..26b8fc36465a5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; @@ -76,18 +76,18 @@ public void testExecutor() { HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { private int count = 0; @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { count++; } @Override - protected Integer getResult() { + 
public Integer finish() { return count; } }; @@ -100,7 +100,7 @@ protected Integer getResult() { assertEquals(100, result); // There should be no remaining records in the buffer - assertFalse(executor.isRemaining()); + assertFalse(executor.isRunning()); } finally { if (executor != null) { executor.shutdownNow(); @@ -115,11 +115,11 @@ public void testInterruptExecutor() { HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { try { while (true) { Thread.sleep(1000); @@ -130,27 +130,26 @@ public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult } @Override - protected Integer getResult() { + public Integer finish() { return 0; } }; - BoundedInMemoryExecutor>, Integer> executor = null; - try { - executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); - BoundedInMemoryExecutor>, Integer> finalExecutor = executor; + BoundedInMemoryExecutor>, Integer> executor = + new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, + getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); - Thread.currentThread().interrupt(); + // Interrupt the current thread (therefore triggering executor to throw as soon as it + // invokes [[get]] on the [[CompletableFuture]]) + Thread.currentThread().interrupt(); - assertThrows(HoodieException.class, () -> finalExecutor.execute()); - assertTrue(Thread.interrupted()); - } finally { - if (executor != null) { - executor.shutdownNow(); - executor.awaitTermination(); - } - } + assertThrows(HoodieException.class, executor::execute); + + // Validate that interrupted flag is reset, after [[InterruptedException]] is thrown + assertTrue(Thread.interrupted()); + + executor.shutdownNow(); + executor.awaitTermination(); } @Test @@ -169,14 +168,14 @@ public GenericRecord next() { } }; - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { } @Override - protected Integer getResult() { + public Integer finish() { return 0; } }; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index c36554bb64a7e..0826d192ee73d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -26,7 +26,7 @@ import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SizeEstimator; -import 
org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable; +import org.apache.hudi.common.util.queue.BoundedInMemoryQueue; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; @@ -83,12 +83,12 @@ public void tearDown() throws Exception { public void testRecordReading() throws Exception { final int numRecords = 128; final List hoodieRecords = dataGen.generateInserts(instantTime, numRecords); - final BoundedInMemoryQueueIterable queue = - new BoundedInMemoryQueueIterable(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + final BoundedInMemoryQueue queue = + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future resFuture = executorService.submit(() -> { new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue); - queue.close(); + queue.seal(); return true; }); final Iterator originalRecordIterator = hoodieRecords.iterator(); @@ -123,8 +123,8 @@ public void testCompositeProducerRecordReading() throws Exception { final int numProducers = 40; final List> recs = new ArrayList<>(); - final BoundedInMemoryQueueIterable queue = - new BoundedInMemoryQueueIterable(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + final BoundedInMemoryQueue queue = + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Record Key to Map> keyToProducerAndIndexMap = new HashMap<>(); @@ -174,7 +174,7 @@ public void testCompositeProducerRecordReading() throws Exception { for (Future f : futureList) { f.get(); } - queue.close(); + queue.seal(); } catch (Exception e) { throw new RuntimeException(e); } @@ -222,8 +222,8 @@ public void testMemoryLimitForBuffering() throws Exception { getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(payload); final long memoryLimitInBytes = recordLimit * objSize; - final BoundedInMemoryQueueIterable queue = - new BoundedInMemoryQueueIterable(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + final BoundedInMemoryQueue queue = + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce executorService.submit(() -> { @@ -275,8 +275,8 @@ public void testException() throws Exception { // first let us throw exception from queueIterator reader and test that queueing thread // stops and throws // correct exception back. 
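Note (not part of the patch): these tests also pick up the close() to seal() split on the queue. seal() now only marks the writer side done so the reading iterator can terminate, while close() is the resource-release hook (a no-op for BoundedInMemoryQueue's one-sided, populate-then-iterate usage). A minimal sketch of the produce-then-seal pattern, using the names from this diff:

```java
// Producer thread populates the queue, then seals it so readers can finish
Future<Boolean> producing = executorService.submit(() -> {
  new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
  queue.seal(); // signal end-of-stream; readers drain what is left, then stop
  return true;
});
producing.get(); // surfaces any producer-side failure
```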
- BoundedInMemoryQueueIterable>> queue1 = - new BoundedInMemoryQueueIterable(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + BoundedInMemoryQueue>> queue1 = + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future resFuture = executorService.submit(() -> { @@ -303,8 +303,8 @@ public void testException() throws Exception { final Iterator mockHoodieRecordsIterator = mock(Iterator.class); when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); - BoundedInMemoryQueueIterable>> queue2 = - new BoundedInMemoryQueueIterable(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + BoundedInMemoryQueue>> queue2 = + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future res = executorService.submit(() -> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java index 2351f2bbed6df..b3ba5413c25dd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.WaitStrategyFactory; import org.apache.hudi.config.HoodieWriteConfig; @@ -78,19 +78,19 @@ public void testExecutor() { HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8)); - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { private int count = 0; @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { consumedRecords.add(record.record); count++; } @Override - protected Integer getResult() { + public Integer finish() { return count; } }; @@ -103,7 +103,7 @@ protected Integer getResult() { // It should buffer and write 100 records assertEquals(128, result); // There should be no remaining records in the buffer - assertFalse(exec.isRemaining()); + assertFalse(exec.isRunning()); // collect all records and assert that consumed records are identical to produced ones // assert there's no tampering, and that the ordering is preserved @@ -126,11 +126,11 @@ public void testInterruptExecutor() { HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(1024)); - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + 
public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { try { Thread.currentThread().wait(); } catch (InterruptedException ie) { @@ -139,7 +139,7 @@ public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult } @Override - protected Integer getResult() { + public Integer finish() { return 0; } }; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java index d296d56440031..de19296202846 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -18,8 +18,6 @@ package org.apache.hudi.execution; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; - import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieRecord; @@ -27,11 +25,11 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.DisruptorMessageQueue; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.HoodieProducer; -import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; -import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.common.util.queue.WaitStrategyFactory; import org.apache.hudi.config.HoodieWriteConfig; @@ -43,6 +41,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import scala.Tuple2; import java.io.IOException; import java.lang.reflect.Method; @@ -56,12 +55,11 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import scala.Tuple2; - +import static org.apache.hudi.exception.ExceptionUtil.getRootCause; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -112,13 +110,13 @@ public void testRecordReading() throws Exception { HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16)); - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { private int count = 0; @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { count++; afterRecord.add((HoodieAvroRecord) record.record); try { @@ -131,7 +129,7 @@ public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult } @Override - protected Integer getResult() { + public Integer 
finish() { return count; } }; @@ -145,7 +143,7 @@ protected Integer getResult() { // It should buffer and write 100 records assertEquals(100, result); // There should be no remaining records in the buffer - assertFalse(exec.isRemaining()); + assertFalse(exec.isRunning()); assertEquals(beforeRecord, afterRecord); assertEquals(beforeIndexedRecord, afterIndexedRecord); @@ -221,11 +219,11 @@ public void run() { IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0)); // setup consumer and start disruptor - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { // Read recs and ensure we have covered all producer recs. final HoodieRecord rec = payload.record; Pair producerPos = keyToProducerAndIndexMap.get(rec.getRecordKey()); @@ -237,12 +235,12 @@ public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult } @Override - protected Integer getResult() { + public Integer finish() { return 0; } }; - Method setHandlersFunc = queue.getClass().getDeclaredMethod("setHandlers", IteratorBasedQueueConsumer.class); + Method setHandlersFunc = queue.getClass().getDeclaredMethod("setHandlers", HoodieConsumer.class); setHandlersFunc.setAccessible(true); setHandlersFunc.invoke(queue, consumer); @@ -309,19 +307,19 @@ public void run() { } - IteratorBasedQueueConsumer, Integer> consumer = - new IteratorBasedQueueConsumer, Integer>() { + HoodieConsumer, Integer> consumer = + new HoodieConsumer, Integer>() { int count = 0; @Override - public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { // Read recs and ensure we have covered all producer recs. final HoodieRecord rec = payload.record; count++; } @Override - protected Integer getResult() { + public Integer finish() { return count; } }; @@ -332,6 +330,7 @@ protected Integer getResult() { final Throwable thrown = assertThrows(HoodieException.class, exec::execute, "exception is expected"); - assertTrue(thrown.getMessage().contains("Error producing records in disruptor executor")); + + assertEquals("Exception when produce records!!!", getRootCause(thrown).getMessage()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java index 347bcdf77a6ac..03bd471b606f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java @@ -18,7 +18,7 @@ package org.apache.hudi.common.util; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable; +import org.apache.hudi.common.util.queue.BoundedInMemoryQueue; import org.apache.hudi.exception.HoodieException; import org.apache.parquet.hadoop.ParquetReader; @@ -27,7 +27,7 @@ /** * This class wraps a parquet reader and provides an iterator based api to read from a parquet file. 
This is used in - * {@link BoundedInMemoryQueueIterable} + * {@link BoundedInMemoryQueue} */ public class ParquetReaderIterator implements ClosableIterator { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java new file mode 100644 index 0000000000000..a72b24e99e61c --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import org.apache.hudi.common.util.CustomizedThreadFactory; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.FutureUtils.allOf; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +/** + * Base class for multi-threaded queue-based executor which + * + *
+ * <ul> + *   <li>Can be ingesting instances of type {@link I} from multiple {@link HoodieProducer}s + *   into the {@link HoodieMessageQueue}</li> + *   <li>Can be ingesting instances of type {@link I} into an (optional) {@link HoodieConsumer} + *   from the internal {@link HoodieMessageQueue} (when no consumer is provided, records are + *   simply accumulated into the internal queue)</li> + * </ul>
+ * + * Such executors allow setting up an ingestion pipeline with an N:1 configuration, where data + * is ingested from multiple sources (i.e. producers) into a singular sink (i.e. consumer), using + * an internal queue to stage the records ingested from producers before these are consumed + */ +public abstract class BaseHoodieQueueBasedExecutor implements HoodieExecutor { + + private static final long TERMINATE_WAITING_TIME_SECS = 60L; + + private final Logger logger = LogManager.getLogger(getClass()); + + // Executor service used for launching write thread. + private final ExecutorService producerExecutorService; + // Executor service used for launching read thread. + private final ExecutorService consumerExecutorService; + // Queue + protected final HoodieMessageQueue queue; + // Producers + private final List> producers; + // Consumer + protected final Option> consumer; + + public BaseHoodieQueueBasedExecutor(List> producers, + Option> consumer, + HoodieMessageQueue queue, + Runnable preExecuteRunnable) { + this.queue = queue; + this.producers = producers; + this.consumer = consumer; + // Ensure fixed thread for each producer thread + this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable)); + // Ensure single thread for consumer + this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("executor-queue-consumer", preExecuteRunnable)); + } + + protected void doProduce(HoodieMessageQueue queue, HoodieProducer producer) { + logger.info("Starting producer, populating records into the queue"); + try { + producer.produce(queue); + logger.info("Finished producing records into the queue"); + } catch (Exception e) { + logger.error("Failed to produce records", e); + queue.markAsFailed(e); + throw new HoodieException("Failed to produce records", e); + } + } + + protected abstract void doConsume(HoodieMessageQueue queue, HoodieConsumer consumer); + + /** + * Start producing asynchronously. + */ + public final CompletableFuture startProducingAsync() { + return allOf(producers.stream() + .map(producer -> CompletableFuture.supplyAsync(() -> { + doProduce(queue, producer); + return (Void) null; + }, producerExecutorService)) + .collect(Collectors.toList()) + ) + .thenApply(ignored -> (Void) null) + .whenComplete((result, throwable) -> { + // Regardless of how producing has completed, we have to close producers + // to make sure resources are properly cleaned up + producers.forEach(HoodieProducer::close); + // Mark production as done so that the consumer will be able to exit + queue.seal(); + }); + } + + /** + * Start consuming asynchronously. + */ + private CompletableFuture startConsumingAsync() { + return consumer.map(consumer -> + CompletableFuture.supplyAsync(() -> { + doConsume(queue, consumer); + return (Void) null; + }, consumerExecutorService) + ) + .orElse(CompletableFuture.completedFuture(null)); + } + + @Override + public final boolean awaitTermination() { + // if the current thread has been interrupted before awaitTermination was called, we still give + // the executor a chance to proceed. So clear the interrupt flag and reset it if needed before returning.
+ boolean interruptedBefore = Thread.interrupted(); + boolean producerTerminated = false; + boolean consumerTerminated = false; + try { + producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); + consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + // fail silently for any other interruption + } + // reset interrupt flag if needed + if (interruptedBefore) { + Thread.currentThread().interrupt(); + } + + return producerTerminated && consumerTerminated; + } + + @Override + public void shutdownNow() { + producerExecutorService.shutdownNow(); + consumerExecutorService.shutdownNow(); + } + + public boolean isRunning() { + return !queue.isEmpty(); + } + + /** + * Main API to run both production and consumption. + */ + @Override + public E execute() { + try { + checkState(this.consumer.isPresent()); + // Start consuming/producing asynchronously + CompletableFuture consuming = startConsumingAsync(); + CompletableFuture producing = startProducingAsync(); + + // NOTE: To properly support mode when there's no consumer, we have to fall back + // to producing future as the trigger for us to shut down the queue + return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null) + .whenComplete((ignored, throwable) -> { + // Close the queue to release the resources + queue.close(); + }) + .thenApply(ignored -> consumer.get().finish()) + // Block until producing and consuming both finish + .get(); + } catch (Exception e) { + if (e instanceof InterruptedException) { + // In case {@code InterruptedException} was thrown, resetting the interrupted flag + // of the thread, we reset it (to true) again to permit subsequent handlers + // to be interrupted as well + Thread.currentThread().interrupt(); + } + + throw new HoodieException(e); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java index 85b5bf98be388..c379a7abcc0d9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java @@ -19,21 +19,15 @@ package org.apache.hudi.common.util.queue; import org.apache.hudi.common.util.DefaultSizeEstimator; -import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.function.Function; /** @@ -41,117 +35,39 @@ * class takes as input the size limit, queue producer(s), consumer and transformer and exposes API to orchestrate * concurrent execution of these actors communicating through a central bounded queue */ -public class BoundedInMemoryExecutor extends HoodieExecutorBase { +public class BoundedInMemoryExecutor extends BaseHoodieQueueBasedExecutor { private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class); - private final HoodieMessageQueue queue; public BoundedInMemoryExecutor(final long 
bufferLimitInBytes, final Iterator inputItr, - IteratorBasedQueueConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) { - this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable); - } - - public BoundedInMemoryExecutor(final long bufferLimitInBytes, HoodieProducer producer, - Option> consumer, final Function transformFunction) { - this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop()); - } - - public BoundedInMemoryExecutor(final long bufferLimitInBytes, HoodieProducer producer, - Option> consumer, final Function transformFunction, Runnable preExecuteRunnable) { - this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable); + HoodieConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) { + this(bufferLimitInBytes, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)), + Option.of(consumer), transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable); } public BoundedInMemoryExecutor(final long bufferLimitInBytes, List> producers, - Option> consumer, final Function transformFunction, + Option> consumer, final Function transformFunction, final SizeEstimator sizeEstimator, Runnable preExecuteRunnable) { - super(producers, consumer, preExecuteRunnable); - this.queue = new BoundedInMemoryQueueIterable<>(bufferLimitInBytes, transformFunction, sizeEstimator); - } - - /** - * Start all producers at once. - */ - @Override - public CompletableFuture startProducers() { - // Latch to control when and which producer thread will close the queue - final CountDownLatch latch = new CountDownLatch(producers.size()); - - return CompletableFuture.allOf(producers.stream().map(producer -> { - return CompletableFuture.supplyAsync(() -> { - try { - producer.produce(queue); - } catch (Throwable e) { - LOG.error("error producing records", e); - queue.markAsFailed(e); - throw new HoodieException("Error producing records in bounded in memory executor", e); - } finally { - synchronized (latch) { - latch.countDown(); - if (latch.getCount() == 0) { - // Mark production as done so that consumer will be able to exit - try { - queue.close(); - } catch (IOException e) { - throw new HoodieIOException("Catch Exception when closing BoundedInMemoryQueue.", e); - } - } - } - } - return true; - }, producerExecutorService); - }).toArray(CompletableFuture[]::new)); - } - - /** - * Start only consumer. 
- */ - @Override - protected CompletableFuture startConsumer() { - return consumer.map(consumer -> { - return CompletableFuture.supplyAsync(() -> { - LOG.info("starting consumer thread"); - try { - E result = consumer.consume(queue); - LOG.info("Queue Consumption is done; notifying producer threads"); - return result; - } catch (Exception e) { - LOG.error("error consuming records", e); - queue.markAsFailed(e); - throw new HoodieException(e); - } - }, consumerExecutorService); - }).orElse(CompletableFuture.completedFuture(null)); - } - - @Override - public boolean isRemaining() { - return getQueue().iterator().hasNext(); + super(producers, consumer, new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator), preExecuteRunnable); } @Override - protected void postAction() { - super.close(); - } - - @Override - public void shutdownNow() { - producerExecutorService.shutdownNow(); - consumerExecutorService.shutdownNow(); - // close queue to force producer stop + protected void doConsume(HoodieMessageQueue queue, HoodieConsumer consumer) { + LOG.info("Starting consumer, consuming records from the queue"); try { - queue.close(); - } catch (IOException e) { - throw new HoodieIOException("catch IOException while closing HoodieMessageQueue", e); + Iterator it = ((BoundedInMemoryQueue) queue).iterator(); + while (it.hasNext()) { + consumer.consume(it.next()); + } + LOG.info("All records from the queue have been consumed"); + } catch (Exception e) { + LOG.error("Error consuming records", e); + queue.markAsFailed(e); + throw new HoodieException(e); } } - @Override - public BoundedInMemoryQueueIterable getQueue() { - return (BoundedInMemoryQueueIterable)queue; - } - - @Override - protected void setup() { - // do nothing. + public Iterator getRecordIterator() { + return ((BoundedInMemoryQueue) queue).iterator(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java similarity index 94% rename from hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index 47b8c81fc4600..09b61d1be101a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -49,7 +49,7 @@ * @param input payload data type * @param output payload data type */ -public class BoundedInMemoryQueueIterable extends HoodieIterableMessageQueue { +public class BoundedInMemoryQueue implements HoodieMessageQueue, Iterable { /** Interval used for polling records in the queue. **/ public static final int RECORD_POLL_INTERVAL_SEC = 1; @@ -60,7 +60,7 @@ public class BoundedInMemoryQueueIterable extends HoodieIterableMessageQue /** Maximum records that will be cached. **/ private static final int RECORD_CACHING_LIMIT = 128 * 1024; - private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueueIterable.class); + private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class); /** * It indicates number of records to cache. 
We will be using sampled record's average size to @@ -116,7 +116,7 @@ public class BoundedInMemoryQueueIterable extends HoodieIterableMessageQue * @param memoryLimit MemoryLimit in bytes * @param transformFunction Transformer Function to convert input payload type to stored payload type */ - public BoundedInMemoryQueueIterable(final long memoryLimit, final Function transformFunction) { + public BoundedInMemoryQueue(final long memoryLimit, final Function transformFunction) { this(memoryLimit, transformFunction, new DefaultSizeEstimator() {}); } @@ -127,8 +127,8 @@ public BoundedInMemoryQueueIterable(final long memoryLimit, final Function * @param transformFunction Transformer Function to convert input payload type to stored payload type * @param payloadSizeEstimator Payload Size Estimator */ - public BoundedInMemoryQueueIterable(final long memoryLimit, final Function transformFunction, - final SizeEstimator payloadSizeEstimator) { + public BoundedInMemoryQueue(final long memoryLimit, final Function transformFunction, + final SizeEstimator payloadSizeEstimator) { this.memoryLimit = memoryLimit; this.transformFunction = transformFunction; this.payloadSizeEstimator = payloadSizeEstimator; @@ -241,11 +241,17 @@ public Option readNextRecord() { * Puts an empty entry to queue to denote termination. */ @Override - public void close() { + public void seal() { // done queueing records notifying queue-reader. isWriteDone.set(true); } + @Override + public void close() { + // NOTE: Closing is a no-op to support the 1-sided case, when the queue + // is just populated (for subsequent reading), but never consumed + } + private void throwExceptionIfFailed() { if (this.hasFailed.get() != null) { close(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java index 5bf92bdd37ca1..0ea2da90ac7ae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java @@ -19,17 +19,10 @@ package org.apache.hudi.common.util.queue; import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; /** @@ -37,93 +30,25 @@ * class takes as queue producer(s), consumer and transformer and exposes API to orchestrate * concurrent execution of these actors communicating through disruptor */ -public class DisruptorExecutor extends HoodieExecutorBase { - - private static final Logger LOG = LogManager.getLogger(DisruptorExecutor.class); - private final HoodieMessageQueue queue; +public class DisruptorExecutor extends BaseHoodieQueueBasedExecutor { public DisruptorExecutor(final Option bufferSize, final Iterator inputItr, - IteratorBasedQueueConsumer consumer, Function transformFunction, Option waitStrategy, Runnable preExecuteRunnable) { - this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable); - } - - public DisruptorExecutor(final Option bufferSize, HoodieProducer producer, - Option> consumer, final Function transformFunction, Option waitStrategy, Runnable 
preExecuteRunnable) { - this(bufferSize, Collections.singletonList(producer), consumer, transformFunction, waitStrategy, preExecuteRunnable); + HoodieConsumer consumer, Function transformFunction, Option waitStrategy, Runnable preExecuteRunnable) { + this(bufferSize, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)), Option.of(consumer), + transformFunction, waitStrategy, preExecuteRunnable); } public DisruptorExecutor(final Option bufferSize, List> producers, - Option> consumer, final Function transformFunction, + Option> consumer, final Function transformFunction, final Option waitStrategy, Runnable preExecuteRunnable) { - super(producers, consumer, preExecuteRunnable); - this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable); - } - - /** - * Start all Producers. - */ - @Override - public CompletableFuture startProducers() { - return CompletableFuture.allOf(producers.stream().map(producer -> { - return CompletableFuture.supplyAsync(() -> { - try { - producer.produce(queue); - } catch (Throwable e) { - LOG.error("error producing records", e); - throw new HoodieException("Error producing records in disruptor executor", e); - } - return true; - }, producerExecutorService); - }).toArray(CompletableFuture[]::new)); - } - - @Override - protected void setup() { - ((DisruptorMessageQueue)queue).setHandlers(consumer.get()); - ((DisruptorMessageQueue)queue).start(); - } - - @Override - protected void postAction() { - try { - super.close(); - queue.close(); - } catch (IOException e) { - throw new HoodieIOException("Catch IOException while closing DisruptorMessageQueue", e); - } - } - - @Override - protected CompletableFuture startConsumer() { - return producerFuture.thenApplyAsync(res -> { - try { - queue.close(); - consumer.get().finish(); - return consumer.get().getResult(); - } catch (IOException e) { - throw new HoodieIOException("Catch Exception when closing", e); - } - }, consumerExecutorService); - } - - @Override - public boolean isRemaining() { - return !queue.isEmpty(); - } - - @Override - public void shutdownNow() { - producerExecutorService.shutdownNow(); - consumerExecutorService.shutdownNow(); - try { - queue.close(); - } catch (IOException e) { - throw new HoodieIOException("Catch IOException while closing DisruptorMessageQueue"); - } + super(producers, consumer, new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable), preExecuteRunnable); } @Override - public DisruptorMessageQueue getQueue() { - return (DisruptorMessageQueue)queue; + protected void doConsume(HoodieMessageQueue queue, HoodieConsumer consumer) { + DisruptorMessageQueue disruptorQueue = (DisruptorMessageQueue) queue; + // Before we start producing, we need to set up Disruptor's queue + disruptorQueue.setHandlers(consumer); + disruptorQueue.start(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java index de0833f29d1a8..3a7b271c7107e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java @@ -20,19 +20,14 @@ import org.apache.hudi.common.util.CustomizedThreadFactory; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; -import com.lmax.disruptor.EventFactory; 
-import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** @@ -43,20 +38,17 @@ */ public class DisruptorMessageQueue implements HoodieMessageQueue { - private static final Logger LOG = LogManager.getLogger(DisruptorMessageQueue.class); - private final Disruptor queue; private final Function transformFunction; private final RingBuffer ringBuffer; - private final Lock closeLocker = new ReentrantLock(); - private boolean isDisruptorClosed = false; + private boolean isShutdown = false; public DisruptorMessageQueue(Option bufferSize, Function transformFunction, Option waitStrategyName, int totalProducers, Runnable preExecuteRunnable) { WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName); CustomizedThreadFactory threadFactory = new CustomizedThreadFactory("disruptor", true, preExecuteRunnable); - this.queue = new Disruptor<>(new HoodieDisruptorEventFactory(), bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy); + this.queue = new Disruptor<>(HoodieDisruptorEvent::new, bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy); this.ringBuffer = queue.getRingBuffer(); this.transformFunction = transformFunction; } @@ -68,6 +60,10 @@ public long size() { @Override public void insertRecord(I value) throws Exception { + if (isShutdown) { + throw new HoodieException("Can't insert into the queue after it has already been closed"); + } + O applied = transformFunction.apply(value); EventTranslator translator = (event, sequence) -> event.set(applied); queue.getRingBuffer().publishEvent(translator); } @@ -80,7 +76,7 @@ public Option readNextRecord() { @Override public void markAsFailed(Throwable e) { - // do nothing. + // no-op } @Override @@ -88,23 +84,22 @@ public boolean isEmpty() { return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity(); } + @Override + public void seal() {} + @Override public void close() { - closeLocker.lock(); - if (!isDisruptorClosed) { - queue.shutdown(); - isDisruptorClosed = true; + synchronized (this) { + if (!isShutdown) { + isShutdown = true; + queue.shutdown(); + } } - closeLocker.unlock(); } - protected void setHandlers(IteratorBasedQueueConsumer consumer) { - queue.handleEventsWith(new EventHandler() { - - @Override - public void onEvent(HoodieDisruptorEvent event, long sequence, boolean endOfBatch) throws Exception { - consumer.consumeOneRecord(event.get()); - } + protected void setHandlers(HoodieConsumer consumer) { + queue.handleEventsWith((event, sequence, endOfBatch) -> { + consumer.consume(event.get()); }); } @@ -112,23 +107,10 @@ protected void start() { queue.start(); } - /** - * HoodieDisruptorEventFactory is used to create/preallocate HoodieDisruptorEvent. - * - */ - class HoodieDisruptorEventFactory implements EventFactory { - - @Override - public HoodieDisruptorEvent newInstance() { - return new HoodieDisruptorEvent(); - } - } - /** * The unit of data passed from producer to consumer in disruptor world.
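HoodieDisruptorEventFactory and the anonymous EventHandler collapse into a constructor reference and a lambda because EventFactory and EventHandler are both single-method interfaces. A self-contained illustration of the same LMAX Disruptor wiring, independent of Hudi types (Disruptor 3.x APIs):

    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import com.lmax.disruptor.util.DaemonThreadFactory;

    public class DisruptorLambdaExample {
      static class Event {
        String value;
        void set(String value) { this.value = value; }
      }

      public static void main(String[] args) {
        Disruptor<Event> disruptor = new Disruptor<>(
            Event::new,                  // constructor ref replaces the EventFactory class
            1024,                        // ring buffer size, must be a power of two
            DaemonThreadFactory.INSTANCE,
            ProducerType.SINGLE,
            new BlockingWaitStrategy());
        // lambda replaces the anonymous EventHandler
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
            System.out.println(event.value));
        disruptor.start();
        disruptor.getRingBuffer().publishEvent((event, seq) -> event.set("hello"));
        disruptor.shutdown();            // waits for in-flight events, then halts
      }
    }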
*/ class HoodieDisruptorEvent { - private O value; public void set(O value) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java index df199158784bb..f83e8f01d9e24 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java @@ -40,8 +40,11 @@ public FunctionBasedQueueProducer(Function, Boolean> pr @Override public void produce(HoodieMessageQueue queue) { - LOG.info("starting function which will enqueue records"); + LOG.info("Starting to produce records into the queue"); producerFunction.apply(queue); - LOG.info("finished function which will enqueue records"); + LOG.info("Finished producing records into the queue"); } + + @Override + public void close() { /* no-op */ } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java index fb67fab6c74c7..5df0786ea2fe4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java @@ -26,5 +26,10 @@ public interface HoodieConsumer { /** * Consume records from inner message queue. */ - O consume(HoodieMessageQueue queue) throws Exception; + void consume(I record) throws Exception; + + /** + * Notifies the implementation that all records have been consumed from the queue. + */ + O finish(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java index 7d51441edec29..bb010610b35f3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java @@ -18,12 +18,12 @@ package org.apache.hudi.common.util.queue; -import java.io.Closeable; - /** * HoodieExecutor which orchestrates concurrent producers and consumers communicating through a bounded in message queue. + * + * TODO cleanup unused generics */ -public interface HoodieExecutor extends Closeable { +public interface HoodieExecutor { /** * Main API to @@ -33,12 +33,13 @@ public interface HoodieExecutor extends Closeable { */ E execute(); - boolean isRemaining(); - /** - * Shutdown all the consumers and producers. + * Shuts the executor down immediately, cleaning up any allocated resources */ void shutdownNow(); + /** + * Allows the caller to gracefully await the termination of the executor + */ boolean awaitTermination(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java deleted file mode 100644 index 4930100fde497..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
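HoodieConsumer is now push-based: the executor feeds records one at a time via consume(record) and calls finish() exactly once to obtain the result, rather than handing the consumer the whole queue (which only worked for iterable queues). A toy implementation against the new contract, with illustrative type parameters:

    // Toy consumer for the new push-based HoodieConsumer contract:
    // accumulate a count per consume(..) call, report it from finish().
    class CountingConsumer implements HoodieConsumer<String, Integer> {
      private int count = 0;

      @Override
      public void consume(String record) {
        count++;
      }

      @Override
      public Integer finish() {
        return count;
      }
    }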
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.common.util.queue; - -import org.apache.hudi.common.util.CustomizedThreadFactory; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieException; - -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** - * HoodieExecutorBase holds common elements producerExecutorService, consumerExecutorService, producers and a single consumer. - * Also HoodieExecutorBase control the lifecycle of producerExecutorService and consumerExecutorService. - */ -public abstract class HoodieExecutorBase implements HoodieExecutor { - - private static final Logger LOG = LogManager.getLogger(HoodieExecutorBase.class); - - private static final long TERMINATE_WAITING_TIME_SECS = 60L; - // Executor service used for launching write thread. - protected final ExecutorService producerExecutorService; - // Executor service used for launching read thread. - protected final ExecutorService consumerExecutorService; - // Producers - protected final List> producers; - // Consumer - protected final Option> consumer; - // pre-execute function to implement environment specific behavior before executors (producers/consumer) run - protected final Runnable preExecuteRunnable; - - CompletableFuture producerFuture; - - public HoodieExecutorBase(List> producers, Option> consumer, - Runnable preExecuteRunnable) { - this.producers = producers; - this.consumer = consumer; - this.preExecuteRunnable = preExecuteRunnable; - // Ensure fixed thread for each producer thread - this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable)); - // Ensure single thread for consumer - this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("executor-queue-consumer", preExecuteRunnable)); - } - - /** - * Start all Producers. - */ - public abstract CompletableFuture startProducers(); - - /** - * Start consumer. - */ - protected abstract CompletableFuture startConsumer(); - - /** - * Closing/cleaning up the executor's resources after consuming finished. - */ - protected abstract void postAction(); - - /** - * get bounded in message queue. - */ - public abstract HoodieMessageQueue getQueue(); - - /** - * set all the resources for current HoodieExecutor before start to produce and consume records. - */ - protected abstract void setup(); - - @Override - public boolean awaitTermination() { - // if current thread has been interrupted before awaitTermination was called, we still give - // executor a chance to proceeding. So clear the interrupt flag and reset it if needed before return. 
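The comment above, in the awaitTermination being deleted here (its logic presumably carries over to the new base executor, which is not shown in this hunk), describes a subtle idiom: Thread.interrupted() both checks and clears the interrupt flag, so the flag must be re-asserted afterwards to avoid swallowing an interrupt that predates the call. A standalone rendering of the idiom using only JDK types (names and the 60s timeout are illustrative):

    // The check-clear-restore interrupt idiom from the deleted code:
    // give the executor a chance to wind down even if the calling thread
    // was already interrupted, then re-assert the interrupt for the caller.
    static boolean awaitQuietly(java.util.concurrent.ExecutorService pool) {
      boolean interruptedBefore = Thread.interrupted(); // checks AND clears the flag
      boolean terminated = false;
      try {
        terminated = pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS);
      } catch (InterruptedException ie) {
        // fail silently; we report non-termination instead of throwing
      }
      if (interruptedBefore) {
        Thread.currentThread().interrupt();             // restore the original interrupt
      }
      return terminated;
    }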
- boolean interruptedBefore = Thread.interrupted(); - boolean producerTerminated = false; - boolean consumerTerminated = false; - try { - producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); - consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - // fail silently for any other interruption - } - // reset interrupt flag if needed - if (interruptedBefore) { - Thread.currentThread().interrupt(); - } - return producerTerminated && consumerTerminated; - } - - @Override - public void close() { - if (!producerExecutorService.isShutdown()) { - producerExecutorService.shutdown(); - } - if (!consumerExecutorService.isShutdown()) { - consumerExecutorService.shutdown(); - } - } - - /** - * Main API to run both production and consumption. - */ - @Override - public E execute() { - try { - ValidationUtils.checkState(this.consumer.isPresent()); - setup(); - producerFuture = startProducers(); - CompletableFuture future = startConsumer(); - return future.get(); - } catch (InterruptedException ie) { - shutdownNow(); - Thread.currentThread().interrupt(); - throw new HoodieException(ie); - } catch (Exception e) { - throw new HoodieException(e); - } finally { - postAction(); - } - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java deleted file mode 100644 index 71ef39f2c1883..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.common.util.queue; - -import java.util.Iterator; - -/** - * IteratorBasedHoodieMessageQueue implements HoodieMessageQueue with Iterable - */ -public abstract class HoodieIterableMessageQueue implements HoodieMessageQueue, Iterable { - - public abstract Iterator iterator(); -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java index 33098b7d02155..185cdea022e79 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java @@ -51,4 +51,15 @@ public interface HoodieMessageQueue extends Closeable { void markAsFailed(Throwable e); boolean isEmpty(); + + /** + * Seals the queue (for writing), preventing new records from being enqueued + */ + void seal(); + + /** + * Shuts down the queue, cleaning up its resources + */ + @Override + void close(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java index f56dd4cce9944..79147f48e3e5d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java @@ -18,12 +18,14 @@ package org.apache.hudi.common.util.queue; +import java.io.Closeable; + /** * Producer for {@link HoodieMessageQueue}. Memory Bounded Buffer supports multiple producers single consumer pattern. * * @param Input type for buffer items produced */ -public interface HoodieProducer { +public interface HoodieProducer extends Closeable { /** * API to enqueue entries to bounded queue. @@ -31,4 +33,7 @@ public interface HoodieProducer { * @param queue In Memory bounded queue */ void produce(HoodieMessageQueue queue) throws Exception; + + @Override + void close(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java deleted file mode 100644 index 713d6504645e0..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.common.util.queue; - -import java.util.Iterator; - -/** - * Consume entries from queue and execute callback function. - */ -public abstract class IteratorBasedQueueConsumer implements HoodieConsumer { - - /** - * API to de-queue entries to memory bounded queue.
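Extending Closeable here gives executors a uniform way to release producer-held resources once production ends, whether it succeeded or failed; IteratorBasedQueueProducer below uses it to close an underlying ClosableIterator. A hedged sketch of how a caller might exploit that (the helper and its error handling are illustrative, not part of this patch):

    // Illustrative only: run a producer and guarantee close(), which is the
    // motivation for HoodieProducer extending Closeable.
    static <I, O> void runProducer(HoodieProducer<I> producer,
                                   HoodieMessageQueue<I, O> queue) {
      try {
        producer.produce(queue);
      } catch (Exception e) {
        queue.markAsFailed(e);   // surface the failure to the reading side
        throw new HoodieException("Failed to produce records", e);
      } finally {
        producer.close();        // e.g. releases an underlying ClosableIterator
      }
    }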
- * - * @param queue In Memory bounded queue - */ - @Override - public O consume(HoodieMessageQueue queue) throws Exception { - - Iterator iterator = ((HoodieIterableMessageQueue) queue).iterator(); - - while (iterator.hasNext()) { - consumeOneRecord(iterator.next()); - } - - // Notifies done - finish(); - - return getResult(); - } - - /** - * Consumer One record. - */ - public abstract void consumeOneRecord(I record); - - /** - * Notifies implementation that we have exhausted consuming records from queue. - */ - public void finish(){} - - /** - * Return result of consuming records so far. - */ - protected abstract O getResult(); - -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java index 7904fd61ebc80..ccfba1f32220d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util.queue; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -32,7 +33,6 @@ public class IteratorBasedQueueProducer implements HoodieProducer { private static final Logger LOG = LogManager.getLogger(IteratorBasedQueueProducer.class); - // input iterator for producing items in the buffer. private final Iterator inputIterator; public IteratorBasedQueueProducer(Iterator inputIterator) { @@ -47,4 +47,11 @@ public void produce(HoodieMessageQueue queue) throws Exception { } LOG.info("finished buffering records"); } + + @Override + public void close() { + if (inputIterator instanceof ClosableIterator) { + ((ClosableIterator) inputIterator).close(); + } + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 53b253da808de..c455cb7616431 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -167,41 +167,10 @@ public static HoodieMergedLogRecordScanner logScanner( .build(); } - private static HoodieUnMergedLogRecordScanner unMergedLogScanner( - MergeOnReadInputSplit split, - Schema logSchema, - InternalSchema internalSchema, - org.apache.flink.configuration.Configuration flinkConf, - Configuration hadoopConf, - HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) { - FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf); - return HoodieUnMergedLogRecordScanner.newBuilder() - .withFileSystem(fs) - .withBasePath(split.getTablePath()) - .withLogFilePaths(split.getLogPaths().get()) - .withReaderSchema(logSchema) - .withInternalSchema(internalSchema) - .withLatestInstantTime(split.getLatestCommit()) - .withReadBlocksLazily( - string2Boolean( - flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, - HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) - .withReverseReader(false) - .withBufferSize( - flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, - HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) - .withInstantRange(split.getInstantRange()) - .withLogRecordScannerCallback(callback) - .build(); - } - /** * Utility to 
read and buffer the records in the unMerged log record scanner. */ public static class BoundedMemoryRecords { - // Log Record unmerged scanner - private final HoodieUnMergedLogRecordScanner scanner; - // Executor that runs the above producers in parallel private final BoundedInMemoryExecutor, HoodieRecord, ?> executor; @@ -214,19 +183,34 @@ public BoundedMemoryRecords( InternalSchema internalSchema, Configuration hadoopConf, org.apache.flink.configuration.Configuration flinkConf) { + HoodieUnMergedLogRecordScanner.Builder scannerBuilder = HoodieUnMergedLogRecordScanner.newBuilder() + .withFileSystem(FSUtils.getFs(split.getTablePath(), hadoopConf)) + .withBasePath(split.getTablePath()) + .withLogFilePaths(split.getLogPaths().get()) + .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) + .withLatestInstantTime(split.getLatestCommit()) + .withReadBlocksLazily( + string2Boolean( + flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, + HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) + .withReverseReader(false) + .withBufferSize( + flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, + HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) + .withInstantRange(split.getInstantRange()); + this.executor = new BoundedInMemoryExecutor<>( StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), - getParallelProducers(), + getParallelProducers(scannerBuilder), Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), Functions.noop()); - // Consumer of this record reader - this.iterator = this.executor.getQueue().iterator(); - this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, internalSchema, flinkConf, hadoopConf, - record -> executor.getQueue().insertRecord(record)); + this.iterator = this.executor.getRecordIterator(); + // Start reading and buffering - this.executor.startProducers(); + this.executor.startProducingAsync(); } public Iterator> getRecordsIterator() { @@ -236,12 +220,18 @@ public Iterator> getRecordsIterator() { /** * Setup log and parquet reading in parallel. Both write to central buffer. 
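The consumption side of BoundedMemoryRecords is also reshaped: instead of iterating the executor's queue directly, the reader drains the new getRecordIterator() after kicking off production with startProducingAsync(). A sketch of that flow under the method names this patch introduces (surrounding Hudi setup elided, generics assumed):

    // Sketch: asynchronous production, iterator-based draining.
    static void drain(BoundedInMemoryExecutor<?, HoodieRecord<?>, ?> executor,
                      java.util.function.Consumer<HoodieRecord<?>> downstream) {
      executor.startProducingAsync();       // producers start filling the queue
      Iterator<HoodieRecord<?>> it = executor.getRecordIterator();
      while (it.hasNext()) {
        downstream.accept(it.next());       // blocks until a record is available
      }
    }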
*/ - private List>> getParallelProducers() { + private List>> getParallelProducers( + HoodieUnMergedLogRecordScanner.Builder scannerBuilder + ) { List>> producers = new ArrayList<>(); - producers.add(new FunctionBasedQueueProducer<>(buffer -> { + producers.add(new FunctionBasedQueueProducer<>(queue -> { + HoodieUnMergedLogRecordScanner scanner = + scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build(); + // Scan all the delta-log files, filling in the queue scanner.scan(); return null; })); + return producers; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 700a87cbb7284..e21e266b7229f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -18,6 +18,11 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.util.DefaultSizeEstimator; @@ -32,14 +37,8 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; - import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.function.Function; @@ -47,9 +46,6 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implements RecordReader { - // Log Record unmerged scanner - private final HoodieUnMergedLogRecordScanner logRecordScanner; - // Parquet record reader private final RecordReader parquetReader; @@ -76,42 +72,50 @@ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, this.parquetReader = new SafeParquetRecordReaderWrapper(realReader); // Iterator for consuming records from parquet file this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader); + + HoodieUnMergedLogRecordScanner.Builder scannerBuilder = + HoodieUnMergedLogRecordScanner.newBuilder() + .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf)) + .withBasePath(split.getBasePath()) + .withLogFilePaths(split.getDeltaLogPaths()) + .withReaderSchema(getReaderSchema()) + .withLatestInstantTime(split.getMaxCommitTime()) + .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) + .withReverseReader(false) + .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); + this.executor = new BoundedInMemoryExecutor<>( - HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(), + HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(scannerBuilder), 
Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), Functions.noop()); // Consumer of this record reader - this.iterator = this.executor.getQueue().iterator(); - this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder() - .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf)) - .withBasePath(split.getBasePath()) - .withLogFilePaths(split.getDeltaLogPaths()) - .withReaderSchema(getReaderSchema()) - .withLatestInstantTime(split.getMaxCommitTime()) - .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) - .withReverseReader(false) - .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) - .withLogRecordScannerCallback(record -> { - // convert Hoodie log record to Hadoop AvroWritable and buffer - GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); - this.executor.getQueue().insertRecord(aWritable); - }) - .build(); + this.iterator = this.executor.getRecordIterator(); + // Start reading and buffering - this.executor.startProducers(); + this.executor.startProducingAsync(); } /** * Setup log and parquet reading in parallel. Both write to central buffer. */ - private List> getParallelProducers() { - List> producers = new ArrayList<>(); - producers.add(new FunctionBasedQueueProducer<>(buffer -> { - logRecordScanner.scan(); - return null; - })); - producers.add(new IteratorBasedQueueProducer<>(parquetRecordsIterator)); - return producers; + private List> getParallelProducers( + HoodieUnMergedLogRecordScanner.Builder scannerBuilder + ) { + return Arrays.asList( + new FunctionBasedQueueProducer<>(queue -> { + HoodieUnMergedLogRecordScanner scanner = + scannerBuilder.withLogRecordScannerCallback(record -> { + // convert Hoodie log record to Hadoop AvroWritable and buffer + GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); + queue.insertRecord(aWritable); + }) + .build(); + // Scan all the delta-log files, filling in the queue + scanner.scan(); + return null; + }), + new IteratorBasedQueueProducer<>(parquetRecordsIterator) + ); } @Override @@ -149,6 +153,7 @@ public void close() throws IOException { @Override public float getProgress() throws IOException { - return Math.min(parquetReader.getProgress(), logRecordScanner.getProgress()); + // TODO fix to reflect scanner progress + return parquetReader.getProgress(); } } diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index fef63ed6879de..5c9b496c2f8d4 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -107,6 +107,7 @@ com.fasterxml.jackson.core:jackson-databind com.fasterxml.jackson.core:jackson-core + com.lmax:disruptor com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve com.github.ben-manes.caffeine:caffeine diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index 82d6f25016495..6321ee3c0e97d 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ 
b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -90,6 +90,7 @@ org.apache.flink:flink-core ${flink.hadoop.compatibility.artifactId} + com.lmax:disruptor com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve com.yammer.metrics:metrics-core