@@ -217,31 +217,33 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table
       }
     }
 
-    // tweak the key gen class if possible
-    final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
-    final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
-    if (partitions.length == 1) {
-      final String partitionField = partitions[0];
-      if (partitionField.isEmpty()) {
-        conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
-        LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table",
-            FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName());
-        return;
-      }
-      DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField)
-          .orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist"));
-      if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) {
-        // timestamp based key gen only supports simple primary key
-        setupTimestampKeygenOptions(conf, partitionFieldType);
-        return;
-      }
-    }
-    boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
-    if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) {
-      conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
-      LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields",
-          FlinkOptions.KEYGEN_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName());
-    }
+    if (StringUtils.isNullOrEmpty(conf.get(FlinkOptions.KEYGEN_CLASS_NAME))) {
+      // tweak the key gen class if possible
danny0405 (Contributor):

Hi, what kind of keygen clazz do you want to configure for a non-partitioned table, then?

TJX2014 (Contributor, Author):

Hi @danny0405, I want to be able to configure org.apache.hudi.keygen.ComplexAvroKeyGenerator for non-partitioned tables on the hudi-flink side, not just a key generator derived from the partition fields. On the Spark side the complex key is used by default, but Flink cannot assign the complex key generator to a non-partitioned table.

danny0405 (Contributor):

In #5815, we have fixed Spark SQL to use NonpartitionedKeyGenerator for non-partitioned tables.

TJX2014 (Contributor, Author):

But it seems unfriendly that the user cannot assign the keygen class.

danny0405 (Contributor):

Generally that's true, but a non-partitioned table is a special case, and Hudi configures the keygen clazz transparently for the user.

TJX2014 (Contributor, Author), Sep 5, 2022:

Having Hudi configure the keygen clazz automatically is great, but then the option should not exist at all; isn't it strange that it can be configured yet takes no effect? The Spark code has been changed to follow the Hudi partitioning convention, but for historical data, if Spark has written the layout of a non-partitioned table with a complex key, the only way for hudi-flink to handle it is to configure the keygen explicitly.

danny0405 (Contributor):

That's true, but we had better use the right keygen clazz for better performance.

TJX2014 (Contributor, Author):

Of course, a shorter key yields better performance, but this option should also take effect, right?

+      final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+      final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+      if (partitions.length == 1) {
+        final String partitionField = partitions[0];
+        if (partitionField.isEmpty()) {
+          conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
+          LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table",
+              FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName());
+          return;
+        }
+        DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField)
+            .orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist"));
+        if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) {
+          // timestamp based key gen only supports simple primary key
+          setupTimestampKeygenOptions(conf, partitionFieldType);
+          return;
+        }
+      }
+      boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
+      if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) {
+        conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
+        LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields",
+            FlinkOptions.KEYGEN_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName());
+      }
+    }
   }
 
   /**
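To summarize the resolution order that the patched method implements, here is a standalone sketch. It is illustrative only: it uses plain strings and arrays in place of the real Configuration/FlinkOptions types, resolveKeygen and its parameter names are hypothetical, and the final fall-through to Hudi's built-in default is an assumption rather than something spelled out in this diff.

/** Illustrative sketch of the keygen class resolution order after this patch (hypothetical helper). */
final class KeygenResolutionSketch {

  static String resolveKeygen(String userKeygen, String[] pks, String[] partitions,
                              boolean partitionFieldIsDatetime) {
    // 1. With this patch, an explicitly configured keygen class always wins.
    if (userKeygen != null && !userKeygen.isEmpty()) {
      return userKeygen;
    }
    // 2. Non-partitioned table: a single, empty partition field.
    if (partitions.length == 1 && partitions[0].isEmpty()) {
      return "org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator";
    }
    // 3. Single datetime partition field plus a simple primary key:
    //    the real code delegates to setupTimestampKeygenOptions here.
    if (partitions.length == 1 && pks.length <= 1 && partitionFieldIsDatetime) {
      return "timestamp-based key generator";
    }
    // 4. Composite record key or composite partition path.
    if (pks.length > 1 || partitions.length > 1) {
      return "org.apache.hudi.keygen.ComplexAvroKeyGenerator";
    }
    // 5. Otherwise leave the option alone and let Hudi's default keygen apply (assumption).
    return null;
  }
}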
@@ -223,6 +223,19 @@ void testSetupHoodieKeyOptionsForSource() {
     final Configuration conf3 = tableSource3.getConf();
     assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
     assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), is(NonpartitionedAvroKeyGenerator.class.getName()));
+
+    // a non-partitioned table should also respect KEYGEN_CLASS_NAME in conf
+    this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
+    // definition with a simple primary key and an empty partition path
+    ResolvedSchema schema4 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .primaryKey("f0")
+        .build();
+    final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema4, "");
+    final HoodieTableSource tableSource4 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext4);
+    final Configuration conf4 = tableSource4.getConf();
+    assertThat(conf4.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+    assertThat(conf4.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass"));
   }
 
   @Test
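From a user's perspective, the new test above corresponds to a table definition like the following. This is a minimal sketch, assuming Flink's TableEnvironment SQL API and the standard Hudi option key hoodie.datasource.write.keygenerator.class (the key behind FlinkOptions.KEYGEN_CLASS_NAME); the table name, path, and columns are hypothetical.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KeygenOverrideExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // A non-partitioned Hudi table with an explicit keygen class: with this patch the
    // option below takes effect instead of being reset to NonpartitionedAvroKeyGenerator.
    tEnv.executeSql(
        "CREATE TABLE t1 ("
            + "  f0 INT NOT NULL,"
            + "  f1 VARCHAR(20),"
            + "  PRIMARY KEY (f0) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'file:///tmp/t1',"  // hypothetical path
            + "  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'"
            + ")");
  }
}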