diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3de4bd4f757b8..57cb8daa44579 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -769,6 +769,12 @@ private FlinkOptions() {
       .noDefaultValue()
       .withDescription("Serde properties to hive table, the data format is k1=v1\nk2=v2");
 
+  public static final ConfigOption<String> HIVE_SYNC_CONF_DIR = ConfigOptions
+      .key("hive_sync.conf.dir")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The Hive configuration directory where the hive-site.xml file resides; the file should be put on the client machine");
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
index 72f20311504d0..d15ef280f532a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.configuration;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.util.FlinkClientUtil;
 
 import java.util.Map;
@@ -51,4 +53,16 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration c
     options.forEach(hadoopConf::set);
     return hadoopConf;
   }
+
+  /**
+   * Creates a Hive configuration from the configured conf dir, or an empty configuration if no Hive conf dir is set.
+   */
+  public static org.apache.hadoop.conf.Configuration getHiveConf(Configuration conf) {
+    String explicitDir = conf.getString(FlinkOptions.HIVE_SYNC_CONF_DIR, System.getenv("HIVE_CONF_DIR"));
+    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+    if (explicitDir != null) {
+      hadoopConf.addResource(new Path(explicitDir, "hive-site.xml"));
+    }
+    return hadoopConf;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 39976e5ee2dc4..75e8beaef17cf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,6 +29,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.CommitAckEvent;
@@ -82,6 +84,11 @@ public class StreamWriteOperatorCoordinator
    */
   private final Configuration conf;
 
+  /**
+   * The Hive configuration used for hive sync, loaded from the configured conf dir.
+   */
+  private final SerializableConfiguration hiveConf;
+
   /**
    * Coordinator context.
    */
@@ -160,6 +167,7 @@ public StreamWriteOperatorCoordinator(
     this.conf = conf;
     this.context = context;
     this.parallelism = context.currentParallelism();
+    this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
   }
 
   @Override
@@ -314,7 +322,7 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
 
   private void initHiveSync() {
     this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
-    this.hiveSyncContext = HiveSyncContext.create(conf);
+    this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
   }
 
   private void syncHiveAsync() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index bd837efc8737d..9fc5323d46a2d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -58,7 +59,7 @@ public HiveSyncTool hiveSyncTool() {
     return new HiveSyncTool(this.syncConfig, this.hiveConf, this.fs);
   }
 
-  public static HiveSyncContext create(Configuration conf) {
+  public static HiveSyncContext create(Configuration conf, SerializableConfiguration serConf) {
     HiveSyncConfig syncConfig = buildSyncConfig(conf);
     org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     String path = conf.getString(FlinkOptions.PATH);
@@ -67,6 +68,7 @@ public static HiveSyncContext create(Configuration conf) {
     if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
       hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
    }
+    hiveConf.addResource(serConf.get());
     hiveConf.addResource(hadoopConf);
     return new HiveSyncContext(syncConfig, hiveConf, fs);
   }
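
For reviewers, a minimal usage sketch (not part of the patch) of how the new option is expected to resolve, based on the getHiveConf logic above. It assumes the patched FlinkOptions and HadoopConfigurations are on the classpath; the HiveConfDirExample class and the /etc/hive/conf path are hypothetical.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;

public class HiveConfDirExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The explicit option takes precedence over the HIVE_CONF_DIR environment
    // variable; if neither is set, getHiveConf returns an empty configuration.
    conf.setString(FlinkOptions.HIVE_SYNC_CONF_DIR, "/etc/hive/conf"); // hypothetical client-side path
    org.apache.hadoop.conf.Configuration hiveConf = HadoopConfigurations.getHiveConf(conf);
    // Properties from /etc/hive/conf/hive-site.xml are now visible:
    System.out.println(hiveConf.get("hive.metastore.uris"));
  }
}

Two design notes, as far as I can tell from the patch: the coordinator wraps the result in SerializableConfiguration because Hadoop's Configuration is not java.io.Serializable, so the configuration read from the local hive-site.xml can travel with the coordinator; and in HiveSyncContext.create the hive-site.xml resource is added before hadoopConf, so later-added resources win and properties set through Flink options (e.g. hive_sync.metastore.uris) still override values from the file.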