@@ -56,6 +56,8 @@ public class RowDataKeyGen implements Serializable {

private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";

private final boolean hasRecordKey;

private final String[] recordKeyFields;
private final String[] partitionPathFields;

@@ -90,7 +92,11 @@ private RowDataKeyGen(

this.hiveStylePartitioning = hiveStylePartitioning;
this.encodePartitionPath = encodePartitionPath;
if (this.recordKeyFields.length == 1) {

this.hasRecordKey = hasRecordKey(fieldNames);
if (!hasRecordKey) {
this.recordKeyProjection = null;
} else if (this.recordKeyFields.length == 1) {
// efficient code path
this.simpleRecordKey = true;
int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
@@ -115,6 +121,14 @@ private RowDataKeyGen(
this.keyGenOpt = keyGenOpt;
}

/**
* Checks whether the user provided any record key.
*/
private boolean hasRecordKey(List<String> fieldNames) {
return recordKeyFields.length != 1
|| fieldNames.contains(recordKeyFields[0]);
}

public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
if (TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))) {
@@ -134,7 +148,11 @@ public HoodieKey getHoodieKey(RowData rowData) {
}

public String getRecordKey(RowData rowData) {
if (this.simpleRecordKey) {
if (!hasRecordKey) {
// should be optimized to unique values that can be calculated cheaply,
// e.g. fileId + an auto-increment integer
return EMPTY_RECORDKEY_PLACEHOLDER;
} else if (this.simpleRecordKey) {
Contributor

Not sure whether we should use an empty string for the pk-less scenario, because all the records would then share the same primary key value, which breaks the pk-less semantics: pk-less actually means every record is unique, so there is no need to define a primary key.

Another solution is to use a UUID as the primary key, WDYT?

Contributor Author

I'm not sure whether removing the pk field entirely would cause an error somewhere. Writing an identical value should use very little storage in a columnar file format like Parquet, whereas a UUID would use much more space since unique values compress poorly, and I don't know where we would use the UUID, so I think storing an identical value for the pk is better.

I changed the default key value to RowDataKeyGen.EMPTY_RECORDKEY_PLACEHOLDER since an empty record key reports an error.

Contributor

In #7622, an empty string is also used for keyless primary keys, so it's okay here as long as we reach an agreement and never use the primary key.
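For illustration only (not part of this PR): a minimal sketch contrasting the two options discussed above, assuming the EMPTY_RECORDKEY_PLACEHOLDER constant referenced in the diff and a hypothetical useUuidKeys flag.

// Illustrative sketch: contrasts the approach taken in this PR (a shared placeholder key)
// with the UUID alternative raised in review. The useUuidKeys flag is hypothetical.
private static String keylessRecordKey(boolean useUuidKeys) {
  if (useUuidKeys) {
    // Unique per record, but random values compress poorly in columnar formats like Parquet.
    return java.util.UUID.randomUUID().toString();
  }
  // Identical placeholder for every record; nearly free after Parquet dictionary/RLE
  // encoding, which is the argument made by the author above.
  return EMPTY_RECORDKEY_PLACEHOLDER;
}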

return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
} else {
Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData);
@@ -19,8 +19,10 @@
package org.apache.hudi.table;

import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
@@ -30,6 +32,7 @@
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -68,12 +71,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);

Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
Contributor @danny0405, Jan 29, 2023

The sanity check for the source can be removed. When the primary key definition is missing, the streaming source for a MOR table distinguishes the case as pk-less, so no deletes are emitted.

return new HoodieTableSource(
schema,
path,
@@ -87,12 +89,34 @@ public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
"Option [path] should not be empty.");
setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
}

/**
* Supplement the table config options if not specified.
*/
private void setupTableOptions(String basePath, Configuration conf) {
StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf))
.ifPresent(tableConfig -> {
if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
&& !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
conf.setString(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
}
if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD)
&& !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
conf.setString(FlinkOptions.PRECOMBINE_FIELD, tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
}
if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)
&& !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
}
});
}

@Override
public String factoryIdentifier() {
return FACTORY_ID;
@@ -119,9 +143,17 @@ public Set<ConfigOption<?>> optionalOptions() {
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
List<String> fields = schema.getColumnNames();
if (!OptionsResolver.isAppendMode(conf)) {
checkRecordKey(conf, schema);
checkPreCombineKey(conf, schema);
}
}

// validate record key in pk absence.
/**
* Validate the record key.
*/
private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
List<String> fields = schema.getColumnNames();
if (!schema.getPrimaryKey().isPresent()) {
String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
if (recordKeys.length == 1
@@ -139,8 +171,13 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+ "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
});
}
}

// validate pre_combine key
/**
* Validate pre_combine key.
*/
private void checkPreCombineKey(Configuration conf, ResolvedSchema schema) {
List<String> fields = schema.getColumnNames();
String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
if (!fields.contains(preCombineField)) {
if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
@@ -24,6 +24,7 @@
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -386,9 +387,9 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
// read the table config first
final boolean hiveStyle;
HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
if (tableConfig != null && tableConfig.contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
hiveStyle = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
Option<HoodieTableConfig> tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
if (tableConfig.isPresent() && tableConfig.get().contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
hiveStyle = Boolean.parseBoolean(tableConfig.get().getHiveStylePartitioningEnable());
} else {
// fallback to the partition path pattern
Path hoodieTablePath = new Path(path);
@@ -57,8 +57,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
@@ -287,20 +285,19 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
}

/**
* Returns the table config or null if the table does not exist.
* Returns the table config or empty if the table does not exist.
*/
@Nullable
public static HoodieTableConfig getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
public static Option<HoodieTableConfig> getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
Path metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
try {
if (fs.exists(metaPath)) {
return new HoodieTableConfig(fs, metaPath.toString(), null, null);
return Option.of(new HoodieTableConfig(fs, metaPath.toString(), null, null));
}
} catch (IOException e) {
throw new HoodieIOException("Get table config error", e);
}
return null;
return Option.empty();
}

/**
@@ -165,4 +165,24 @@ void testDateBasedKeyGenerator(String partitionFormat) {
assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + expectedPartition2));
assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + expectedPartition3));
}

@Test
void testPrimaryKeylessWrite() {
Configuration conf = TestConfigurations.getDefaultConf("path1");
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "");
final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen1.getRecordKey(rowData1), is("__empty__"));

// null record key and partition path
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), null);
assertThat(keyGen1.getRecordKey(rowData2), is("__empty__"));

// empty record key and partition path
final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString(""));
assertThat(keyGen1.getRecordKey(rowData3), is("__empty__"));
}
}
@@ -86,17 +86,26 @@ void beforeEach() throws IOException {
}

@Test
void testRequiredOptionsForSource() {
// miss pk and precombine key will throw exception
void testRequiredOptions() {
ResolvedSchema schema1 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.build();
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));

// createDynamicTableSource doesn't call the sanity check, so it does not throw
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
// missing pk and precombine key throws an exception when creating the sink
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));

// append mode does not throw
this.conf.set(FlinkOptions.OPERATION, "insert");
final MockContext sourceContext11 = MockContext.getInstance(this.conf, schema1, "f2");
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext11));
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext11));
this.conf.set(FlinkOptions.OPERATION, FlinkOptions.OPERATION.defaultValue());

// a non-exists precombine key will throw exception
ResolvedSchema schema2 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
@@ -105,7 +114,8 @@ void testRequiredOptionsForSource() {
.build();
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
// createDynamicTableSource doesn't call the sanity check, so it does not throw
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.PRECOMBINE_FIELD.defaultValue());

@@ -120,17 +130,17 @@ void testRequiredOptionsForSource() {
HoodieTableSource tableSource = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext3);
// the precombine field is overwritten
assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
// precombine field not specified, use the default payload clazz
assertThat(tableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
assertThat(tableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));

// given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema3, "f2");

assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
// createDynamicTableSource doesn't call the sanity check, so it does not throw
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
// given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());

@@ -167,6 +177,74 @@ void testRequiredOptionsForSource() {
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
}

@Test
void testSupplementTableConfig() throws Exception {
String tablePath = new File(tempFile.getAbsolutePath(), "dummy").getAbsolutePath();
// add pk and pre-combine key to table config
Configuration tableConf = new Configuration();
tableConf.setString(FlinkOptions.PATH, tablePath);
tableConf.setString(FlinkOptions.TABLE_NAME, "t2");
tableConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0,f1");
tableConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");

StreamerUtil.initTableIfNotExists(tableConf);

Configuration writeConf = new Configuration();
writeConf.set(FlinkOptions.PATH, tablePath);
writeConf.set(FlinkOptions.TABLE_NAME, "t2");

// fallback to table config
ResolvedSchema schema1 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.build();
final MockContext sourceContext1 = MockContext.getInstance(writeConf, schema1, "f2");
HoodieTableSource source1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
HoodieTableSink sink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext1);
assertThat("pk not provided, fallback to table config",
source1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat("pk not provided, fallback to table config",
sink1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat("pre-combine key not provided, fallback to table config",
source1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
assertThat("pre-combine key not provided, fallback to table config",
sink1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));

// write config always has higher priority
// set up a different primary key and pre_combine key with table config options
writeConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0");
writeConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f1");

final MockContext sourceContext2 = MockContext.getInstance(writeConf, schema1, "f2");
HoodieTableSource source2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
HoodieTableSink sink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext2);
assertThat("choose pk from write config",
source2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
assertThat("choose pk from write config",
sink2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
assertThat("choose preCombine key from write config",
source2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));
assertThat("choose preCombine pk from write config",
sink2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));

writeConf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
writeConf.removeConfig(FlinkOptions.PRECOMBINE_FIELD);

// pk defined in table config but missing in schema will throw
ResolvedSchema schema2 = SchemaBuilder.instance()
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.build();
final MockContext sourceContext3 = MockContext.getInstance(writeConf, schema2, "f2");
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3),
"createDynamicTableSource won't call sanity check");
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext3),
"f0 is in table config as record key, but missing in input schema");
}

@Test
void testInferAvroSchemaForSource() {
// infer the schema if not specified