From c334b08e3d33828a5186048fdb814de2357638f1 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Fri, 10 Nov 2023 11:39:47 +0800 Subject: [PATCH 1/7] [HUDI-7069] Optimize metaclient construction and include table config in write config for multi-table services. --- .../hudi/utilities/HoodieClusteringJob.java | 8 ++++---- .../hudi/utilities/HoodieCompactor.java | 7 ++++--- .../utilities/multitable/ClusteringTask.java | 19 ++++++++++++++++++- .../utilities/multitable/CompactionTask.java | 19 ++++++++++++++++++- .../multitable/MultiTableServiceUtils.java | 6 ++++++ 5 files changed, 50 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index d0c1535a43c12..5cb6b9bbb150d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -56,17 +56,17 @@ public class HoodieClusteringJob { private HoodieTableMetaClient metaClient; public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) { - this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs)); + this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs), + UtilHelpers.createMetaClient(jsc, cfg.basePath, true)); } - public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props) { + public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) { this.cfg = cfg; this.jsc = jsc; this.props = props; - this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true); + this.metaClient = metaClient; // Disable async cleaning, will trigger synchronous cleaning manually. this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false); - this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true); if (this.metaClient.getTableConfig().isMetadataTableAvailable()) { // add default lock config options if MDT is enabled. UtilHelpers.addLockOptions(cfg.basePath, this.props); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index c1464c2fe2c81..08e9ad80252ab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -59,14 +59,15 @@ public class HoodieCompactor { private final HoodieTableMetaClient metaClient; public HoodieCompactor(JavaSparkContext jsc, Config cfg) { - this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs)); + this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs), + UtilHelpers.createMetaClient(jsc, cfg.basePath, true)); } - public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props) { + public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) { this.cfg = cfg; this.jsc = jsc; this.props = props; - this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true); + this.metaClient = metaClient; // Disable async cleaning, will trigger synchronous cleaning manually. this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false); if (this.metaClient.getTableConfig().isMetadataTableAvailable()) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java index 532c59e97253e..e20d71e8cc9c4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.multitable; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.utilities.HoodieClusteringJob; import org.apache.spark.api.java.JavaSparkContext; @@ -43,13 +44,18 @@ class ClusteringTask extends TableServiceTask { */ private String clusteringMode; + /** + * Meta Client. + */ + private HoodieTableMetaClient metaClient; + @Override void run() { HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config(); clusteringConfig.basePath = basePath; clusteringConfig.parallelism = parallelism; clusteringConfig.runningMode = clusteringMode; - new HoodieClusteringJob(jsc, clusteringConfig, props).cluster(retry); + new HoodieClusteringJob(jsc, clusteringConfig, props, metaClient).cluster(retry); } /** @@ -98,6 +104,11 @@ public static final class Builder { */ private int retry; + /** + * Meta Client. + */ + private HoodieTableMetaClient metaClient; + private Builder() { } @@ -131,6 +142,11 @@ public Builder withRetry(int retry) { return this; } + public Builder withMetaclient(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + return this; + } + public ClusteringTask build() { ClusteringTask clusteringTask = new ClusteringTask(); clusteringTask.jsc = this.jsc; @@ -139,6 +155,7 @@ public ClusteringTask build() { clusteringTask.retry = this.retry; clusteringTask.basePath = this.basePath; clusteringTask.props = this.props; + clusteringTask.metaClient = this.metaClient; return clusteringTask; } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java index 4ee3e14a66166..25b80e7cd45d5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.multitable; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.utilities.HoodieCompactor; import org.apache.spark.api.java.JavaSparkContext; @@ -48,6 +49,11 @@ class CompactionTask extends TableServiceTask { */ private int parallelism; + /** + * Meta Client. + */ + private HoodieTableMetaClient metaClient; + @Override void run() { HoodieCompactor.Config compactionCfg = new HoodieCompactor.Config(); @@ -56,7 +62,7 @@ void run() { compactionCfg.runningMode = compactionRunningMode; compactionCfg.parallelism = parallelism; compactionCfg.retry = retry; - new HoodieCompactor(jsc, compactionCfg, props).compact(retry); + new HoodieCompactor(jsc, compactionCfg, props, metaClient).compact(retry); } /** @@ -109,6 +115,11 @@ public static final class Builder { */ private JavaSparkContext jsc; + /** + * Meta Client. + */ + private HoodieTableMetaClient metaClient; + public Builder withProps(TypedProperties props) { this.props = props; return this; @@ -144,6 +155,11 @@ public Builder withJsc(JavaSparkContext jsc) { return this; } + public Builder withMetaclient(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + return this; + } + public CompactionTask build() { CompactionTask compactionTask = new CompactionTask(); compactionTask.basePath = this.basePath; @@ -153,6 +169,7 @@ public CompactionTask build() { compactionTask.compactionStrategyName = this.compactionStrategyName; compactionTask.retry = this.retry; compactionTask.props = this.props; + compactionTask.metaClient = this.metaClient; return compactionTask; } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java index f9a761ea6b810..911305f9f0616 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java @@ -22,9 +22,11 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -166,6 +168,8 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js HoodieMultiTableServicesMain.Config cfg, TypedProperties props) { TableServicePipeline pipeline = new TableServicePipeline(); + HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true); + props.putAll(metaClient.getTableConfig().getProps()); if (cfg.enableCompaction) { pipeline.add(CompactionTask.newBuilder() .withJsc(jsc) @@ -175,6 +179,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js .withCompactionStrategyName(cfg.compactionStrategyClassName) .withProps(props) .withRetry(cfg.retry) + .withMetaclient(metaClient) .build()); } if (cfg.enableClustering) { @@ -185,6 +190,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js .withClusteringRunningMode(cfg.clusteringRunningMode) .withProps(props) .withRetry(cfg.retry) + .withMetaclient(metaClient) .build()); } if (cfg.enableClean) { From 78d78cf10cb0d6fec29f05413e739c82f5ab519d Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Mon, 13 Nov 2023 10:10:08 +0800 Subject: [PATCH 2/7] [HUDI-7069] Optimize metaclient construction and include table config in write config for multi-table services. --- .../main/java/org/apache/hudi/utilities/HoodieCompactor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 08e9ad80252ab..9b03cb7a72417 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -56,7 +56,7 @@ public class HoodieCompactor { private transient FileSystem fs; private TypedProperties props; private final JavaSparkContext jsc; - private final HoodieTableMetaClient metaClient; + private HoodieTableMetaClient metaClient; public HoodieCompactor(JavaSparkContext jsc, Config cfg) { this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs), @@ -257,7 +257,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception { // If no compaction instant is provided by --instant-time, find the earliest scheduled compaction // instant from the active timeline if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) { - HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true); + metaClient = HoodieTableMetaClient.reload(metaClient); Option firstCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant(); if (firstCompactionInstant.isPresent()) { cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp(); From c168e2b99f16fe3ff946feac051e164d9882c582 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Mon, 13 Nov 2023 15:21:18 +0800 Subject: [PATCH 3/7] [HUDI-7069] Optimize metaclient construction and include table config in write config for multi-table services. --- .../apache/hudi/utilities/multitable/MultiTableServiceUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java index 911305f9f0616..656220c0feb65 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java @@ -169,6 +169,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js TypedProperties props) { TableServicePipeline pipeline = new TableServicePipeline(); HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true); + // Add the table config to the write config. props.putAll(metaClient.getTableConfig().getProps()); if (cfg.enableCompaction) { pipeline.add(CompactionTask.newBuilder() From a76b1edc06812a669fe7491029ea84e163e0da3c Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Mon, 13 Nov 2023 17:51:05 +0800 Subject: [PATCH 4/7] [HUDI-7069] Optimize metaclient construction and include table config in write config for multi-table services. --- .../hudi/utilities/multitable/MultiTableServiceUtils.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java index 656220c0feb65..d6bf2b91c59eb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -163,14 +164,17 @@ private static boolean isHoodieTable(Path path, Configuration conf) { } } + public static void addNecessaryTableConfigToWriteConfig(HoodieTableMetaClient metaClient, TypedProperties props) { + props.put(HoodieTableConfig.NAME.key(), metaClient.getTableConfig().getTableName()); + } + public static TableServicePipeline buildTableServicePipeline(JavaSparkContext jsc, String basePath, HoodieMultiTableServicesMain.Config cfg, TypedProperties props) { TableServicePipeline pipeline = new TableServicePipeline(); HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true); - // Add the table config to the write config. - props.putAll(metaClient.getTableConfig().getProps()); + addNecessaryTableConfigToWriteConfig(metaClient, props); if (cfg.enableCompaction) { pipeline.add(CompactionTask.newBuilder() .withJsc(jsc) From 934de8edf14d1986826740051ec9c92672788cc4 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Tue, 14 Nov 2023 14:40:37 +0800 Subject: [PATCH 5/7] [HUDI-7069] Optimize metaclient construction and include table config in write config for multi-table services. --- .../multitable/MultiTableServiceUtils.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java index d6bf2b91c59eb..9deb0cf7da4ad 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; @@ -164,17 +165,18 @@ private static boolean isHoodieTable(Path path, Configuration conf) { } } - public static void addNecessaryTableConfigToWriteConfig(HoodieTableMetaClient metaClient, TypedProperties props) { - props.put(HoodieTableConfig.NAME.key(), metaClient.getTableConfig().getTableName()); - } - public static TableServicePipeline buildTableServicePipeline(JavaSparkContext jsc, String basePath, HoodieMultiTableServicesMain.Config cfg, TypedProperties props) { TableServicePipeline pipeline = new TableServicePipeline(); HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true); - addNecessaryTableConfigToWriteConfig(metaClient, props); + TypedProperties propsWithTableConfig = new TypedProperties(); + propsWithTableConfig.putAll(metaClient.getTableConfig().getProps()); + props.stringPropertyNames().stream() + .filter(key -> !propsWithTableConfig.containsKey(key) || StringUtils.isNullOrEmpty(propsWithTableConfig.get(key).toString())) + .forEach(key -> propsWithTableConfig.put(key, props.get(key))); + if (cfg.enableCompaction) { pipeline.add(CompactionTask.newBuilder() .withJsc(jsc) @@ -182,7 +184,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js .withParallelism(cfg.parallelism) .withCompactionRunningMode(cfg.compactionRunningMode) .withCompactionStrategyName(cfg.compactionStrategyClassName) - .withProps(props) + .withProps(propsWithTableConfig) .withRetry(cfg.retry) .withMetaclient(metaClient) .build()); @@ -193,7 +195,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js .withJsc(jsc) .withParallelism(cfg.parallelism) .withClusteringRunningMode(cfg.clusteringRunningMode) - .withProps(props) + .withProps(propsWithTableConfig) .withRetry(cfg.retry) .withMetaclient(metaClient) .build()); @@ -203,14 +205,14 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js .withBasePath(basePath) .withJsc(jsc) .withRetry(cfg.retry) - .withProps(props) + .withProps(propsWithTableConfig) .build()); } if (cfg.enableArchive) { pipeline.add(ArchiveTask.newBuilder() .withBasePath(basePath) .withJsc(jsc) - .withProps(props) + .withProps(propsWithTableConfig) .withRetry(cfg.retry) .build()); } From 0fc15c72654edfb270efeda12db4504cfcc8d829 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Tue, 14 Nov 2023 14:50:05 +0800 Subject: [PATCH 6/7] [HUDI-7069] Optimize metaclient construction and include table config in write config for multi-table services. --- .../hudi/utilities/multitable/MultiTableServiceUtils.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java index 9deb0cf7da4ad..d4ec09da2c547 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java @@ -22,7 +22,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -41,7 +40,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -172,9 +173,10 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js TableServicePipeline pipeline = new TableServicePipeline(); HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true); TypedProperties propsWithTableConfig = new TypedProperties(); - propsWithTableConfig.putAll(metaClient.getTableConfig().getProps()); + Map tableConfigMap = new HashMap<>(metaClient.getTableConfig().propsMap()); + propsWithTableConfig.putAll(tableConfigMap); props.stringPropertyNames().stream() - .filter(key -> !propsWithTableConfig.containsKey(key) || StringUtils.isNullOrEmpty(propsWithTableConfig.get(key).toString())) + .filter(key -> !tableConfigMap.containsKey(key) || StringUtils.isNullOrEmpty(tableConfigMap.get(key))) .forEach(key -> propsWithTableConfig.put(key, props.get(key))); if (cfg.enableCompaction) { From 4a35dfe9d62932fd8aae3af3385d0a1d08a216f1 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Tue, 14 Nov 2023 14:57:14 +0800 Subject: [PATCH 7/7] [HUDI-7069] Optimize metaclient construction and include table config in write config for multi-table services. --- .../utilities/multitable/MultiTableServiceUtils.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java index d4ec09da2c547..f600db65733d2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; @@ -40,9 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -172,12 +169,8 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js TypedProperties props) { TableServicePipeline pipeline = new TableServicePipeline(); HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true); - TypedProperties propsWithTableConfig = new TypedProperties(); - Map tableConfigMap = new HashMap<>(metaClient.getTableConfig().propsMap()); - propsWithTableConfig.putAll(tableConfigMap); - props.stringPropertyNames().stream() - .filter(key -> !tableConfigMap.containsKey(key) || StringUtils.isNullOrEmpty(tableConfigMap.get(key))) - .forEach(key -> propsWithTableConfig.put(key, props.get(key))); + TypedProperties propsWithTableConfig = new TypedProperties(metaClient.getTableConfig().getProps()); + propsWithTableConfig.putAll(props); if (cfg.enableCompaction) { pipeline.add(CompactionTask.newBuilder()