diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 639467fcdaf8f..88b63d5185ded 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -59,6 +54,7 @@
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -75,10 +71,17 @@
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.storage.HoodieLayoutFactory;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -776,8 +779,8 @@ public final Option getMetadataWriter(String triggeri
    * @param triggeringInstantTimestamp - The instant that is triggering this metadata write
    * @return instance of {@link HoodieTableMetadataWriter}
    */
-  public Option getMetadataWriter(String triggeringInstantTimestamp,
-                                  Option actionMetadata) {
+  public Option getMetadataWriter(String triggeringInstantTimestamp,
+                                  Option actionMetadata) {
     // Each engine is expected to override this and
     // provide the actual metadata writer, if enabled.
     return Option.empty();
@@ -786,4 +789,8 @@ public Option getMetad
   public HoodieTableMetadata getMetadataTable() {
     return this.metadata;
   }
+
+  public Runnable getPreExecuteRunnable() {
+    return Functions.noop();
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index c88b4ee66a098..38d4e60f648ec 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -91,7 +91,7 @@ public void runMerge(HoodieTable>, List, List
     ThreadLocal encoderCache = new ThreadLocal<>();
     ThreadLocal decoderCache = new ThreadLocal<>();
-    wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+    wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
         Option.of(new UpdateHandler(mergeHandle)), record -> {
           if (!externalSchemaTransformation) {
             return record;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 0df6d3a90cc50..7878d857761ea 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -91,7 +91,7 @@ public void runMerge(HoodieTable>, List, List
     ThreadLocal encoderCache = new ThreadLocal<>();
     ThreadLocal decoderCache = new ThreadLocal<>();
-    wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+    wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
         Option.of(new UpdateHandler(mergeHandle)), record -> {
           if (!externalSchemaTransformation) {
             return record;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkBoundedInMemoryExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkBoundedInMemoryExecutor.java
deleted file mode 100644
index d240c065d0834..0000000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkBoundedInMemoryExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.execution;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
-import org.apache.hudi.config.HoodieWriteConfig;
-
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-
-import java.util.Iterator;
-import java.util.function.Function;
-
-public class SparkBoundedInMemoryExecutor extends BoundedInMemoryExecutor {
-
-  // Need to set current spark thread's TaskContext into newly launched thread so that new thread can access
-  // TaskContext properties.
-  final TaskContext sparkThreadTaskContext;
-
-  public SparkBoundedInMemoryExecutor(final HoodieWriteConfig hoodieConfig, final Iterator inputItr,
-                                      BoundedInMemoryQueueConsumer consumer, Function bufferedIteratorTransform) {
-    this(hoodieConfig, new IteratorBasedQueueProducer<>(inputItr), consumer, bufferedIteratorTransform);
-  }
-
-  public SparkBoundedInMemoryExecutor(final HoodieWriteConfig hoodieConfig, BoundedInMemoryQueueProducer producer,
-                                      BoundedInMemoryQueueConsumer consumer, Function bufferedIteratorTransform) {
-    super(hoodieConfig.getWriteBufferLimitBytes(), producer, Option.of(consumer), bufferedIteratorTransform);
-    this.sparkThreadTaskContext = TaskContext.get();
-  }
-
-  @Override
-  public void preExecute() {
-    // Passing parent thread's TaskContext to newly launched thread for it to access original TaskContext properties.
-    TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 088872bbd4381..a8a9e49c01c00 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -30,6 +29,8 @@
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
+
 import java.util.Iterator;
 import java.util.List;
 
@@ -84,8 +85,8 @@ protected List computeNext() {
         schema = HoodieAvroUtils.addMetadataFields(schema);
       }
       bufferedIteratorExecutor =
-          new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(),
-              getTransformFunction(schema, hoodieConfig));
+          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
+              getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
       final List result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 118438c602303..bb8c95d745ab1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
@@ -39,6 +37,11 @@
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
@@ -111,8 +114,8 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   @Override
-  public Option getMetadataWriter(String triggeringInstantTimestamp,
-                                  Option actionMetadata) {
+  public Option getMetadataWriter(String triggeringInstantTimestamp,
+                                  Option actionMetadata) {
     if (config.isMetadataTableEnabled()) {
       // Create the metadata table writer. First time after the upgrade this creation might trigger
       // metadata table bootstrapping. Bootstrapping process could fail and checking the table
@@ -132,4 +135,10 @@ public Option getMetad
 
     return Option.empty();
   }
+
+  @Override
+  public Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index fe6dcc12733dc..e3d0e9b3c69d4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -29,7 +29,6 @@
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.table.HoodieTable;
@@ -68,7 +67,7 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path so
     Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
     TypeDescription orcSchema = orcReader.getSchema();
     try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) {
-      wrapper = new SparkBoundedInMemoryExecutor(config,
+      wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(),
          new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
            String recKey = keyGenerator.getKey(inp).getRecordKey();
            GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
@@ -76,7 +75,7 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path so
            BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
            HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
            return rec;
-          });
+          }, table.getPreExecuteRunnable());
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 2bb9bc4ead38b..d07ea771bc557 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -28,7 +28,6 @@
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.table.HoodieTable;
@@ -72,7 +71,7 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle,
 
     try {
       ParquetReader reader = AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
-      wrapper = new SparkBoundedInMemoryExecutor(config,
+      wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(),
          new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
            String recKey = keyGenerator.getKey(inp).getRecordKey();
            GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
@@ -80,7 +79,7 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle,
            BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
            HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
            return rec;
-          });
+          }, table.getPreExecuteRunnable());
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
index 80615451374b6..e87c3ef5ba77e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -90,13 +89,13 @@ public void runMerge(HoodieTable>, JavaRDD
     ThreadLocal encoderCache = new ThreadLocal<>();
     ThreadLocal decoderCache = new ThreadLocal<>();
-    wrapper = new SparkBoundedInMemoryExecutor(table.getConfig(), readerIterator,
+    wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
        new UpdateHandler(mergeHandle), record -> {
          if (!externalSchemaTransformation) {
            return record;
          }
          return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
-       });
+       }, table.getPreExecuteRunnable());
     wrapper.execute();
   } catch (Exception e) {
     throw new HoodieException(e);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
similarity index 82%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index ecb18c6bc2828..91f9cbc96e6ed 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -22,12 +22,15 @@
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -44,7 +47,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
+public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness {
 
   private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
 
@@ -58,6 +61,11 @@ public void tearDown() throws Exception {
     cleanupResources();
   }
 
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
   @Test
   public void testExecutor() {
 
@@ -85,10 +93,10 @@ protected Integer getResult() {
       }
     };
 
-    SparkBoundedInMemoryExecutor>, Integer> executor = null;
+    BoundedInMemoryExecutor>, Integer> executor = null;
     try {
-      executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer,
-          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+      executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
       int result = executor.execute();
       // It should buffer and write 100 records
       assertEquals(100, result);
@@ -131,11 +139,11 @@ protected Integer getResult() {
       }
     };
 
-    SparkBoundedInMemoryExecutor>, Integer> executor = null;
+    BoundedInMemoryExecutor>, Integer> executor = null;
     try {
-      executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer,
-          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
-      SparkBoundedInMemoryExecutor>, Integer> finalExecutor = executor;
+      executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
+      BoundedInMemoryExecutor>, Integer> finalExecutor = executor;
 
       Thread.currentThread().interrupt();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java
index 3ec96be207330..0b82f091402a0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java
@@ -25,6 +25,11 @@
  */
 public interface Functions {
 
+  static Runnable noop() {
+    return () -> {
+    };
+  }
+
   /**
    * A function which has not any parameter.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index 872837913b054..68b840a4794d6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.util.queue;
 
 import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
@@ -26,7 +27,8 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -54,29 +56,35 @@ public class BoundedInMemoryExecutor {
   private final List> producers;
   // Consumer
   private final Option> consumer;
+  // pre-execute function to implement environment specific behavior before executors (producers/consumer) run
+  private final Runnable preExecuteRunnable;
+
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, final Iterator inputItr,
+                                 BoundedInMemoryQueueConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) {
+    this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable);
+  }
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer producer,
                                  Option> consumer, final Function transformFunction) {
-    this(bufferLimitInBytes, Arrays.asList(producer), consumer, transformFunction, new DefaultSizeEstimator<>());
+    this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop());
+  }
+
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer producer,
+                                 Option> consumer, final Function transformFunction, Runnable preExecuteRunnable) {
+    this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
   }
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, List> producers,
                                  Option> consumer, final Function transformFunction,
-                                 final SizeEstimator sizeEstimator) {
+                                 final SizeEstimator sizeEstimator, Runnable preExecuteRunnable) {
     this.producers = producers;
     this.consumer = consumer;
+    this.preExecuteRunnable = preExecuteRunnable;
     // Ensure single thread for each producer thread and one for consumer
     this.executorService = Executors.newFixedThreadPool(producers.size() + 1);
     this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator);
   }
 
-  /**
-   * Callback to implement environment specific behavior before executors (producers/consumer) run.
-   */
-  public void preExecute() {
-    // Do Nothing in general context
-  }
-
   /**
    * Start all Producers.
    */
@@ -88,7 +96,7 @@ public ExecutorCompletionService startProducers() {
     producers.stream().map(producer -> {
       return completionService.submit(() -> {
         try {
-          preExecute();
+          preExecuteRunnable.run();
           producer.produce(queue);
         } catch (Throwable e) {
           LOG.error("error producing records", e);
@@ -116,7 +124,7 @@ private Future startConsumer() {
     return consumer.map(consumer -> {
       return executorService.submit(() -> {
         LOG.info("starting consumer thread");
-        preExecute();
+        preExecuteRunnable.run();
         try {
           E result = consumer.consume(queue);
           LOG.info("Queue Consumption is done; notifying producer threads");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 2c3318362b053..fa404cc2163ec 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
@@ -50,6 +51,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * Utilities for format.
@@ -193,8 +195,9 @@ public BoundedMemoryRecords(
           HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)),
           getParallelProducers(),
           Option.empty(),
-          x -> x,
-          new DefaultSizeEstimator<>());
+          Function.identity(),
+          new DefaultSizeEstimator<>(),
+          Functions.noop());
       // Consumer of this record reader
       this.iterator = this.executor.getQueue().iterator();
       this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 313b96488c6d5..84c808865072a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -18,18 +18,10 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
@@ -40,6 +32,18 @@
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
 class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader {
 
@@ -74,7 +78,7 @@ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job,
     this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);
     this.executor = new BoundedInMemoryExecutor<>(
         HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(),
-        Option.empty(), x -> x, new DefaultSizeEstimator<>());
+        Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), Functions.noop());
     // Consumer of this record reader
     this.iterator = this.executor.getQueue().iterator();
     this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()
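Reviewer note: the sketch below is a minimal, self-contained illustration of the hook pattern this patch introduces; the class and method names in it are invented for the example and are not Hudi APIs (the real BoundedInMemoryExecutor constructors additionally take the buffer limit, producer(s), consumer, transform function and size estimator). The idea is the one visible in the hunks above: the executor captures a Runnable at construction time and runs it on every internally spawned producer/consumer thread before any work starts, so HoodieSparkTable#getPreExecuteRunnable() can capture TaskContext.get() on the Spark task thread and re-install it on the new threads via TaskContext$.MODULE$.setTaskContext(...), while every other engine keeps the Functions.noop() default.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified stand-in for the pattern: the hook is captured on the thread that builds the
// executor and replayed on every worker thread before any real work starts.
public final class HookedExecutor {

  // Default hook mirrors Functions.noop() in the patch: do nothing for engines
  // that need no thread-local propagation (Flink, Java client, Hadoop MR paths).
  static Runnable noop() {
    return () -> { };
  }

  private final Runnable preExecuteRunnable;
  private final ExecutorService pool = Executors.newFixedThreadPool(2);

  HookedExecutor(Runnable preExecuteRunnable) {
    this.preExecuteRunnable = preExecuteRunnable;
  }

  // Every submitted task runs the hook first, the same way startProducers() and
  // startConsumer() now call preExecuteRunnable.run() before producing/consuming.
  Future<?> submit(Runnable task) {
    return pool.submit(() -> {
      preExecuteRunnable.run();
      task.run();
    });
  }

  void shutdown() {
    pool.shutdown();
  }

  public static void main(String[] args) throws Exception {
    // Illustrative "engine context" carried in a thread local, standing in for Spark's TaskContext.
    ThreadLocal<String> engineContext = ThreadLocal.withInitial(() -> "none");
    engineContext.set("task-42");

    // Capture the launching thread's context now; re-install it on each worker thread later.
    String captured = engineContext.get();
    Runnable propagateContext = () -> engineContext.set(captured);

    HookedExecutor executor = new HookedExecutor(propagateContext);
    // Without the hook the worker thread would print "none"; with it, the captured context is visible.
    executor.submit(() -> System.out.println("worker sees context: " + engineContext.get())).get();
    executor.shutdown();
  }
}
```

This is also why SparkBoundedInMemoryExecutor can be deleted in this patch: the Spark-specific behavior shrinks to a single captured Runnable supplied through the table, instead of a subclass overriding preExecute().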