Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2259,6 +2259,10 @@ public WriteConcurrencyMode getWriteConcurrencyMode() {
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
}

public Boolean doAutoGenerateRecordKeys() {
return getBooleanOrDefault(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS);
}

public boolean isEarlyConflictDetectionEnable() {
return getBoolean(EARLY_CONFLICT_DETECTION_ENABLE);
}
Expand Down Expand Up @@ -2805,6 +2809,11 @@ public Builder doSkipDefaultPartitionValidation(boolean skipDefaultPartitionVali
return this;
}

public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) {
writeConfig.setValue(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS, String.valueOf(autoGenerateRecordKeys));
return this;
}

public Builder withEarlyConflictDetectionEnable(boolean enable) {
writeConfig.setValue(EARLY_CONFLICT_DETECTION_ENABLE, String.valueOf(enable));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.hudi.keygen;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.avro.generic.GenericRecord;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

/**
* Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
Expand All @@ -32,19 +36,26 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator {

public ComplexAvroKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
.collect(Collectors.toList()) : Collections.EMPTY_LIST;
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
instantiateAutoRecordKeyGenerator();
}

@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
if (autoGenerateRecordKeys) {
return autoRecordKeyGenerator.getRecordKey(record);
} else {
checkArgument(getRecordKeyFieldNames().size() > 0, "Record key fields cannot be empty");
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
*/
public class CustomAvroKeyGenerator extends BaseKeyGenerator {

private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
public static final String SPLIT_REGEX = ":";

/**
* Used as a part of config in CustomKeyGenerator.java.
*/
Expand All @@ -57,6 +54,7 @@ public CustomAvroKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
instantiateAutoRecordKeyGenerator();
}

@Override
Expand Down Expand Up @@ -102,10 +100,14 @@ public String getPartitionPath(GenericRecord record) {

@Override
public String getRecordKey(GenericRecord record) {
validateRecordKeyFields();
return getRecordKeyFieldNames().size() == 1
? new SimpleAvroKeyGenerator(config).getRecordKey(record)
: new ComplexAvroKeyGenerator(config).getRecordKey(record);
if (autoGenerateRecordKeys) {
return autoRecordKeyGenerator.getRecordKey(record);
} else {
validateRecordKeyFields();
return getRecordKeyFieldNames().size() == 1
? new SimpleAvroKeyGenerator(config).getRecordKey(record)
: new ComplexAvroKeyGenerator(config).getRecordKey(record);
}
}

private void validateRecordKeyFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
super(config);
this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
this.partitionPathFields = new ArrayList<>();
instantiateAutoRecordKeyGenerator();
}

@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
if (autoGenerateRecordKeys) {
return autoRecordKeyGenerator.getRecordKey(record);
} else {
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
}
}

@Override
Expand All @@ -50,7 +56,7 @@ public String getPartitionPath(GenericRecord record) {

@Override
public List<String> getPartitionPathFields() {
return new ArrayList<>();
return partitionPathFields;
}

public String getEmptyPartition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ public static String getRecordKey(GenericRecord record, List<String> recordKeyFi
keyIsNullEmpty = false;
}
}
recordKey.deleteCharAt(recordKey.length() - 1);
if (recordKey.length() > 0) {
recordKey.deleteCharAt(recordKey.length() - 1);
}
if (keyIsNullEmpty) {
throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: "
+ recordKeyFields.toString() + " cannot be entirely null or empty.");
+ recordKeyFields + " cannot be entirely null or empty.");
}
return recordKey.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public NonpartitionedAvroKeyGenerator(TypedProperties props) {
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST;
instantiateAutoRecordKeyGenerator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please assign var inside the ctor, make it final

}

@Override
Expand All @@ -53,13 +54,17 @@ public List<String> getPartitionPathFields() {

@Override
public String getRecordKey(GenericRecord record) {
// for backward compatibility, we need to use the right format according to the number of record key fields
// 1. if there is only one record key field, the format of record key is just "<value>"
// 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
if (getRecordKeyFieldNames().size() == 1) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
if (autoGenerateRecordKeys) {
return autoRecordKeyGenerator.getRecordKey(record);
} else {
// for backward compatibility, we need to use the right format according to the number of record key fields
// 1. if there is only one record key field, the format of record key is just "<value>"
// 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
if (getRecordKeyFieldNames().size() == 1) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
}
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
}
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
}

public String getEmptyPartition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.hudi.keygen;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.avro.generic.GenericRecord;

import java.util.Collections;

import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

/**
* Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
Expand All @@ -43,11 +46,17 @@ public SimpleAvroKeyGenerator(TypedProperties props) {
? Collections.emptyList()
: Collections.singletonList(recordKeyField);
this.partitionPathFields = Collections.singletonList(partitionPathField);
instantiateAutoRecordKeyGenerator();
}

@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
if (autoGenerateRecordKeys) {
return autoRecordKeyGenerator.getRecordKey(record);
} else {
checkArgument(getRecordKeyFieldNames().size() == 1, "Only 1 record key field allowed for SimpleKeyGenerator");
return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,66 +29,76 @@

import java.io.IOException;

public class TestKeylessKeyGenerator {
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestAutoRecordKeyGenerator {
private static final long TIME = 1672265446090L;
private static final Schema SCHEMA;
private static final String PARTITION_PATH_STR = "partition1";

static {
try {
SCHEMA = new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
SCHEMA = new Schema.Parser().parse(TestAutoRecordKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}

@Test
public void createKeyWithoutPartitionColumn() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we testing ComplexKeyGenerator here?

We need to test 2 concerns:

  • AutoRecordKeyGenerator itself
  • All existing key-generators in auto-gen mode (just one parameterized test for getRecordKey should be fine)

GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
}

@Test
public void createKeyWithPartition() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("integer_field,partition_field,nested_struct.doubly_nested", 3));
GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
assertEquals("123/partition1/__HIVE_DEFAULT_PARTITION__", keyGenerator.getPartitionPath(record));
}

@Test
public void nullFieldsProperlyHandled() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
GenericRecord record = createRecord("partition1", "value1", null, null, null, null);
ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", null, null, null, null);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord);
assertEquals("a107710e-4d3b-33a4-bbbf-d891c7147034", actualForRecord);
assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
}

@Test
public void assertOnlySubsetOfFieldsUsed() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null);
ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
GenericRecord record1 = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
String actualForRecord1 = keyGenerator.getRecordKey(record1);
GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null);
String actualForRecord2 = keyGenerator.getRecordKey(record2);
Assertions.assertEquals(actualForRecord2, actualForRecord1);
assertEquals(actualForRecord2, actualForRecord1);
assertEquals("partition2", keyGenerator.getPartitionPath(record2));
}

@Test
public void numFieldsImpactsKeyGen() {
KeylessKeyGenerator keyGenerator1 = new KeylessKeyGenerator(getKeyGenProperties("", 3));
KeylessKeyGenerator keyGenerator2 = new KeylessKeyGenerator(getKeyGenProperties("", 10));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
ComplexAvroKeyGenerator keyGenerator1 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
ComplexAvroKeyGenerator keyGenerator2 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10));
GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record));
assertEquals(PARTITION_PATH_STR, keyGenerator1.getPartitionPath(record));
assertEquals(PARTITION_PATH_STR, keyGenerator2.getPartitionPath(record));
}

@Test
public void nestedColumnsUsed() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10));
ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord);
assertEquals("569de5d6-55b8-38bf-9256-efc0f6e2ae84", actualForRecord);
assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
}

protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) {
Expand All @@ -112,8 +122,9 @@ protected GenericRecord createRecord(String partitionField, String stringValue,
protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) {
TypedProperties properties = new TypedProperties();
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField);
properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen);
properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(), numFieldsInKeyGen);
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "");
properties.put(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),"true");
return properties;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.unsafe.types.UTF8String;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

/**
Expand All @@ -41,14 +42,16 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {

public ComplexKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
.collect(Collectors.toList()) : Collections.EMPTY_LIST;
this.partitionPathFields = props.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
? Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
.collect(Collectors.toList()) : Collections.EMPTY_LIST;
this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
}

Expand All @@ -64,14 +67,22 @@ public String getPartitionPath(GenericRecord record) {

@Override
public String getRecordKey(Row row) {
tryInitRowAccessor(row.schema());
return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
if (autoGenerateRecordKeys) {
return super.getRecordKey(row);
} else {
tryInitRowAccessor(row.schema());
return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
}
}

@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
tryInitRowAccessor(schema);
return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
if (autoGenerateRecordKeys) {
return super.getRecordKey(internalRow, schema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we redirecting to BuiltinKeyGenerator here?
On top of that i don't see that we've changed it either, so how is this supposed to work?

} else {
tryInitRowAccessor(schema);
return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
}
}

@Override
Expand Down
Loading