diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 1cf66ea3437ef..d3b6edeb09e25 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -217,31 +217,33 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table } } - // tweak the key gen class if possible - final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","); - final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","); - if (partitions.length == 1) { - final String partitionField = partitions[0]; - if (partitionField.isEmpty()) { - conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); - LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", - FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName()); - return; + if (StringUtils.isNullOrEmpty(conf.get(FlinkOptions.KEYGEN_CLASS_NAME))) { + // tweak the key gen class if possible + final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","); + final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","); + if (partitions.length == 1) { + final String partitionField = partitions[0]; + if (partitionField.isEmpty()) { + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", + FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName()); + return; + } + DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField) + .orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist")); + if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) { + // timestamp based key gen only supports simple primary key + setupTimestampKeygenOptions(conf, partitionFieldType); + return; + } } - DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField) - .orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist")); - if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) { - // timestamp based key gen only supports simple primary key - setupTimestampKeygenOptions(conf, partitionFieldType); - return; + boolean complexHoodieKey = pks.length > 1 || partitions.length > 1; + if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) { + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", + FlinkOptions.KEYGEN_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName()); } } - boolean complexHoodieKey = pks.length > 1 || partitions.length > 1; - if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) { - conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName()); - LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", - FlinkOptions.KEYGEN_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName()); - } } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index f7a35e57f2b09..a3e7d1f97b9e6 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -223,6 +223,19 @@ void testSetupHoodieKeyOptionsForSource() { final Configuration conf3 = tableSource3.getConf(); assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), is(NonpartitionedAvroKeyGenerator.class.getName())); + + // non partition should also respect KEYGEN_CLASS_NAME in conf + this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass"); + // definition with simple primary key and partition path + ResolvedSchema schema4 = SchemaBuilder.instance() + .field("f0", DataTypes.INT().notNull()) + .primaryKey("f0") + .build(); + final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema4, ""); + final HoodieTableSource tableSource4 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext4); + final Configuration conf4 = tableSource4.getConf(); + assertThat(conf4.get(FlinkOptions.RECORD_KEY_FIELD), is("f0")); + assertThat(conf4.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass")); } @Test