diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 7c48adf32f17f..8a98657f242e2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -479,32 +479,6 @@ object DataSourceWriteOptions {
       + "Use this when you are in the process of migrating from "
       + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format")
 
-  // spark data source write pool name. Incase of streaming sink, users might be interested to set custom scheduling configs
-  // for regular writes and async compaction. In such cases, this pool name will be used for spark datasource writes.
-  val SPARK_DATASOURCE_WRITER_POOL_NAME = "sparkdatasourcewrite"
-
-  /*
-  When async compaction is enabled (deltastreamer or streaming sink), users might be interested to set custom
-  scheduling configs for regular writes and async compaction. This is the property used to set custom scheduler config
-  file with spark. In Deltastreamer, the file is generated within hudi and set if necessary. Where as in case of streaming
-  sink, users have to set this property when they invoke spark shell.
-  Sample format of the file contents.
-  <?xml version="1.0"?>
-  <allocations>
-    <pool name="sparkdatasourcewrite">
-      <schedulingMode>FAIR</schedulingMode>
-      <weight>4</weight>
-      <minShare>2</minShare>
-    </pool>
-    <pool name="hoodiecompact">
-      <schedulingMode>FAIR</schedulingMode>
-      <weight>3</weight>
-      <minShare>1</minShare>
-    </pool>
-  </allocations>
-   */
-  val SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"
-
   /** @deprecated Use {@link HIVE_SYNC_MODE} instead of this config from 0.9.0 */
   @Deprecated
   val HIVE_USE_JDBC: ConfigProperty[String] = ConfigProperty
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d5b03f5befd6e..b7f04c54e87c3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -124,8 +124,8 @@ object HoodieSparkSqlWriter {
     val jsc = new JavaSparkContext(sparkContext)
     if (asyncCompactionTriggerFn.isDefined) {
-      if (jsc.getConf.getOption(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
-        jsc.setLocalProperty("spark.scheduler.pool", DataSourceWriteOptions.SPARK_DATASOURCE_WRITER_POOL_NAME)
+      if (jsc.getConf.getOption(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
+        jsc.setLocalProperty("spark.scheduler.pool", SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
       }
     }
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
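The writer only routes datasource writes into the dedicated pool when the allocation file is present on the SparkConf. For context, this is roughly what the opt-in looks like on the driver side of a streaming sink — a minimal sketch, assuming hudi-spark-common is on the classpath; the application name and file path are placeholders:

```scala
import org.apache.hudi.SparkConfigs
import org.apache.spark.sql.SparkSession

// Enable FAIR scheduling and point Spark at a custom allocation file.
// With both set, HoodieSparkSqlWriter pins datasource writes to the
// "sparkdatasourcewrite" pool whenever async compaction is triggered.
val spark = SparkSession.builder()
  .appName("hudi-streaming-sink")                 // placeholder app name
  .config("spark.scheduler.mode", "FAIR")
  .config(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY,
    "/path/to/scheduler-allocation.xml")          // placeholder path
  .getOrCreate()
```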
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkConfigs.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkConfigs.scala
new file mode 100644
index 0000000000000..75dee2108914f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkConfigs.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+object SparkConfigs {
+
+  // Spark datasource write pool name. In case of a streaming sink, users may want to set custom scheduling configs
+  // for regular writes and async compaction. In such cases, this pool name will be used for Spark datasource writes.
+  val SPARK_DATASOURCE_WRITER_POOL_NAME = "sparkdatasourcewrite"
+
+  /*
+  When async compaction is enabled (deltastreamer or streaming sink), users may want to set custom
+  scheduling configs for regular writes and async compaction. This is the property used to set a custom scheduler config
+  file with Spark. In Deltastreamer, the file is generated within Hudi and set if necessary, whereas in the case of a
+  streaming sink, users have to set this property when they invoke the spark shell.
+  Sample format of the file contents:
+  <?xml version="1.0"?>
+  <allocations>
+    <pool name="sparkdatasourcewrite">
+      <schedulingMode>FAIR</schedulingMode>
+      <weight>4</weight>
+      <minShare>2</minShare>
+    </pool>
+    <pool name="hoodiecompact">
+      <schedulingMode>FAIR</schedulingMode>
+      <weight>3</weight>
+      <minShare>1</minShare>
+    </pool>
+  </allocations>
+   */
+  val SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
index ed288673abfbf..b991f9d46cb0b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.SparkConfigs;
 import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
@@ -85,7 +85,7 @@ public static Map<String, String> getSparkSchedulingConfigs(HoodieDeltaStreamer
         && cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, cfg.compactSchedulingWeight,
           cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
-      additionalSparkConfigs.put(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY(), sparkSchedulingConfFile);
+      additionalSparkConfigs.put(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY(), sparkSchedulingConfFile);
     } else {
       LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
           + "is not set to FAIR at instantiation time. Continuing without scheduling configs");
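On the deltastreamer side the allocation file is generated inside Hudi. Below is a rough Scala sketch of how the generated configs could be folded into a SparkConf before the context is built — illustrative wiring, not the exact HoodieDeltaStreamer code, and it assumes `spark.scheduler.mode=FAIR` was already set as a system property:

```scala
import scala.collection.JavaConverters._

import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.utilities.deltastreamer.{HoodieDeltaStreamer, SchedulerConfGenerator}
import org.apache.spark.SparkConf

// The generator returns a non-empty map only when spark.scheduler.mode is
// FAIR, continuous mode is on, and the table type is MERGE_ON_READ.
val cfg = new HoodieDeltaStreamer.Config()
cfg.continuousMode = true
cfg.tableType = HoodieTableType.MERGE_ON_READ.name()

val schedulingConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg)
val sparkConf = new SparkConf().setAppName("hudi-deltastreamer") // placeholder
schedulingConfigs.asScala.foreach { case (k, v) => sparkConf.set(k, v) }
```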
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java
index 034284599f3e9..26b8bc1c88580 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.SparkConfigs;
 import org.apache.hudi.common.model.HoodieTableType;
 
 import org.junit.jupiter.api.Test;
@@ -34,21 +34,21 @@ public class TestSchedulerConfGenerator {
   public void testGenerateSparkSchedulingConf() throws Exception {
     HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
     Map<String, String> configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "spark.scheduler.mode not set");
+    assertNull(configs.get(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "spark.scheduler.mode not set");
 
     System.setProperty(SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY, "FAIR");
     cfg.continuousMode = false;
     configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "continuousMode is false");
+    assertNull(configs.get(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "continuousMode is false");
 
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()),
+    assertNull(configs.get(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()),
         "table type is not MERGE_ON_READ");
 
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    assertNotNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "all satisfies");
+    assertNotNull(configs.get(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "all conditions satisfied");
   }
 }
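For completeness, the scenario these pools exist for is a structured-streaming upsert into a MERGE_ON_READ table with async compaction, sketched below using standard Hudi datasource option keys. The table name, field names, paths, and trigger interval are placeholders, and the async-compaction flag is assumed to be `hoodie.datasource.compaction.async.enable`:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Continuously upsert a streaming DataFrame into a MERGE_ON_READ table with
// async compaction enabled, so regular writes and compaction can run in the
// separate FAIR pools configured above.
def startHudiSink(inputDF: DataFrame): Unit = {
  inputDF.writeStream
    .format("hudi")
    .option("hoodie.table.name", "demo_table")
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
    .option("hoodie.datasource.write.recordkey.field", "uuid")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.datasource.compaction.async.enable", "true")
    .option("checkpointLocation", "/path/to/checkpoints")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start("/path/to/hudi/demo_table")
    .awaitTermination()
}
```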