Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,12 @@ private FlinkOptions() {
.noDefaultValue()
.withDescription("Serde properties to hive table, the data format is k1=v1\nk2=v2");

/**
 * Path of the client-side directory that contains {@code hive-site.xml} for Hive sync.
 * No default value; when this option is unset, the resolution logic falls back to the
 * {@code HIVE_CONF_DIR} environment variable (see {@code HadoopConfigurations#getHiveConf}).
 */
public static final ConfigOption<String> HIVE_SYNC_CONF_DIR = ConfigOptions
.key("hive_sync.conf.dir")
.stringType()
.noDefaultValue()
.withDescription("The hive configuration directory, where the hive-site.xml lies in, the file should be put on the client machine");

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hudi.configuration;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.util.FlinkClientUtil;

import java.util.Map;
Expand Down Expand Up @@ -51,4 +53,16 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration c
options.forEach(hadoopConf::set);
return hadoopConf;
}

/**
 * Builds the Hadoop configuration used for Hive synchronization.
 *
 * <p>The Hive conf directory is resolved from {@link FlinkOptions#HIVE_SYNC_CONF_DIR},
 * falling back to the {@code HIVE_CONF_DIR} environment variable. When a directory is
 * resolved, the {@code hive-site.xml} inside it is registered as a resource; otherwise
 * an empty configuration is returned.
 *
 * @param conf the Flink configuration carrying the sync options
 * @return the Hadoop configuration seeded with {@code hive-site.xml} when available
 */
public static org.apache.hadoop.conf.Configuration getHiveConf(Configuration conf) {
  final org.apache.hadoop.conf.Configuration hiveConf = new org.apache.hadoop.conf.Configuration();
  final String confDir = conf.getString(FlinkOptions.HIVE_SYNC_CONF_DIR, System.getenv("HIVE_CONF_DIR"));
  if (confDir != null) {
    hiveConf.addResource(new Path(confDir, "hive-site.xml"));
  }
  return hiveConf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand All @@ -28,6 +29,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
Expand Down Expand Up @@ -82,6 +84,11 @@ public class StreamWriteOperatorCoordinator
*/
private final Configuration conf;

/**
 * Hive configuration, wrapped in {@link SerializableConfiguration} so it can be
 * serialized with the coordinator; built once in the constructor from the Flink options.
 */
private final SerializableConfiguration hiveConf;

/**
* Coordinator context.
*/
Expand Down Expand Up @@ -160,6 +167,7 @@ public StreamWriteOperatorCoordinator(
this.conf = conf;
this.context = context;
this.parallelism = context.currentParallelism();
this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
}

@Override
Expand Down Expand Up @@ -314,7 +322,7 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {

/**
 * Initializes the Hive sync machinery: the executor used to run sync tasks
 * (configured to wait for in-flight tasks to finish on shutdown) and the sync
 * context seeded with both the Flink options and the serialized Hive configuration.
 */
private void initHiveSync() {
  this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
  // Defect fixed: the extracted diff retained both the removed single-arg call and the
  // added two-arg call, double-assigning the context; only the two-arg form matches the
  // updated HiveSyncContext.create(Configuration, SerializableConfiguration) signature.
  this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
}

private void syncHiveAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
Expand Down Expand Up @@ -58,7 +59,7 @@ public HiveSyncTool hiveSyncTool() {
return new HiveSyncTool(this.syncConfig, this.hiveConf, this.fs);
}

public static HiveSyncContext create(Configuration conf) {
public static HiveSyncContext create(Configuration conf, SerializableConfiguration serConf) {
HiveSyncConfig syncConfig = buildSyncConfig(conf);
org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
String path = conf.getString(FlinkOptions.PATH);
Expand All @@ -67,6 +68,7 @@ public static HiveSyncContext create(Configuration conf) {
if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
}
hiveConf.addResource(serConf.get());
hiveConf.addResource(hadoopConf);
return new HiveSyncContext(syncConfig, hiveConf, fs);
}
Expand Down