diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index a2414abc3de21..57b95788d9aa6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -56,6 +56,8 @@ public class RowDataKeyGen implements Serializable {
 
   private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
 
+  private final boolean hasRecordKey;
+
   private final String[] recordKeyFields;
   private final String[] partitionPathFields;
 
@@ -90,7 +92,11 @@ private RowDataKeyGen(
     this.hiveStylePartitioning = hiveStylePartitioning;
     this.encodePartitionPath = encodePartitionPath;
-    if (this.recordKeyFields.length == 1) {
+
+    this.hasRecordKey = hasRecordKey(fieldNames);
+    if (!hasRecordKey) {
+      this.recordKeyProjection = null;
+    } else if (this.recordKeyFields.length == 1) {
       // efficient code path
       this.simpleRecordKey = true;
       int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
@@ -115,6 +121,14 @@ private RowDataKeyGen(
     this.keyGenOpt = keyGenOpt;
   }
 
+  /**
+   * Checks whether the user provides any record key.
+   */
+  private boolean hasRecordKey(List<String> fieldNames) {
+    return recordKeyFields.length != 1
+        || fieldNames.contains(recordKeyFields[0]);
+  }
+
   public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
     Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
     if (TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))) {
@@ -134,7 +148,11 @@ public HoodieKey getHoodieKey(RowData rowData) {
   }
 
   public String getRecordKey(RowData rowData) {
-    if (this.simpleRecordKey) {
+    if (!hasRecordKey) {
+      // should be optimized to unique values that can be computed cheaply,
+      // e.g. fileId + an auto-increment integer
+      return EMPTY_RECORDKEY_PLACEHOLDER;
+    } else if (this.simpleRecordKey) {
       return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
     } else {
       Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData);
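The keyless path above hinges on `hasRecordKey`: the key is treated as absent only when exactly one key field is configured (e.g. the default `uuid`) and that field does not exist in the schema; multi-field keys always take the projection path. A minimal standalone sketch of that decision follows (plain JDK; the class name is made up for illustration):

    import java.util.Arrays;
    import java.util.List;

    public class KeylessCheckSketch {
      // Mirrors the guard added above: absent only for a single, unresolvable key field.
      static boolean hasRecordKey(String[] recordKeyFields, List<String> fieldNames) {
        return recordKeyFields.length != 1 || fieldNames.contains(recordKeyFields[0]);
      }

      public static void main(String[] args) {
        List<String> schema = Arrays.asList("f0", "f1", "f2");
        System.out.println(hasRecordKey(new String[] {"uuid"}, schema));     // false -> keyless write
        System.out.println(hasRecordKey(new String[] {"f0"}, schema));       // true  -> simple key path
        System.out.println(hasRecordKey(new String[] {"f0", "f9"}, schema)); // true  -> projection path
      }
    }
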
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index c7a79561b3f75..d57d971f47692 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.index.HoodieIndex;
@@ -30,6 +32,7 @@
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -68,12 +71,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
     Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
-    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
-    sanityCheck(conf, schema);
-    setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
-
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
         new ValidationException("Option [path] should not be empty.")));
+    setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
+    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+    setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
     return new HoodieTableSource(
         schema,
         path,
@@ -87,12 +89,34 @@ public DynamicTableSink createDynamicTableSink(Context context) {
     Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
     checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
         "Option [path] should not be empty.");
+    setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
 
+  /**
+   * Supplement the table config options if not specified.
+   */
+  private void setupTableOptions(String basePath, Configuration conf) {
+    StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf))
+        .ifPresent(tableConfig -> {
+          if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
+              && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
+            conf.setString(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
+          }
+          if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD)
+              && !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
+            conf.setString(FlinkOptions.PRECOMBINE_FIELD, tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
+          }
+          if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)
+              && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
+            conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+          }
+        });
+  }
+
   @Override
   public String factoryIdentifier() {
     return FACTORY_ID;
@@ -119,9 +143,17 @@ public Set<ConfigOption<?>> optionalOptions() {
    * @param schema The table schema
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
-    List<String> fields = schema.getColumnNames();
+    if (!OptionsResolver.isAppendMode(conf)) {
+      checkRecordKey(conf, schema);
+      checkPreCombineKey(conf, schema);
+    }
+  }
 
-    // validate record key in pk absence.
+  /**
+   * Validate the record key.
+   */
+  private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
+    List<String> fields = schema.getColumnNames();
     if (!schema.getPrimaryKey().isPresent()) {
       String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
       if (recordKeys.length == 1
@@ -139,8 +171,13 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
               + "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
         });
     }
+  }
 
-    // validate pre_combine key
+  /**
+   * Validate the pre_combine key.
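`setupTableOptions` gives explicit options precedence over `hoodie.properties`: a table-config value is copied in only when the corresponding Flink option is unset. A standalone sketch of that precedence rule over plain maps (not Hudi code; only the key name `hoodie.datasource.write.recordkey.field` is a real option key):

    import java.util.HashMap;
    import java.util.Map;

    public class OptionFallbackSketch {
      // Copy a table-config value only when the write config has no explicit setting.
      static void supplement(Map<String, String> writeConf, Map<String, String> tableConf, String key) {
        if (tableConf.containsKey(key) && !writeConf.containsKey(key)) {
          writeConf.put(key, tableConf.get(key));
        }
      }

      public static void main(String[] args) {
        String key = "hoodie.datasource.write.recordkey.field";
        Map<String, String> tableConf = new HashMap<>();
        tableConf.put(key, "f0,f1");

        Map<String, String> writeConf = new HashMap<>();
        supplement(writeConf, tableConf, key);
        System.out.println(writeConf.get(key)); // f0,f1 -> supplemented from table config

        writeConf.put(key, "f0");
        supplement(writeConf, tableConf, key);
        System.out.println(writeConf.get(key)); // f0 -> explicit user setting wins
      }
    }
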
+   */
+  private void checkPreCombineKey(Configuration conf, ResolvedSchema schema) {
+    List<String> fields = schema.getColumnNames();
     String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
     if (!fields.contains(preCombineField)) {
       if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 8644435b5abea..6dcdf11841585 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -386,9 +387,9 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
     if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
       // read the table config first
       final boolean hiveStyle;
-      HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
-      if (tableConfig != null && tableConfig.contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
-        hiveStyle = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+      Option<HoodieTableConfig> tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
+      if (tableConfig.isPresent() && tableConfig.get().contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
+        hiveStyle = Boolean.parseBoolean(tableConfig.get().getHiveStylePartitioningEnable());
       } else {
         // fallback to the partition path pattern
         Path hoodieTablePath = new Path(path);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index a6bddf1e82f86..1e5af896928a0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -57,8 +57,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.StringReader;
@@ -287,20 +285,19 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
   }
 
   /**
-   * Returns the table config or null if the table does not exist.
+   * Returns the table config or empty if the table does not exist.
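The catalog change is a mechanical consequence of `getTableConfig` now returning `Option`: the previous `!= null` test becomes a presence check plus unwrap. A standalone sketch of the same migration using `java.util.Optional` (Hudi's own `Option` type behaves analogously for the calls used here; all names are made up):

    import java.util.Optional;

    public class NullToOptionSketch {
      // Before: a nullable return value. After: an explicit Optional.
      static Optional<String> hiveStylePartitioning(boolean tableExists) {
        return tableExists ? Optional.of("true") : Optional.empty();
      }

      public static void main(String[] args) {
        Optional<String> value = hiveStylePartitioning(true);
        // The isPresent()/get() pair replaces the old "config != null && ..." test.
        boolean hiveStyle = value.isPresent() && Boolean.parseBoolean(value.get());
        System.out.println(hiveStyle); // true
      }
    }
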
    */
-  @Nullable
-  public static HoodieTableConfig getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
+  public static Option<HoodieTableConfig> getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
     FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
     Path metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
     try {
       if (fs.exists(metaPath)) {
-        return new HoodieTableConfig(fs, metaPath.toString(), null, null);
+        return Option.of(new HoodieTableConfig(fs, metaPath.toString(), null, null));
       }
     } catch (IOException e) {
       throw new HoodieIOException("Get table config error", e);
     }
-    return null;
+    return Option.empty();
   }
 
   /**
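Note the contract of the reworked `getTableConfig`: an empty `Option` means only that the `.hoodie` metadata folder is absent; a genuine I/O failure still surfaces as a `HoodieIOException` rather than being folded into empty. A plain-JDK sketch of that shape (not Hudi code; `.hoodie` is the real metadata folder name, everything else is made up):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Optional;

    public class TableConfigLookupSketch {
      // Empty result = "table not initialized"; exceptions stay reserved for real failures.
      static Optional<Path> findMetaFolder(String basePath) {
        Path metaPath = Paths.get(basePath, ".hoodie");
        return Files.exists(metaPath) ? Optional.of(metaPath) : Optional.empty();
      }

      public static void main(String[] args) {
        System.out.println(findMetaFolder("/tmp/no_such_table")); // Optional.empty
      }
    }
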
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index a66874c486414..6fae13811f4cf 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -165,4 +165,24 @@ void testDateBasedKeyGenerator(String partitionFormat) {
     assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + expectedPartition2));
     assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + expectedPartition3));
   }
+
+  @Test
+  void testPrimaryKeylessWrite() {
+    Configuration conf = TestConfigurations.getDefaultConf("path1");
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, "");
+    final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+        TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
+    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+    assertThat(keyGen1.getRecordKey(rowData1), is("__empty__"));
+
+    // null record key and partition path
+    final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
+        TimestampData.fromEpochMillis(1), null);
+    assertThat(keyGen1.getRecordKey(rowData2), is("__empty__"));
+
+    // empty record key and partition path
+    final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString("Danny"), 23,
+        TimestampData.fromEpochMillis(1), StringData.fromString(""));
+    assertThat(keyGen1.getRecordKey(rowData3), is("__empty__"));
+  }
 }
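The test above pins the shared placeholder key "__empty__" for every row once the record key option is blanked out. In Flink SQL terms this corresponds to a table declared without a PRIMARY KEY and with the record key option set to an empty string, mirroring the test's programmatic `conf.setString(FlinkOptions.RECORD_KEY_FIELD, "")`. A hypothetical DDL sketch (table name and path are made up; whether a blank option value is the supported SQL-level spelling is an assumption taken from the unit test):

    public class KeylessDdlSketch {
      public static void main(String[] args) {
        // Mirrors the unit test: no PRIMARY KEY, record key option explicitly blank.
        String ddl = "CREATE TABLE t1 (\n"
            + "  name STRING,\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `par` STRING\n"
            + ") PARTITIONED BY (`par`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_t1',\n"
            + "  'hoodie.datasource.write.recordkey.field' = ''\n"
            + ")";
        System.out.println(ddl);
      }
    }
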
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f7a35e57f2b09..b9964b70f79d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -86,17 +86,26 @@ void beforeEach() throws IOException {
   }
 
   @Test
-  void testRequiredOptionsForSource() {
-    // miss pk and precombine key will throw exception
+  void testRequiredOptions() {
     ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
         .build();
     final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
-    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
+
+    // createDynamicTableSource doesn't call the sanity check, so it will not throw
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
+    // missing pk and precombine key will throw exception when creating the sink
     assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
+    // append mode does not throw
+    this.conf.set(FlinkOptions.OPERATION, "insert");
+    final MockContext sourceContext11 = MockContext.getInstance(this.conf, schema1, "f2");
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext11));
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext11));
+    this.conf.set(FlinkOptions.OPERATION, FlinkOptions.OPERATION.defaultValue());
+
     // a non-exists precombine key will throw exception
     ResolvedSchema schema2 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
@@ -105,7 +114,8 @@ void testRequiredOptionsForSource() {
         .build();
     this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
     final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
-    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
+    // createDynamicTableSource doesn't call the sanity check, so it will not throw
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
     assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
     this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.PRECOMBINE_FIELD.defaultValue());
@@ -120,17 +130,17 @@ void testRequiredOptionsForSource() {
     HoodieTableSource tableSource = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
     HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext3);
     // the precombine field is overwritten
-    assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
     assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
 
     // precombine field not specified, use the default payload clazz
     assertThat(tableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
     assertThat(tableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
-
-    // given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw
     this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
     final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema3, "f2");
-    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
+    // createDynamicTableSource doesn't call the sanity check, so it will not throw
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
+    // given pk but missing the precombine key with DefaultHoodieRecordPayload should throw
     assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
     this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());
@@ -167,6 +177,74 @@ void testRequiredOptionsForSource() {
     assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
   }
 
+  @Test
+  void testSupplementTableConfig() throws Exception {
+    String tablePath = new File(tempFile.getAbsolutePath(), "dummy").getAbsolutePath();
+    // add pk and pre-combine key to table config
+    Configuration tableConf = new Configuration();
+    tableConf.setString(FlinkOptions.PATH, tablePath);
+    tableConf.setString(FlinkOptions.TABLE_NAME, "t2");
+    tableConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0,f1");
+    tableConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");
+
+    StreamerUtil.initTableIfNotExists(tableConf);
+
+    Configuration writeConf = new Configuration();
+    writeConf.set(FlinkOptions.PATH, tablePath);
+    writeConf.set(FlinkOptions.TABLE_NAME, "t2");
+
+    // fallback to table config
+    ResolvedSchema schema1 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .build();
+    final MockContext sourceContext1 = MockContext.getInstance(writeConf, schema1, "f2");
+    HoodieTableSource source1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
+    HoodieTableSink sink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext1);
+    assertThat("pk not provided, fallback to table config",
+        source1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat("pk not provided, fallback to table config",
+        sink1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat("pre-combine key not provided, fallback to table config",
+        source1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+    assertThat("pre-combine key not provided, fallback to table config",
+        sink1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+
+    // write config always has higher priority:
+    // set up a primary key and pre_combine key different from the table config options
+    writeConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0");
+    writeConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f1");
+
+    final MockContext sourceContext2 = MockContext.getInstance(writeConf, schema1, "f2");
+    HoodieTableSource source2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
+    HoodieTableSink sink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext2);
+    assertThat("choose pk from write config",
+        source2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+    assertThat("choose pk from write config",
+        sink2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+    assertThat("choose preCombine key from write config",
+        source2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));
+    assertThat("choose preCombine key from write config",
+        sink2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));
+
+    writeConf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
+    writeConf.removeConfig(FlinkOptions.PRECOMBINE_FIELD);
+
+    // pk defined in table config but missing in schema will throw
+    ResolvedSchema schema2 = SchemaBuilder.instance()
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .build();
+    final MockContext sourceContext3 = MockContext.getInstance(writeConf, schema2, "f2");
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3),
+        "createDynamicTableSource won't call the sanity check");
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext3),
+        "f0 is in the table config as record key, but missing in the input schema");
+  }
+
   @Test
   void testInferAvroSchemaForSource() {
     // infer the schema if not specified