diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 5d1a55453d162..9249d42f41e23 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -18,7 +18,14 @@
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
@@ -27,34 +34,26 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.InternalSchemaCache;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
-import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class HoodieMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
@@ -91,7 +90,6 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
@@ -137,14 +135,22 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
       ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
       ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
-          new UpdateHandler(mergeHandle), record -> {
-            if (!externalSchemaTransformation) {
-              return record;
-            }
-            return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
-          }, table.getPreExecuteRunnable());
-      wrapper.execute();
+
+      Function<GenericRecord, GenericRecord> transformer;
+      if (!externalSchemaTransformation) {
+        transformer = Function.identity();
+      } else {
+        transformer = record -> transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache,
+            decoderCache, record);
+      }
+
+      // Drain the base file inline: rewrite each record against the new schema if required, then write it out.
+      while (readerIterator.hasNext()) {
+        GenericRecord record = readerIterator.next();
+        GenericRecord transformed = transformer.apply(record);
+
+        mergeHandle.write(transformed);
+      }
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
@@ -153,10 +159,7 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
       if (reader != null) {
        reader.close();
       }
-      if (null != wrapper) {
-        wrapper.shutdownNow();
-        wrapper.awaitTermination();
-      }
       mergeHandle.close();
     }
   }
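The substance of the change above: HoodieMergeHelper no longer stands up a BoundedInMemoryExecutor (producer thread, bounded queue, consumer thread) just to move records from the base-file reader into the merge handle; it picks a transformer once and writes records inline on the reading thread. A minimal sketch of that control flow, using simplified stand-in types rather than the actual Hudi signatures:

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public final class InlineMergeSketch {

  // Stand-in for mergeHandle.write(record).
  interface RecordSink<R> {
    void write(R record);
  }

  static <R> void runMerge(Iterator<R> readerIterator,
                           boolean externalSchemaTransformation,
                           Function<R, R> schemaTransformer,
                           RecordSink<R> mergeHandle) {
    // Choose the transformer once, outside the loop, as the patch does.
    Function<R, R> transformer =
        externalSchemaTransformation ? schemaTransformer : Function.identity();

    // Drain the reader directly: no queue, no extra threads.
    while (readerIterator.hasNext()) {
      mergeHandle.write(transformer.apply(readerIterator.next()));
    }
  }

  public static void main(String[] args) {
    Iterator<String> reader = List.of("a", "b", "c").iterator();
    runMerge(reader, true, String::toUpperCase, System.out::println); // prints A, B, C
  }
}

The trade-off: the bounded queue existed to overlap reading with writing on separate threads; the inline loop gives that pipelining up in exchange for shedding the thread and queue overhead.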
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ ... @@ protected List<WriteStatus> computeNext() {
     BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
-      bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr),
-          Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig));
+      bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr,
+          getExplicitInsertHandler(), getTransformFunction(schema, hoodieConfig), null);
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 01466484d63db..84285995b66e5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -18,31 +18,29 @@
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * Flink merge helper.
@@ -93,13 +91,16 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
       ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
       ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-      wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
-          Option.of(new UpdateHandler(mergeHandle)), record -> {
+
+      Function<GenericRecord, GenericRecord> transformer = record -> {
         if (!externalSchemaTransformation) {
           return record;
         }
         return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
-      });
+      };
+
+      wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
+          new UpdateHandler(mergeHandle), transformer, null);
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
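Both the Flink merge helper above and the Java one below keep funneling records through transformRecordBasedOnNewSchema whenever externalSchemaTransformation is enabled. Judging from the imports this patch shuffles around (GenericDatumReader/Writer, BinaryEncoder/Decoder), that helper relies on Avro's standard writer-schema/reader-schema resolution: encode the record against the old schema, then decode it with a GenericDatumReader constructed from (writerSchema, readerSchema). A self-contained sketch of that round trip, with toy schemas rather than Hudi's:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public final class SchemaRewriteSketch {
  public static void main(String[] args) throws IOException {
    Schema oldSchema = SchemaBuilder.record("Rec").fields()
        .requiredString("id").endRecord();
    // The reader schema adds a field with a default, so old bytes still resolve.
    Schema newSchema = SchemaBuilder.record("Rec").fields()
        .requiredString("id")
        .name("tag").type().stringType().stringDefault("none").endRecord();

    GenericRecord rec = new GenericData.Record(oldSchema);
    rec.put("id", "42");

    // Encode with the writer (old) schema...
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(oldSchema).write(rec, encoder);
    encoder.flush();

    // ...decode with (writerSchema, readerSchema) to project into the new shape.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord rewritten =
        new GenericDatumReader<GenericRecord>(oldSchema, newSchema).read(null, decoder);

    System.out.println(rewritten); // {"id": "42", "tag": "none"}
  }
}

The ThreadLocal<BinaryEncoder> and ThreadLocal<BinaryDecoder> caches the helpers carry around recycle encoder and decoder instances across records; they fill the role of the null "reuse" arguments in the sketch.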
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index 9821aedc875cd..057c8c0626a9a 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -18,20 +18,17 @@
 package org.apache.hudi.execution;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-
 import java.util.Iterator;
 import java.util.List;
 
@@ -65,7 +62,7 @@ protected List<WriteStatus> computeNext() {
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(), getTransformFunction(schema), null);
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 46dd30a7cb773..853f483c44514 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -18,31 +18,29 @@
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
@@ -91,13 +89,16 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
       ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
       ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-      wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
-          Option.of(new UpdateHandler(mergeHandle)), record -> {
+
+      Function<GenericRecord, GenericRecord> transformer = record -> {
         if (!externalSchemaTransformation) {
          return record;
         }
         return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
-      });
+      };
+
+      wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
+          new UpdateHandler(mergeHandle), transformer, null);
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index d2555f95980b7..4e9ca1c463624 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -18,22 +18,20 @@
 package org.apache.hudi.execution;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-import org.apache.hudi.util.QueueBasedExecutorFactory;
-
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
@@ -78,26 +76,27 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
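The Spark insert path below gets the same treatment as the merge path: instead of a queue-based executor, computeNext now transforms each incoming record, feeds it to the insert handler, then closes the handler and collects the write statuses. A compact sketch of that consume/finish/collect lifecycle, with a hypothetical stand-in for the handler (the real CopyOnWriteInsertHandler API is wider):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public final class InlineInsertSketch {

  // Hypothetical stand-in for the insert handler's consume/finish/result API.
  static final class Handler<O> {
    private final List<O> results = new ArrayList<>();

    void consumeOneRecord(O payload) {
      results.add(payload); // buffer or write one transformed payload
    }

    void finish() {
      // close open file handles, flush buffers, etc.
    }

    List<O> getResult() {
      return results;
    }
  }

  static <I, O> List<O> computeNext(Iterator<I> inputItr, Function<I, O> transformer) {
    Handler<O> handler = new Handler<>();
    while (inputItr.hasNext()) {
      handler.consumeOneRecord(transformer.apply(inputItr.next()));
    }
    handler.finish();
    return handler.getResult();
  }

  public static void main(String[] args) {
    System.out.println(computeNext(List.of(1, 2, 3).iterator(), i -> i * 10)); // [10, 20, 30]
  }
}

One behavioral consequence worth flagging: the old executor version asserted the result was non-empty and the queue fully drained, while the new code drops that assert and relies on the handler's own finish() to flush any open handles.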
-    HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       if (useWriterSchema) {
        schema = HoodieAvroUtils.addMetadataFields(schema);
       }
-      bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
-          getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
+      CopyOnWriteInsertHandler insertHandler = getInsertHandler();
+      Function<HoodieRecord, HoodieInsertValueGenResult> transformer = getTransformFunction(schema, hoodieConfig);
+
+      // Generate each insert payload inline and feed it straight to the insert handler.
+      while (inputItr.hasNext()) {
+        HoodieRecord record = inputItr.next();
+        HoodieInsertValueGenResult transformed = transformer.apply(record);
-
-      final List<WriteStatus> result = bufferedIteratorExecutor.execute();
-      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
-      return result;
+        insertHandler.consumeOneRecord(transformed);
+      }
+
+      insertHandler.finish();
+      return insertHandler.getResult();
     } catch (Exception e) {
       throw new HoodieException(e);
-    } finally {
-      if (null != bufferedIteratorExecutor) {
-        bufferedIteratorExecutor.shutdownNow();
-        bufferedIteratorExecutor.awaitTermination();
-      }
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index ce5898c7c3101..07fc04ca591ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.common.util.queue;
 
 import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
@@ -48,17 +47,7 @@ public class BoundedInMemoryExecutor<I, O, E> extends HoodieExecutorBase<I, O, E> {
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, final Iterator<I> inputItr, IteratorBasedQueueConsumer<O, E> consumer,
                                  Function<I, O> transformFunction, Runnable preExecuteRunnable) {
-    this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable);
-  }
-
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, HoodieProducer<I> producer,
-                                 Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
-    this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop());
-  }
-
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, HoodieProducer<I> producer,
-                                 Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
-    this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
+    this(bufferLimitInBytes, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)), Option.of(consumer), transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
   }
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, List<HoodieProducer<I>> producers,
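Finally, the executor itself: the two producer-based convenience constructors disappear, and the surviving iterator-based constructor delegates straight to the canonical list-of-producers constructor, wrapping the iterator in an IteratorBasedQueueProducer and the consumer in an Option. A minimal, hypothetical analogue of that delegation pattern (java.util.Optional standing in for Hudi's Option, and the queueing machinery elided):

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

final class BoundedExecutorSketch<I, O> {

  // Stand-in for Hudi's HoodieProducer.
  interface Producer<T> {
    Iterator<T> items();
  }

  private final List<Producer<I>> producers;
  private final Optional<Consumer<O>> consumer;
  private final Function<I, O> transform;
  private final Runnable preExecuteRunnable;

  // Canonical constructor: every other overload funnels into this one.
  BoundedExecutorSketch(long bufferLimitInBytes, List<Producer<I>> producers,
                        Optional<Consumer<O>> consumer, Function<I, O> transform,
                        Runnable preExecuteRunnable) {
    this.producers = producers;
    this.consumer = consumer;
    this.transform = transform;
    // Assumption: callers may pass null, as the updated Hudi call sites now do.
    this.preExecuteRunnable = preExecuteRunnable == null ? () -> { } : preExecuteRunnable;
  }

  // The surviving convenience overload: wrap the raw iterator in a
  // single-element producer list, and the consumer in an Optional.
  BoundedExecutorSketch(long bufferLimitInBytes, Iterator<I> inputItr,
                        Consumer<O> consumer, Function<I, O> transform,
                        Runnable preExecuteRunnable) {
    this(bufferLimitInBytes, Collections.singletonList(() -> inputItr),
        Optional.of(consumer), transform, preExecuteRunnable);
  }
}

Note that the call sites updated earlier in the patch now pass null for preExecuteRunnable where they previously passed a real runnable or relied on the removed Functions.noop() default, which suggests the remaining constructor chain has to tolerate null; the null guard in the sketch marks that assumption.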