diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 52ffa8565916..536a0282fbcc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -70,7 +71,8 @@ public static HiveSyncContext create(Configuration conf) {
     return new HiveSyncContext(syncConfig, hiveConf, fs);
   }
 
-  private static HiveSyncConfig buildSyncConfig(Configuration conf) {
+  @VisibleForTesting
+  public static HiveSyncConfig buildSyncConfig(Configuration conf) {
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
     hiveSyncConfig.basePath = conf.getString(FlinkOptions.PATH);
     hiveSyncConfig.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
@@ -83,7 +85,7 @@ private static HiveSyncConfig buildSyncConfig(Configuration conf) {
     hiveSyncConfig.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES);
     hiveSyncConfig.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES);
     hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
-    hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf));
+    hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractHivePartitionFields(conf));
     hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
     hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
     hiveSyncConfig.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
index 523062590ea9..99efa0b36a7a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
@@ -425,4 +425,17 @@ public static String[] extractPartitionKeys(org.apache.flink.configuration.Confi
     }
     return conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
   }
+
+  /**
+   * Extracts the hive sync partition fields with given configuration.
+   *
+   * @param conf The flink configuration
+   * @return array of the hive partition fields
+   */
+  public static String[] extractHivePartitionFields(org.apache.flink.configuration.Configuration conf) {
+    if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)) {
+      return extractPartitionKeys(conf);
+    }
+    return conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS).split(",");
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
new file mode 100644
index 000000000000..7bfaade59ea2
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.hive.HiveSyncConfig;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HiveSyncContext}.
+ */
+public class TestHiveSyncContext {
+  /**
+   * Test that the hive sync partition fields come from HIVE_SYNC_PARTITION_FIELDS when set and fall back to the write partition path fields otherwise.
+   */
+  @Test
+  public void testBuildSyncConfig() {
+    Configuration configuration1 = new Configuration();
+    Configuration configuration2 = new Configuration();
+    String hiveSyncPartitionField = "hiveSyncPartitionField";
+    String partitionPathField = "partitionPathField";
+
+    configuration1.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, hiveSyncPartitionField);
+    configuration1.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPathField);
+
+    configuration2.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPathField);
+
+    HiveSyncConfig hiveSyncConfig1 = HiveSyncContext.buildSyncConfig(configuration1);
+    HiveSyncConfig hiveSyncConfig2 = HiveSyncContext.buildSyncConfig(configuration2);
+
+    assertTrue(hiveSyncConfig1.partitionFields.get(0).equals(hiveSyncPartitionField));
+    assertTrue(hiveSyncConfig2.partitionFields.get(0).equals(partitionPathField));
+
+  }
+}
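For reference, a minimal sketch (not part of the patch) of how the fallback introduced by FilePathUtils.extractHivePartitionFields is expected to behave. The option keys and method are the ones shown in the diff; the class name HivePartitionFieldsExample and the sample field values "dt,hh" are made up for illustration.

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.format.FilePathUtils;

public class HivePartitionFieldsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Partition path fields configured for the Flink writer.
    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "dt,hh");

    // hive_sync.partition_fields left at its default: the hive sync
    // partition fields fall back to the write partition path fields.
    String[] fallback = FilePathUtils.extractHivePartitionFields(conf); // ["dt", "hh"]

    // hive_sync.partition_fields set explicitly: it takes precedence for Hive sync.
    conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, "dt");
    String[] explicit = FilePathUtils.extractHivePartitionFields(conf); // ["dt"]

    System.out.println(String.join(",", fallback) + " | " + String.join(",", explicit));
  }
}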