From 945048b15e2653ba30ad5b28be4887253a9af645 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 11 Nov 2022 11:53:39 -0800 Subject: [PATCH 1/5] Fixing Spark bundles (missing Disruptor as dep) --- packaging/hudi-spark-bundle/pom.xml | 5 ++++- packaging/hudi-utilities-bundle/pom.xml | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 6e0dfe32d86e0..55f40442ae64b 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -91,13 +91,15 @@ org.jetbrains.kotlin:* org.rocksdb:rocksdbjni org.antlr:stringtemplate - org.apache.parquet:parquet-avro + com.lmax:disruptor com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve com.github.ben-manes.caffeine:caffeine + org.apache.parquet:parquet-avro com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} + io.dropwizard.metrics:metrics-core io.dropwizard.metrics:metrics-graphite io.dropwizard.metrics:metrics-jmx @@ -127,6 +129,7 @@ org.apache.hbase.thirdparty:hbase-shaded-netty org.apache.hbase.thirdparty:hbase-shaded-protobuf org.apache.htrace:htrace-core4 + org.apache.curator:curator-framework org.apache.curator:curator-client org.apache.curator:curator-recipes diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index bbc28bf0f5964..f055bc00bb7f0 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -116,6 +116,7 @@ org.antlr:stringtemplate org.apache.parquet:parquet-avro + com.lmax:disruptor com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve com.github.ben-manes.caffeine:caffeine From 481e7afc763486f6fcc0efe8ebd4bff2c6025873 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 11 Nov 2022 14:37:57 -0800 Subject: [PATCH 2/5] Tidying up configs --- .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 514a4e38dc72c..e1b4bf6e50f47 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -93,6 +93,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY; +import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR; import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY; /** @@ -138,6 +139,7 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty EXECUTOR_TYPE = ConfigProperty .key("hoodie.write.executor.type") .defaultValue(BOUNDED_IN_MEMORY.name()) + .withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name()) .withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue." + "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue." + "Also users could use DISRUPTOR, which use disruptor as a lock free message queue " From 2f8a1f6693c232e105c0f2f16c10d013ba47af49 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 11 Nov 2022 17:31:32 -0800 Subject: [PATCH 3/5] Fixed config to fallback to default --- .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index e1b4bf6e50f47..107469f72b815 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1002,7 +1002,7 @@ public String getKeyGeneratorClass() { } public ExecutorType getExecutorType() { - return ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT)); + return ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT)); } public boolean isCDCEnabled() { From cb6449064a9d4b1484f3caf506a6527af9d51312 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 11 Nov 2022 17:31:57 -0800 Subject: [PATCH 4/5] Moved `QueueBasedExecutorFactory` to hudi-client-common --- .../main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename hudi-client/{hudi-spark-client => hudi-client-common}/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java (100%) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java similarity index 100% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java From 9edd3b329d9ac18154ef67a928f594a73ce4efd2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 11 Nov 2022 17:32:36 -0800 Subject: [PATCH 5/5] Rebased `HoodieMergeHelper` onto `HoodieExecutor` instead of BIMQ --- .../action/commit/HoodieMergeHelper.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 5d1a55453d162..f3a5ff235b059 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -18,7 +18,14 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; import org.apache.avro.SchemaCompatibility; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; @@ -27,33 +34,26 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.InternalSchemaCache; -import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils; -import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; +import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.util.QueueBasedExecutorFactory; import java.io.IOException; +import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -91,7 +91,7 @@ public void runMerge(HoodieTable>, HoodieData wrapper = null; + HoodieExecutor wrapper = null; HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); Option querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema()); @@ -137,13 +137,14 @@ public void runMerge(HoodieTable>, HoodieData encoderCache = new ThreadLocal<>(); ThreadLocal decoderCache = new ThreadLocal<>(); - wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator, - new UpdateHandler(mergeHandle), record -> { + + wrapper = QueueBasedExecutorFactory.create(table.getConfig(), readerIterator, new UpdateHandler(mergeHandle), record -> { if (!externalSchemaTransformation) { return record; } - return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record); + return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, record); }, table.getPreExecuteRunnable()); + wrapper.execute(); } catch (Exception e) { throw new HoodieException(e);