diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 1be90603605cd..903e67e0db589 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -675,6 +675,18 @@ private FlinkOptions() { .withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" + "Disabled by default for backward compatibility."); + public static final ConfigOption HIVE_SYNC_TABLE_PROPERTIES = ConfigOptions + .key("hive_sync.table_properties") + .stringType() + .noDefaultValue() + .withDescription("Additional properties to store with table, the data format is k1=v1\nk2=v2"); + + public static final ConfigOption HIVE_SYNC_TABLE_SERDE_PROPERTIES = ConfigOptions + .key("hive_sync.serde_properties") + .stringType() + .noDefaultValue() + .withDescription("Serde properties to hive table, the data format is k1=v1\nk2=v2"); + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index 1c051c8cd2300..768d36e0abe4b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -74,6 +74,8 @@ private static HiveSyncConfig buildSyncConfig(Configuration conf) { hiveSyncConfig.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE); hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME); hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD); + hiveSyncConfig.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES); + hiveSyncConfig.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES); hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL); hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf)); hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);