From 707c9771901e407d7498c0b50b7384b70cb6d0b3 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Sat, 21 Jan 2023 14:53:22 -0800 Subject: [PATCH 1/3] Adding auto generation of record keys w/ hudi. Added support to all key gen classes --- .../apache/hudi/config/HoodieWriteConfig.java | 9 ++ .../hudi/keygen/ComplexAvroKeyGenerator.java | 13 +- .../hudi/keygen/CustomAvroKeyGenerator.java | 16 +-- .../keygen/GlobalAvroDeleteKeyGenerator.java | 10 +- .../NonpartitionedAvroKeyGenerator.java | 17 ++- .../hudi/keygen/SimpleAvroKeyGenerator.java | 7 +- .../TimestampBasedAvroKeyGenerator.java | 1 + ...r.java => TestAutoRecordKeyGenerator.java} | 23 ++-- .../hudi/keygen/ComplexKeyGenerator.java | 22 +++- .../hudi/keygen/CustomKeyGenerator.java | 10 +- .../hudi/keygen/GlobalDeleteKeyGenerator.java | 16 ++- .../keygen/NonpartitionedKeyGenerator.java | 16 ++- .../hudi/keygen/SimpleKeyGenerator.java | 46 ++++--- .../keygen/TimestampBasedKeyGenerator.java | 18 ++- .../apache/hudi/util/SparkKeyGenUtils.scala | 2 +- .../hudi/keygen/AutoRecordKeyGenerator.java | 43 ++++--- .../apache/hudi/keygen/BaseKeyGenerator.java | 13 ++ .../keygen/constant/KeyGeneratorOptions.java | 18 ++- .../hudi/TestAutoRecordKeyGeneration.scala | 119 ++++++++++++++++++ .../TestHoodieDeltaStreamer.java | 27 ++++ 20 files changed, 348 insertions(+), 98 deletions(-) rename hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/{TestKeylessKeyGenerator.java => TestAutoRecordKeyGenerator.java} (79%) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java => hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java (81%) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 
d114cc3d77881..8213f020f2a27 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2259,6 +2259,10 @@ public WriteConcurrencyMode getWriteConcurrencyMode() { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } + public Boolean doAutoGenerateRecordKeys() { + return getBooleanOrDefault(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS); + } + public boolean isEarlyConflictDetectionEnable() { return getBoolean(EARLY_CONFLICT_DETECTION_ENABLE); } @@ -2805,6 +2809,11 @@ public Builder doSkipDefaultPartitionValidation(boolean skipDefaultPartitionVali return this; } + public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) { + writeConfig.setValue(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS, String.valueOf(autoGenerateRecordKeys)); + return this; + } + public Builder withEarlyConflictDetectionEnable(boolean enable) { writeConfig.setValue(EARLY_CONFLICT_DETECTION_ENABLE, String.valueOf(enable)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java index 9ff5c522e4527..4c83c9f6df4b5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java @@ -22,6 +22,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import java.util.Arrays; +import java.util.Collections; import java.util.stream.Collectors; /** @@ -32,19 +33,25 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator { public ComplexAvroKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) + this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) + ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) .map(String::trim) .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); + .collect(Collectors.toList()) : Collections.EMPTY_LIST; this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")) .map(String::trim) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); + instantiateAutoRecordKeyGenerator(); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + if (autoGenerateRecordKeys) { + return autoRecordKeyGenerator.getRecordKey(record); + } else { + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + } } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java index 77377de7ab8c7..2c829f6bb9894 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java @@ -43,9 +43,6 @@ */ public class CustomAvroKeyGenerator extends BaseKeyGenerator { - private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; - public static final String SPLIT_REGEX = ":"; - /** * Used as a part of config in CustomKeyGenerator.java. 
*/ @@ -57,6 +54,7 @@ public CustomAvroKeyGenerator(TypedProperties props) { super(props); this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList()); this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList()); + instantiateAutoRecordKeyGenerator(); } @Override @@ -102,10 +100,14 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(GenericRecord record) { - validateRecordKeyFields(); - return getRecordKeyFieldNames().size() == 1 - ? new SimpleAvroKeyGenerator(config).getRecordKey(record) - : new ComplexAvroKeyGenerator(config).getRecordKey(record); + if (autoGenerateRecordKeys) { + return autoRecordKeyGenerator.getRecordKey(record); + } else { + validateRecordKeyFields(); + return getRecordKeyFieldNames().size() == 1 + ? new SimpleAvroKeyGenerator(config).getRecordKey(record) + : new ComplexAvroKeyGenerator(config).getRecordKey(record); + } } private void validateRecordKeyFields() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java index dc0bc3cef2f00..193259ab7e09d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java @@ -36,11 +36,17 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator { public GlobalAvroDeleteKeyGenerator(TypedProperties config) { super(config); this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")); + this.partitionPathFields = new ArrayList<>(); + instantiateAutoRecordKeyGenerator(); } 
@Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + if (autoGenerateRecordKeys) { + return autoRecordKeyGenerator.getRecordKey(record); + } else { + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + } } @Override @@ -50,7 +56,7 @@ public String getPartitionPath(GenericRecord record) { @Override public List getPartitionPathFields() { - return new ArrayList<>(); + return partitionPathFields; } public String getEmptyPartition() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java index 5b5cedcbf8855..41426140ab598 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java @@ -39,6 +39,7 @@ public NonpartitionedAvroKeyGenerator(TypedProperties props) { this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST; + instantiateAutoRecordKeyGenerator(); } @Override @@ -53,13 +54,17 @@ public List getPartitionPathFields() { @Override public String getRecordKey(GenericRecord record) { - // for backward compatibility, we need to use the right format according to the number of record key fields - // 1. if there is only one record key field, the format of record key is just "<value>" - // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
- if (getRecordKeyFieldNames().size() == 1) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); + if (autoGenerateRecordKeys) { + return autoRecordKeyGenerator.getRecordKey(record); + } else { + // for backward compatibility, we need to use the right format according to the number of record key fields + // 1. if there is only one record key field, the format of record key is just "<value>" + // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..." + if (getRecordKeyFieldNames().size() == 1) { + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); + } + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); } - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); } public String getEmptyPartition() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java index c7398e94ecea0..cbfcf2b6e435e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java @@ -43,11 +43,16 @@ public SimpleAvroKeyGenerator(TypedProperties props) { ?
Collections.emptyList() : Collections.singletonList(recordKeyField); this.partitionPathFields = Collections.singletonList(partitionPathField); + instantiateAutoRecordKeyGenerator(); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); + if (autoGenerateRecordKeys) { + return autoRecordKeyGenerator.getRecordKey(record); + } else { + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); + } } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java index 60ccc694f947d..fa9ecf154b2a2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java @@ -65,6 +65,7 @@ public enum TimestampType implements Serializable { private final DateTimeZone outputDateTimeZone; protected final boolean encodePartitionPath; + private AutoRecordKeyGenerator autoRecordKeyGenerator; public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException { this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java similarity index 79% rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java index af6b30e3f0929..351e779f0dafb 100644 --- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java @@ -29,13 +29,13 @@ import java.io.IOException; -public class TestKeylessKeyGenerator { +public class TestAutoRecordKeyGenerator { private static final long TIME = 1672265446090L; private static final Schema SCHEMA; static { try { - SCHEMA = new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc")); + SCHEMA = new Schema.Parser().parse(TestAutoRecordKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc")); } catch (IOException ex) { throw new RuntimeException(ex); } @@ -43,7 +43,7 @@ public class TestKeylessKeyGenerator { @Test public void createKeyWithoutPartitionColumn() { - KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3)); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); String actualForRecord = keyGenerator.getRecordKey(record); Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord); @@ -51,7 +51,7 @@ public void createKeyWithoutPartitionColumn() { @Test public void createKeyWithPartition() { - KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3)); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3)); GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); String actualForRecord = keyGenerator.getRecordKey(record); Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord); @@ -59,7 +59,7 @@ public void 
createKeyWithPartition() { @Test public void nullFieldsProperlyHandled() { - KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3)); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); GenericRecord record = createRecord("partition1", "value1", null, null, null, null); String actualForRecord = keyGenerator.getRecordKey(record); Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord); @@ -67,7 +67,7 @@ public void nullFieldsProperlyHandled() { @Test public void assertOnlySubsetOfFieldsUsed() { - KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3)); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null); String actualForRecord1 = keyGenerator.getRecordKey(record1); GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null); @@ -77,15 +77,15 @@ public void assertOnlySubsetOfFieldsUsed() { @Test public void numFieldsImpactsKeyGen() { - KeylessKeyGenerator keyGenerator1 = new KeylessKeyGenerator(getKeyGenProperties("", 3)); - KeylessKeyGenerator keyGenerator2 = new KeylessKeyGenerator(getKeyGenProperties("", 10)); + ComplexAvroKeyGenerator keyGenerator1 = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); + ComplexAvroKeyGenerator keyGenerator2 = new ComplexAvroKeyGenerator(getKeyGenProperties("", 10)); GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record)); } @Test public void nestedColumnsUsed() { - KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10)); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 10)); GenericRecord record = createRecord("partition1", "value1", 123, 456L, 
TIME, 20.1); String actualForRecord = keyGenerator.getRecordKey(record); Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord); @@ -112,8 +112,9 @@ protected GenericRecord createRecord(String partitionField, String stringValue, protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) { TypedProperties properties = new TypedProperties(); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField); - properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen); + properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(), numFieldsInKeyGen); properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), ""); + properties.put(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),"true"); return properties; } -} +} \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index c9cff284e80e8..d826061fa1143 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -26,6 +26,7 @@ import org.apache.spark.unsafe.types.UTF8String; import java.util.Arrays; +import java.util.Collections; import java.util.stream.Collectors; /** @@ -41,10 +42,11 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator { public ComplexKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP)) + this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) + ? 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP)) .map(String::trim) .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); + .collect(Collectors.toList()) : Collections.EMPTY_LIST; this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP)) .map(String::trim) .filter(s -> !s.isEmpty()) @@ -64,14 +66,22 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - tryInitRowAccessor(row.schema()); - return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); + if (autoGenerateRecordKeys) { + return super.getRecordKey(row); + } else { + tryInitRowAccessor(row.schema()); + return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); + } } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - tryInitRowAccessor(schema); - return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); + if (autoGenerateRecordKeys) { + return super.getRecordKey(internalRow, schema); + } else { + tryInitRowAccessor(schema); + return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); + } } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java index fcd94bb4f1550..b60ee90d878d9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java @@ -85,9 +85,13 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - return getRecordKeyFieldNames().size() == 1 - ? 
new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row) - : new ComplexKeyGenerator(config).getRecordKey(row); + if (autoGenerateRecordKeys) { + return super.getRecordKey(row); + } else { + return getRecordKeyFieldNames().size() == 1 + ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row) + : new ComplexKeyGenerator(config).getRecordKey(row); + } } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java index 7fcc16094eadc..d5b3cbf39cc50 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java @@ -61,14 +61,22 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - tryInitRowAccessor(row.schema()); - return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); + if (autoGenerateRecordKeys) { + return super.getRecordKey(row); + } else { + tryInitRowAccessor(row.schema()); + return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); + } } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - tryInitRowAccessor(schema); - return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); + if (autoGenerateRecordKeys) { + return super.getRecordKey(internalRow, schema); + } else { + tryInitRowAccessor(schema); + return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); + } } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index 100bcc2cd7f2f..96cda37d378a7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -60,14 +60,22 @@ public String getRecordKey(GenericRecord record) { @Override public String getRecordKey(Row row) { - tryInitRowAccessor(row.schema()); - return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); + if (autoGenerateRecordKeys) { + return super.getRecordKey(row); + } else { + tryInitRowAccessor(row.schema()); + return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); + } } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - tryInitRowAccessor(schema); - return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); + if (autoGenerateRecordKeys) { + return super.getRecordKey(internalRow, schema); + } else { + tryInitRowAccessor(schema); + return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); + } } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index 4b1a5e5cb444e..b588739100c30 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -38,7 +38,7 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator { private final SimpleAvroKeyGenerator simpleAvroKeyGenerator; public SimpleKeyGenerator(TypedProperties props) { - this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), + this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null), props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); } @@ -69,31 +69,39 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - tryInitRowAccessor(row.schema()); - - Object[] recordKeys = rowAccessor.getRecordKeyParts(row); - // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) - // record-key field - if (recordKeys[0] == null) { - return handleNullRecordKey(null); + if (autoGenerateRecordKeys) { + return super.getRecordKey(row); } else { - return requireNonNullNonEmptyKey(recordKeys[0].toString()); + tryInitRowAccessor(row.schema()); + + Object[] recordKeys = rowAccessor.getRecordKeyParts(row); + // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) + // record-key field + if (recordKeys[0] == null) { + return handleNullRecordKey(null); + } else { + return requireNonNullNonEmptyKey(recordKeys[0].toString()); + } } } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - tryInitRowAccessor(schema); - - Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow); - // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) - // record-key field - if (recordKeyValues[0] == null) { - return handleNullRecordKey(null); - } else if (recordKeyValues[0] instanceof UTF8String) { - return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]); + if (autoGenerateRecordKeys) { + return super.getRecordKey(internalRow, schema); } else { - return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString())); + tryInitRowAccessor(schema); + + Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow); + // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) + // 
record-key field + if (recordKeyValues[0] == null) { + return handleNullRecordKey(null); + } else if (recordKeyValues[0] instanceof UTF8String) { + return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]); + } else { + return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString())); + } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java index fa36f2152cbbf..1507cf85e144d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -41,7 +41,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator; public TimestampBasedKeyGenerator(TypedProperties config) throws IOException { - this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), + this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null), config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); } @@ -61,14 +61,22 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - tryInitRowAccessor(row.schema()); - return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); + if (autoGenerateRecordKeys) { + return super.getRecordKey(row); + } else { + tryInitRowAccessor(row.schema()); + return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); + } } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - tryInitRowAccessor(schema); - return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); + if 
(autoGenerateRecordKeys) { + return super.getRecordKey(internalRow, schema); + } else { + tryInitRowAccessor(schema); + return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); + } } @Override diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala index 4cdbbf7577abd..4e4c03402d1cf 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala @@ -48,7 +48,7 @@ object SparkKeyGenUtils { case c: BaseKeyGenerator if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] => c.getPartitionPathFields.asScala.map(pathField => - pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX) + pathField.split(BaseKeyGenerator.SPLIT_REGEX) .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")) .mkString(",") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java similarity index 81% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java rename to hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java index d487e7e1ff91f..787dedd18096c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.keygen; @@ -25,6 +26,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.Arrays; @@ -37,18 +39,10 @@ import java.util.stream.Collectors; /** - * This class is used to compute a deterministic key for a record based on the contents of the field. Unlike the other KeyGenerators in Hudi, this class does not take in any field names as args to - * create a "keyless" experience for insert only workloads. The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled. - * The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing likelihood for uniqueness. The ordering is: - *
    - *
  • timestamp
  • - *
  • numeric values
  • - *
  • string, byte arrays, other types not mentioned
  • - *
  • date, lists, maps, booleans
  • - *
- * The number of fields is capped to created predictable performance and the generator only uses non-null values to help increase uniqueness for sparse datasets. + * Class to assist in generating auto record keys for hudi records. */ -public class KeylessKeyGenerator extends CustomAvroKeyGenerator { +public class AutoRecordKeyGenerator implements Serializable { + private static final String HOODIE_PREFIX = "_hoodie"; private static final String DOT = "."; private final int maxFieldsToConsider; @@ -56,15 +50,20 @@ public class KeylessKeyGenerator extends CustomAvroKeyGenerator { private final Set partitionFieldNames; private int[][] fieldOrdering; - public KeylessKeyGenerator(TypedProperties props) { - super(props); - this.numFieldsForKey = props.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue()); + public AutoRecordKeyGenerator(TypedProperties config, List partitionPathFields) { + this.numFieldsForKey = config.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(), + KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.defaultValue()); // cap the number of fields to order in case of large schemas this.maxFieldsToConsider = numFieldsForKey * 3; - this.partitionFieldNames = this.getPartitionPathFields().stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet()); + this.partitionFieldNames = partitionPathFields.stream().map(field -> { + if (field.contains(BaseKeyGenerator.SPLIT_REGEX)) { + return field.split(BaseKeyGenerator.SPLIT_REGEX)[0]; + } else { + return field; + } + }).collect(Collectors.toSet()); } - @Override public String getRecordKey(GenericRecord record) { return buildKey(getFieldOrdering(record), record); } diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java index d0baa90391960..4a19dee9989db 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java @@ -32,11 +32,16 @@ */ public abstract class BaseKeyGenerator extends KeyGenerator { + protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + public static final String SPLIT_REGEX = ":"; + protected List recordKeyFields; protected List partitionPathFields; protected final boolean encodePartitionPath; protected final boolean hiveStylePartitioning; protected final boolean consistentLogicalTimestampEnabled; + protected final boolean autoGenerateRecordKeys; + protected AutoRecordKeyGenerator autoRecordKeyGenerator; protected BaseKeyGenerator(TypedProperties config) { super(config); @@ -46,6 +51,8 @@ protected BaseKeyGenerator(TypedProperties config) { Boolean.parseBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())); this.consistentLogicalTimestampEnabled = config.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); + this.autoGenerateRecordKeys = config.getBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(), + Boolean.parseBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue())); } /** @@ -81,4 +88,10 @@ public List getPartitionPathFields() { public boolean isConsistentLogicalTimestampEnabled() { return consistentLogicalTimestampEnabled; } + + protected void instantiateAutoRecordKeyGenerator() { + if (autoGenerateRecordKeys) { + autoRecordKeyGenerator = new AutoRecordKeyGenerator(config, getPartitionPathFields()); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index b0a46ac067660..911fd66db46d5 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -69,11 +69,21 @@ public class KeyGeneratorOptions extends HoodieConfig { + "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. " + "If enabled, then the timestamp value will be written in both the cases."); - public static final ConfigProperty NUM_FIELDS_IN_KEYLESS_GENERATOR = ConfigProperty - .key("hoodie.datasource.write.recordkey.keyless.field.count") + public static final ConfigProperty AUTO_GENERATE_RECORD_KEYS = ConfigProperty + .key("hoodie.auto.generate.record.keys") + .defaultValue("false") + .sinceVersion("0.13.0") + .withDocumentation("When enabled, hudi will auto generate a deterministic key for a record based on the contents of the field. " + + "The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled." + + "The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing " + + "likelihood for uniqueness."); + + public static final ConfigProperty NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION = ConfigProperty + .key("hoodie.datasource.write.auto.recordkey.num.fields") .defaultValue(5) - .withDocumentation("When using the KeylessKeyGenerator, this sets the number of fields to use when computing the UUID for the record. " - + "Increasing the value will increase the randomness of the generated key but can impact performance."); + .withDocumentation("When enabling auto generation of record keys(hoodie.auto.generate.record.keys) , this sets the number of fields " + + "to use when computing the UUID for the record. 
Increasing the value will increase the randomness of the generated key but can " + + "impact performance."); /** * @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods. diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala new file mode 100644 index 0000000000000..543b4f31c197d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi + + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.keygen.TimestampBasedKeyGenerator +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.spark.sql._ +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource + +import scala.collection.JavaConversions._ + +/** + * Tests auto generation of record keys. + */ +class TestAutoRecordKeyGeneration extends SparkClientFunctionalTestHarness { + + var commonOpts: Map[String, String] = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "4", + "hoodie.delete.shuffle.parallelism" -> "2", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + + @ParameterizedTest + @CsvSource(value = Array( + "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator", + "COPY_ON_WRITE|org.apache.hudi.keygen.ComplexKeyGenerator", + "COPY_ON_WRITE|org.apache.hudi.keygen.TimestampBasedKeyGenerator", + "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator", + "MERGE_ON_READ|org.apache.hudi.keygen.ComplexKeyGenerator", + "MERGE_ON_READ|org.apache.hudi.keygen.TimestampBasedKeyGenerator" + ), delimiter = '|') + def testRecordKeyGeneration(tableType: String, keyGenClass: String): Unit = { + var options: Map[String, String] = commonOpts + + (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) + + (DataSourceWriteOptions.TABLE_TYPE.key() -> tableType) + + 
(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() -> "true") + + (HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true") + + if (keyGenClass == classOf[TimestampBasedKeyGenerator].getName) { + options ++= Map(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING", + KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd", + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd") + } + + val dataGen = new HoodieTestDataGenerator(0xDEED) + val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) + // Bulk Insert Operation + val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0, 2)) + inputDf0.write.format("org.apache.hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + + // Snapshot query + val snapshotDf0 = spark.read.format("org.apache.hudi") + .load(basePath) + assertEquals(100, snapshotDf0.count()) + + // Insert + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDf1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDf1.write.format("org.apache.hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "001")) + + // Snapshot query + val snapshotDf1 = spark.read.format("org.apache.hudi") + .load(basePath) + assertEquals(200, snapshotDf1.count()) + + // even though we generate updates, since auto generation of record keys is enabled, it should result in new records + val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 50)).toList + val 
updateDf = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + updateDf.write.format("org.apache.hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + + val snapshotDf2 = spark.read.format("hudi") + .load(basePath) + assertEquals(250, snapshotDf2.count()) + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index c2073c703d959..e95d8bab34442 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -62,6 +62,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncClient; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.HoodieClusteringJob; @@ -2421,6 +2422,32 @@ public void testForceEmptyMetaSync() throws Exception { assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist"); } + @Test + public void testAutoGenerateRecordKeys() throws Exception { + boolean useSchemaProvider = false; + List transformerClassNames = null; + PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum; + int parquetRecordsCount = 100; + boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty(); + prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null); + prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, + PARQUET_SOURCE_ROOT, false, 
"partition_path", ""); + + String tableBasePath = basePath + "/test_parquet_table" + testNum; + HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), + transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false, + useSchemaProvider, 100000, false, null, null, "timestamp", null); + config.configs.add(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() + "=true"); + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); + + prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext); + testNum++; + } + class TestDeltaSync extends DeltaSync { public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, From 3f3e87b55fc3593e30020e4797695a86d190b34b Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 23 Jan 2023 14:29:02 -0800 Subject: [PATCH 2/3] addressing feedback --- .../TimestampBasedAvroKeyGenerator.java | 1 - .../keygen/TestAutoRecordKeyGenerator.java | 44 ++++++++++++------- .../hudi/keygen/ComplexKeyGenerator.java | 5 ++- .../hudi/keygen/CustomKeyGenerator.java | 11 ++--- .../hudi/keygen/GlobalDeleteKeyGenerator.java | 4 +- .../keygen/NonpartitionedKeyGenerator.java | 5 ++- 6 files changed, 42 insertions(+), 28 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java index fa9ecf154b2a2..60ccc694f947d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java @@ -65,7 +65,6 @@ public enum TimestampType implements Serializable { private final DateTimeZone outputDateTimeZone; protected final boolean encodePartitionPath; - private AutoRecordKeyGenerator autoRecordKeyGenerator; public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException { this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java index 351e779f0dafb..6dc3c8483fb0a 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java @@ -29,9 +29,12 @@ import java.io.IOException; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class TestAutoRecordKeyGenerator { private static final long TIME = 1672265446090L; private static final Schema SCHEMA; + private static final String PARTITION_PATH_STR = "partition1"; static { try { @@ -43,52 +46,59 @@ public class TestAutoRecordKeyGenerator { @Test public void createKeyWithoutPartitionColumn() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); - GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); + GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); String actualForRecord = keyGenerator.getRecordKey(record); - Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord); + assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord); + 
assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record)); } @Test public void createKeyWithPartition() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3)); - GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("integer_field,partition_field,nested_struct.doubly_nested", 3)); + GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); String actualForRecord = keyGenerator.getRecordKey(record); - Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord); + assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord); + assertEquals("123/partition1/__HIVE_DEFAULT_PARTITION__", keyGenerator.getPartitionPath(record)); } @Test public void nullFieldsProperlyHandled() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); - GenericRecord record = createRecord("partition1", "value1", null, null, null, null); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); + GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", null, null, null, null); String actualForRecord = keyGenerator.getRecordKey(record); - Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord); + assertEquals("a107710e-4d3b-33a4-bbbf-d891c7147034", actualForRecord); + assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record)); } @Test public void assertOnlySubsetOfFieldsUsed() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); - GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null); + ComplexAvroKeyGenerator keyGenerator = new 
ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); + GenericRecord record1 = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); String actualForRecord1 = keyGenerator.getRecordKey(record1); GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null); String actualForRecord2 = keyGenerator.getRecordKey(record2); - Assertions.assertEquals(actualForRecord2, actualForRecord1); + assertEquals(actualForRecord2, actualForRecord1); + assertEquals("partition2", keyGenerator.getPartitionPath(record2)); } @Test public void numFieldsImpactsKeyGen() { - ComplexAvroKeyGenerator keyGenerator1 = new ComplexAvroKeyGenerator(getKeyGenProperties("", 3)); - ComplexAvroKeyGenerator keyGenerator2 = new ComplexAvroKeyGenerator(getKeyGenProperties("", 10)); - GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); + ComplexAvroKeyGenerator keyGenerator1 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); + ComplexAvroKeyGenerator keyGenerator2 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10)); + GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record)); + assertEquals(PARTITION_PATH_STR, keyGenerator1.getPartitionPath(record)); + assertEquals(PARTITION_PATH_STR, keyGenerator2.getPartitionPath(record)); } @Test public void nestedColumnsUsed() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("", 10)); + ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10)); GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1); String actualForRecord = keyGenerator.getRecordKey(record); - Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord); + assertEquals("569de5d6-55b8-38bf-9256-efc0f6e2ae84", 
actualForRecord); + assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record)); } protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index d826061fa1143..d7ba41b00f235 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -47,10 +47,11 @@ public ComplexKeyGenerator(TypedProperties props) { .map(String::trim) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()) : Collections.EMPTY_LIST; - this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP)) + this.partitionPathFields = props.containsKey(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) + ? 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP)) .map(String::trim) .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); + .collect(Collectors.toList()) : Collections.EMPTY_LIST; this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java index b60ee90d878d9..d65f8f8b5a512 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java @@ -60,14 +60,15 @@ public CustomKeyGenerator(TypedProperties props) { // NOTE: We have to strip partition-path configuration, since it could only be interpreted by // this key-gen super(stripPartitionPathConfig(props)); - this.recordKeyFields = - Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) + String recordKeyField = props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()); + this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : + Arrays.stream(recordKeyField.split(",")) .map(String::trim) .collect(Collectors.toList()); - String partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); - this.partitionPathFields = partitionPathFields == null + String partitionPathField = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); + this.partitionPathFields = partitionPathField == null ? 
Collections.emptyList() - : Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList()); + : Arrays.stream(partitionPathField.split(",")).map(String::trim).collect(Collectors.toList()); this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props); validateRecordKeyFields(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java index d5b3cbf39cc50..a301b58c682ef 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; /** @@ -40,7 +41,8 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator { private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator; public GlobalDeleteKeyGenerator(TypedProperties config) { super(config); - this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")); + this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) + ? 
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) : Collections.emptyList(); this.globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index 96cda37d378a7..c179285a2a160 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -40,10 +40,11 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator { public NonpartitionedKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) + this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) + ? 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) .split(",")) .map(String::trim) - .collect(Collectors.toList()); + .collect(Collectors.toList()) : Collections.emptyList(); this.partitionPathFields = Collections.emptyList(); this.nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props); } From 7de21a1d14ad52d84841a31bc850341fd66033e5 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 24 Jan 2023 16:34:01 +0530 Subject: [PATCH 3/3] Fix tests --- .../org/apache/hudi/keygen/ComplexAvroKeyGenerator.java | 6 +++++- .../src/main/java/org/apache/hudi/keygen/KeyGenUtils.java | 6 ++++-- .../java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java | 6 +++++- .../java/org/apache/hudi/keygen/ComplexKeyGenerator.java | 2 +- .../java/org/apache/hudi/keygen/SimpleKeyGenerator.java | 4 +++- .../org/apache/hudi/keygen/TestComplexKeyGenerator.java | 3 ++- .../java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java | 3 ++- .../test/scala/org/apache/hudi/TestDataSourceDefaults.scala | 2 +- 8 files changed, 23 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java index 4c83c9f6df4b5..91449a07b70ff 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java @@ -17,14 +17,17 @@ package org.apache.hudi.keygen; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.avro.generic.GenericRecord; + import java.util.Arrays; import java.util.Collections; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + /** * Avro complex key 
generator, which takes names of fields to be used for recordKey and partitionPath as configs. */ @@ -50,6 +53,7 @@ public String getRecordKey(GenericRecord record) { if (autoGenerateRecordKeys) { return autoRecordKeyGenerator.getRecordKey(record); } else { + checkArgument(getRecordKeyFieldNames().size() > 0, "Record key fields cannot be empty"); return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 5a8b2b01c5fcd..8c536de718df6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -108,10 +108,12 @@ public static String getRecordKey(GenericRecord record, List recordKeyFi keyIsNullEmpty = false; } } - recordKey.deleteCharAt(recordKey.length() - 1); + if (recordKey.length() > 0) { + recordKey.deleteCharAt(recordKey.length() - 1); + } if (keyIsNullEmpty) { throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " - + recordKeyFields.toString() + " cannot be entirely null or empty."); + + recordKeyFields + " cannot be entirely null or empty."); } return recordKey.toString(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java index cbfcf2b6e435e..38123e39241ea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java @@ -17,12 +17,15 @@ package org.apache.hudi.keygen; -import org.apache.avro.generic.GenericRecord; import 
org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.avro.generic.GenericRecord; + import java.util.Collections; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + /** * Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs. */ @@ -51,6 +54,7 @@ public String getRecordKey(GenericRecord record) { if (autoGenerateRecordKeys) { return autoRecordKeyGenerator.getRecordKey(record); } else { + checkArgument(getRecordKeyFieldNames().size() == 1, "Only 1 record key field allowed for SimpleKeyGenerator"); return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index d7ba41b00f235..9e8550643f9b7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -47,7 +47,7 @@ public ComplexKeyGenerator(TypedProperties props) { .map(String::trim) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()) : Collections.EMPTY_LIST; - this.partitionPathFields = props.containsKey(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) + this.partitionPathFields = props.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) ? 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP)) .map(String::trim) .filter(s -> !s.isEmpty()) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index b588739100c30..ea4e58cf76906 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -18,9 +18,10 @@ package org.apache.hudi.keygen; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.avro.generic.GenericRecord; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; @@ -77,6 +78,7 @@ public String getRecordKey(Row row) { Object[] recordKeys = rowAccessor.getRecordKeyParts(row); // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) // record-key field + checkArgument(recordKeys != null && recordKeys.length > 0, "Record keys cannot be null or empty"); if (recordKeys[0] == null) { return handleNullRecordKey(null); } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java index caed61249a1d3..4cfa619d62746 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java @@ -77,7 +77,8 @@ public void testNullPartitionPathFields() { @Test public void testNullRecordKeyFields() { - Assertions.assertThrows(IllegalArgumentException.class, () -> 
new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp())); + ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()); + Assertions.assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord())); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java index 77d1b34f1368e..5e7a9f0b8ea80 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java @@ -96,7 +96,8 @@ public void testNullPartitionPathFields() { @Test public void testNullRecordKeyFields() { - assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp())); + SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()); + assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord())); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala index 1bb81f7f92eea..12c1179fde28e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala @@ -95,7 +95,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport { props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "partitionField") assertThrows(classOf[IllegalArgumentException]) { - new SimpleKeyGenerator(props) + new SimpleKeyGenerator(props).getRecordKey(baseRow) } }