diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 8213f020f2a27..d114cc3d77881 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2259,10 +2259,6 @@ public WriteConcurrencyMode getWriteConcurrencyMode() { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } - public Boolean doAutoGenerateRecordKeys() { - return getBooleanOrDefault(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS); - } - public boolean isEarlyConflictDetectionEnable() { return getBoolean(EARLY_CONFLICT_DETECTION_ENABLE); } @@ -2809,11 +2805,6 @@ public Builder doSkipDefaultPartitionValidation(boolean skipDefaultPartitionVali return this; } - public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) { - writeConfig.setValue(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS, String.valueOf(autoGenerateRecordKeys)); - return this; - } - public Builder withEarlyConflictDetectionEnable(boolean enable) { writeConfig.setValue(EARLY_CONFLICT_DETECTION_ENABLE, String.valueOf(enable)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java index 91449a07b70ff..9ff5c522e4527 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java @@ -17,17 +17,13 @@ package org.apache.hudi.keygen; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; -import org.apache.avro.generic.GenericRecord; - import java.util.Arrays; -import java.util.Collections; import java.util.stream.Collectors; -import static org.apache.hudi.common.util.ValidationUtils.checkArgument; - /** * Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs. */ @@ -36,26 +32,19 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator { public ComplexAvroKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) - ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) .map(String::trim) .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()) : Collections.EMPTY_LIST; + .collect(Collectors.toList()); this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")) .map(String::trim) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); - instantiateAutoRecordKeyGenerator(); } @Override public String getRecordKey(GenericRecord record) { - if (autoGenerateRecordKeys) { - return autoRecordKeyGenerator.getRecordKey(record); - } else { - checkArgument(getRecordKeyFieldNames().size() > 0, "Record key fields cannot be empty"); - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); - } + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java index 2c829f6bb9894..77377de7ab8c7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java @@ -43,6 +43,9 @@ */ public class CustomAvroKeyGenerator extends BaseKeyGenerator { + private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + public static final String SPLIT_REGEX = ":"; + /** * Used as a part of config in CustomKeyGenerator.java. */ @@ -54,7 +57,6 @@ public CustomAvroKeyGenerator(TypedProperties props) { super(props); this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList()); this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList()); - instantiateAutoRecordKeyGenerator(); } @Override @@ -100,14 +102,10 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(GenericRecord record) { - if (autoGenerateRecordKeys) { - return autoRecordKeyGenerator.getRecordKey(record); - } else { - validateRecordKeyFields(); - return getRecordKeyFieldNames().size() == 1 - ? new SimpleAvroKeyGenerator(config).getRecordKey(record) - : new ComplexAvroKeyGenerator(config).getRecordKey(record); - } + validateRecordKeyFields(); + return getRecordKeyFieldNames().size() == 1 + ? new SimpleAvroKeyGenerator(config).getRecordKey(record) + : new ComplexAvroKeyGenerator(config).getRecordKey(record); } private void validateRecordKeyFields() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java index 193259ab7e09d..dc0bc3cef2f00 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java @@ -36,17 +36,11 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator { public GlobalAvroDeleteKeyGenerator(TypedProperties config) { super(config); this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")); - this.partitionPathFields = new ArrayList<>(); - instantiateAutoRecordKeyGenerator(); } @Override public String getRecordKey(GenericRecord record) { - if (autoGenerateRecordKeys) { - return autoRecordKeyGenerator.getRecordKey(record); - } else { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); - } + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); } @Override @@ -56,7 +50,7 @@ public String getPartitionPath(GenericRecord record) { @Override public List getPartitionPathFields() { - return partitionPathFields; + return new ArrayList<>(); } public String getEmptyPartition() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 8c536de718df6..5a8b2b01c5fcd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -108,12 +108,10 @@ public static String getRecordKey(GenericRecord record, List recordKeyFi keyIsNullEmpty = false; } } - if (recordKey.length() > 0) { - recordKey.deleteCharAt(recordKey.length() - 1); - } + recordKey.deleteCharAt(recordKey.length() - 1); if (keyIsNullEmpty) { throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " - + recordKeyFields + " cannot be entirely null or empty."); + + recordKeyFields.toString() + " cannot be entirely null or empty."); } return recordKey.toString(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java index 41426140ab598..5b5cedcbf8855 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java @@ -39,7 +39,6 @@ public NonpartitionedAvroKeyGenerator(TypedProperties props) { this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST; - instantiateAutoRecordKeyGenerator(); } @Override @@ -54,17 +53,13 @@ public List getPartitionPathFields() { @Override public String getRecordKey(GenericRecord record) { - if (autoGenerateRecordKeys) { - return autoRecordKeyGenerator.getRecordKey(record); - } else { - // for backward compatibility, we need to use the right format according to the number of record key fields - // 1. if there is only one record key field, the format of record key is just "" - // 2. if there are multiple record key fields, the format is ":,:,..." - if (getRecordKeyFieldNames().size() == 1) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); - } - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + // for backward compatibility, we need to use the right format according to the number of record key fields + // 1. if there is only one record key field, the format of record key is just "" + // 2. if there are multiple record key fields, the format is ":,:,..." + if (getRecordKeyFieldNames().size() == 1) { + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); } + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); } public String getEmptyPartition() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java index 38123e39241ea..c7398e94ecea0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java @@ -17,15 +17,12 @@ package org.apache.hudi.keygen; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; -import org.apache.avro.generic.GenericRecord; - import java.util.Collections; -import static org.apache.hudi.common.util.ValidationUtils.checkArgument; - /** * Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs. */ @@ -46,17 +43,11 @@ public SimpleAvroKeyGenerator(TypedProperties props) { ? Collections.emptyList() : Collections.singletonList(recordKeyField); this.partitionPathFields = Collections.singletonList(partitionPathField); - instantiateAutoRecordKeyGenerator(); } @Override public String getRecordKey(GenericRecord record) { - if (autoGenerateRecordKeys) { - return autoRecordKeyGenerator.getRecordKey(record); - } else { - checkArgument(getRecordKeyFieldNames().size() == 1, "Only 1 record key field allowed for SimpleKeyGenerator"); - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); - } + return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); } @Override diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java deleted file mode 100644 index 6dc3c8483fb0a..0000000000000 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.keygen; - -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.keygen.constant.KeyGeneratorOptions; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.io.IOException; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class TestAutoRecordKeyGenerator { - private static final long TIME = 1672265446090L; - private static final Schema SCHEMA; - private static final String PARTITION_PATH_STR = "partition1"; - - static { - try { - SCHEMA = new Schema.Parser().parse(TestAutoRecordKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc")); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - @Test - public void createKeyWithoutPartitionColumn() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); - GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); - String actualForRecord = keyGenerator.getRecordKey(record); - assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord); - assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record)); - } - - @Test - public void createKeyWithPartition() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("integer_field,partition_field,nested_struct.doubly_nested", 3)); - GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); - String actualForRecord = keyGenerator.getRecordKey(record); - assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord); - assertEquals("123/partition1/__HIVE_DEFAULT_PARTITION__", keyGenerator.getPartitionPath(record)); - } - - @Test - public void nullFieldsProperlyHandled() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); - GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", null, null, null, null); - String actualForRecord = keyGenerator.getRecordKey(record); - assertEquals("a107710e-4d3b-33a4-bbbf-d891c7147034", actualForRecord); - assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record)); - } - - @Test - public void assertOnlySubsetOfFieldsUsed() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); - GenericRecord record1 = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); - String actualForRecord1 = keyGenerator.getRecordKey(record1); - GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null); - String actualForRecord2 = keyGenerator.getRecordKey(record2); - assertEquals(actualForRecord2, actualForRecord1); - assertEquals("partition2", keyGenerator.getPartitionPath(record2)); - } - - @Test - public void numFieldsImpactsKeyGen() { - ComplexAvroKeyGenerator keyGenerator1 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3)); - ComplexAvroKeyGenerator keyGenerator2 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10)); - GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null); - Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record)); - assertEquals(PARTITION_PATH_STR, keyGenerator1.getPartitionPath(record)); - assertEquals(PARTITION_PATH_STR, keyGenerator2.getPartitionPath(record)); - } - - @Test - public void nestedColumnsUsed() { - ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10)); - GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1); - String actualForRecord = keyGenerator.getRecordKey(record); - assertEquals("569de5d6-55b8-38bf-9256-efc0f6e2ae84", actualForRecord); - assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record)); - } - - protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) { - GenericRecord nestedRecord = null; - if (nestedDouble != null) { - nestedRecord = new GenericRecordBuilder(SCHEMA.getField("nested_struct").schema().getTypes().get(1)) - .set("doubly_nested", nestedDouble) - .build(); - } - - return new GenericRecordBuilder(SCHEMA) - .set("partition_field", partitionField) - .set("string_field", stringValue) - .set("integer_field", integerValue) - .set("long_field", longValue) - .set("timestamp_field", timestampValue) - .set("nested_struct", nestedRecord) - .build(); - } - - protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) { - TypedProperties properties = new TypedProperties(); - properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField); - properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(), numFieldsInKeyGen); - properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), ""); - properties.put(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),"true"); - return properties; - } -} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc b/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc deleted file mode 100644 index 2966841eef6da..0000000000000 --- a/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc +++ /dev/null @@ -1,44 +0,0 @@ -{ - "namespace": "keyless", - "type": "record", - "name": "message", - "fields": [ - { - "name": "partition_field", - "type": "string" - }, - { - "name": "string_field", - "type": "string" - }, - { - "name": "integer_field", - "type": ["null", "int"], - "default": null - }, - { - "name": "long_field", - "type": ["null", "long"], - "default": null - }, - { - "name": "timestamp_field", - "type": ["null", {"type":"long","logicalType":"timestamp-millis"}], - "default": null - }, - { - "name": "nested_struct", - "type": ["null", { - "type": "record", - "name": "nested", - "fields": [ - { - "name": "doubly_nested", - "type": "double" - } - ] - }], - "default": null - } - ] -} \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index 9e8550643f9b7..c9cff284e80e8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -26,7 +26,6 @@ import org.apache.spark.unsafe.types.UTF8String; import java.util.Arrays; -import java.util.Collections; import java.util.stream.Collectors; /** @@ -42,16 +41,14 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator { public ComplexKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) - ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP)) + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP)) .map(String::trim) .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()) : Collections.EMPTY_LIST; - this.partitionPathFields = props.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) - ? Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP)) + .collect(Collectors.toList()); + this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP)) .map(String::trim) .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()) : Collections.EMPTY_LIST; + .collect(Collectors.toList()); this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props); } @@ -67,22 +64,14 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(row); - } else { - tryInitRowAccessor(row.schema()); - return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); - } + tryInitRowAccessor(row.schema()); + return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(internalRow, schema); - } else { - tryInitRowAccessor(schema); - return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); - } + tryInitRowAccessor(schema); + return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java index d65f8f8b5a512..fcd94bb4f1550 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java @@ -60,15 +60,14 @@ public CustomKeyGenerator(TypedProperties props) { // NOTE: We have to strip partition-path configuration, since it could only be interpreted by // this key-gen super(stripPartitionPathConfig(props)); - String recordKeyField = props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()); - this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : - Arrays.stream(recordKeyField.split(",")) + this.recordKeyFields = + Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) .map(String::trim) .collect(Collectors.toList()); - String partitionPathField = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); - this.partitionPathFields = partitionPathField == null + String partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); + this.partitionPathFields = partitionPathFields == null ? Collections.emptyList() - : Arrays.stream(partitionPathField.split(",")).map(String::trim).collect(Collectors.toList()); + : Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList()); this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props); validateRecordKeyFields(); @@ -86,13 +85,9 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(row); - } else { - return getRecordKeyFieldNames().size() == 1 - ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row) - : new ComplexKeyGenerator(config).getRecordKey(row); - } + return getRecordKeyFieldNames().size() == 1 + ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row) + : new ComplexKeyGenerator(config).getRecordKey(row); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java index a301b58c682ef..7fcc16094eadc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; /** @@ -41,8 +40,7 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator { private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator; public GlobalDeleteKeyGenerator(TypedProperties config) { super(config); - this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) - ? Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) : Collections.emptyList(); + this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")); this.globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config); } @@ -63,22 +61,14 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(row); - } else { - tryInitRowAccessor(row.schema()); - return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); - } + tryInitRowAccessor(row.schema()); + return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row)); } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(internalRow, schema); - } else { - tryInitRowAccessor(schema); - return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); - } + tryInitRowAccessor(schema); + return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow)); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index c179285a2a160..100bcc2cd7f2f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -40,11 +40,10 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator { public NonpartitionedKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) - ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) .split(",")) .map(String::trim) - .collect(Collectors.toList()) : Collections.emptyList(); + .collect(Collectors.toList()); this.partitionPathFields = Collections.emptyList(); this.nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props); } @@ -61,22 +60,14 @@ public String getRecordKey(GenericRecord record) { @Override public String getRecordKey(Row row) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(row); - } else { - tryInitRowAccessor(row.schema()); - return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); - } + tryInitRowAccessor(row.schema()); + return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(internalRow, schema); - } else { - tryInitRowAccessor(schema); - return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); - } + tryInitRowAccessor(schema); + return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index ea4e58cf76906..4b1a5e5cb444e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -18,10 +18,9 @@ package org.apache.hudi.keygen; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; - -import org.apache.avro.generic.GenericRecord; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; @@ -39,7 +38,7 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator { private final SimpleAvroKeyGenerator simpleAvroKeyGenerator; public SimpleKeyGenerator(TypedProperties props) { - this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null), + this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); } @@ -70,40 +69,31 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(row); + tryInitRowAccessor(row.schema()); + + Object[] recordKeys = rowAccessor.getRecordKeyParts(row); + // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) + // record-key field + if (recordKeys[0] == null) { + return handleNullRecordKey(null); } else { - tryInitRowAccessor(row.schema()); - - Object[] recordKeys = rowAccessor.getRecordKeyParts(row); - // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) - // record-key field - checkArgument(recordKeys != null && recordKeys.length > 0, "Record keys cannot be null or empty"); - if (recordKeys[0] == null) { - return handleNullRecordKey(null); - } else { - return requireNonNullNonEmptyKey(recordKeys[0].toString()); - } + return requireNonNullNonEmptyKey(recordKeys[0].toString()); } } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(internalRow, schema); + tryInitRowAccessor(schema); + + Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow); + // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) + // record-key field + if (recordKeyValues[0] == null) { + return handleNullRecordKey(null); + } else if (recordKeyValues[0] instanceof UTF8String) { + return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]); } else { - tryInitRowAccessor(schema); - - Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow); - // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) - // record-key field - if (recordKeyValues[0] == null) { - return handleNullRecordKey(null); - } else if (recordKeyValues[0] instanceof UTF8String) { - return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]); - } else { - return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString())); - } + return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString())); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java index 1507cf85e144d..fa36f2152cbbf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -41,7 +41,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator; public TimestampBasedKeyGenerator(TypedProperties config) throws IOException { - this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null), + this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); } @@ -61,22 +61,14 @@ public String getPartitionPath(GenericRecord record) { @Override public String getRecordKey(Row row) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(row); - } else { - tryInitRowAccessor(row.schema()); - return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); - } + tryInitRowAccessor(row.schema()); + return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row))); } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - if (autoGenerateRecordKeys) { - return super.getRecordKey(internalRow, schema); - } else { - tryInitRowAccessor(schema); - return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); - } + tryInitRowAccessor(schema); + return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow))); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala index 4e4c03402d1cf..4cdbbf7577abd 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala @@ -48,7 +48,7 @@ object SparkKeyGenUtils { case c: BaseKeyGenerator if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] => c.getPartitionPathFields.asScala.map(pathField => - pathField.split(BaseKeyGenerator.SPLIT_REGEX) + pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX) .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")) .mkString(",") diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java deleted file mode 100644 index 787dedd18096c..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.keygen; - -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.keygen.constant.KeyGeneratorOptions; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; - -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - -/** - * Class to assist in generation auto record keys for hudi records. - */ -public class AutoRecordKeyGenerator implements Serializable { - - private static final String HOODIE_PREFIX = "_hoodie"; - private static final String DOT = "."; - private final int maxFieldsToConsider; - private final int numFieldsForKey; - private final Set partitionFieldNames; - private int[][] fieldOrdering; - - public AutoRecordKeyGenerator(TypedProperties config, List partitionPathFields) { - this.numFieldsForKey = config.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(), - KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.defaultValue()); - // cap the number of fields to order in case of large schemas - this.maxFieldsToConsider = numFieldsForKey * 3; - this.partitionFieldNames = partitionPathFields.stream().map(field -> { - if (field.contains(BaseKeyGenerator.SPLIT_REGEX)) { - return field.split(BaseKeyGenerator.SPLIT_REGEX)[0]; - } else { - return field; - } - }).collect(Collectors.toSet()); - } - - public String getRecordKey(GenericRecord record) { - return buildKey(getFieldOrdering(record), record); - } - - int[][] getFieldOrdering(GenericRecord genericRecord) { - if (fieldOrdering == null) { - fieldOrdering = buildFieldOrdering(genericRecord.getSchema().getFields()); - } - return fieldOrdering; - } - - /** - * Deterministically builds a key for the input value based on the provided fieldOrdering. The first {@link #numFieldsForKey} non-null values will be used to generate a string that is passed to - * {@link UUID#nameUUIDFromBytes(byte[])}. - * @param fieldOrdering an array of integer arrays. The integer arrays represent paths to a single field within the input object. - * @param input the input object that needs a key - * @return a deterministically generated {@link UUID} - * @param the input object type - */ - private String buildKey(int[][] fieldOrdering, GenericRecord input) { - StringBuilder key = new StringBuilder(); - int nonNullFields = 0; - for (int[] index : fieldOrdering) { - Object value = getFieldForRecord(input, index); - if (value == null) { - continue; - } - nonNullFields++; - key.append(value.hashCode()); - if (nonNullFields >= numFieldsForKey) { - break; - } - } - return UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString(); - } - - /** - * Gets the value of the field at the specified path within the record. - * @param record the input record - * @param fieldPath the path to the field as an array of integers representing the field position within the object - * @return value at the path - */ - private static Object getFieldForRecord(GenericRecord record, int[] fieldPath) { - Object value = record; - for (Integer index : fieldPath) { - if (value == null) { - return null; - } - value = ((GenericRecord) value).get(index); - } - return value; - } - - private int[][] buildFieldOrdering(List initialFields) { - PriorityQueue> queue = new PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance()); - Queue fieldsToProcess = new ArrayDeque<>(); - for (int j = 0; j < initialFields.size(); j++) { - fieldsToProcess.offer(new FieldToProcess(new int[]{j}, initialFields.get(j), initialFields.get(j).name())); - } - while (!fieldsToProcess.isEmpty()) { - FieldToProcess fieldToProcess = fieldsToProcess.poll(); - int[] existingPath = fieldToProcess.getIndexPath(); - Schema fieldSchema = fieldToProcess.getField().schema(); - if (fieldSchema.getType() == Schema.Type.UNION) { - fieldSchema = fieldSchema.getTypes().get(1); - } - if (fieldSchema.getType() == Schema.Type.RECORD) { - List nestedFields = fieldSchema.getFields(); - for (int i = 0; i < nestedFields.size(); i++) { - int[] path = Arrays.copyOf(existingPath, existingPath.length + 1); - path[existingPath.length] = i; - Schema.Field nestedField = nestedFields.get(i); - fieldsToProcess.add(new FieldToProcess(path, nestedField, fieldToProcess.getNamePath() + DOT + nestedField.name())); - } - } else { - // check that field is not used in partitioning - if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) { - queue.offer(Pair.of(existingPath, getSchemaRanking(fieldToProcess.getField()))); - if (queue.size() > maxFieldsToConsider) { - queue.poll(); - } - } - } - } - Pair[] sortedPairs = queue.toArray(new Pair[queue.size()]); - Arrays.sort(sortedPairs, RankingComparator.getInstance().reversed()); - int[][] output = new int[sortedPairs.length][]; - for (int k = 0; k < sortedPairs.length; k++) { - output[k] = sortedPairs[k].getLeft(); - } - return output; - } - - private static class FieldToProcess { - final int[] indexPath; - final Schema.Field field; - final String namePath; - - public FieldToProcess(int[] indexPath, Schema.Field field, String namePath) { - this.indexPath = indexPath; - this.field = field; - this.namePath = namePath; - } - - public int[] getIndexPath() { - return indexPath; - } - - public Schema.Field getField() { - return field; - } - - public String getNamePath() { - return namePath; - } - } - - /** - * Ranks the fields by their type. - * @param field input field - * @return a score of 0 to 4 - */ - private int getSchemaRanking(Schema.Field field) { - if (field.name().startsWith(HOODIE_PREFIX)) { - return 0; - } - Schema schema = field.schema(); - if (schema.getType() == Schema.Type.UNION) { - schema = schema.getTypes().get(0).getType() == Schema.Type.NULL ? schema.getTypes().get(1) : schema.getTypes().get(0); - } - Schema.Type type = schema.getType(); - switch (type) { - case LONG: - // assumes long with logical type will be a timestamp - return schema.getLogicalType() != null ? 4 : 3; - case INT: - // assumes long with logical type will be a date which will have low variance in a batch - return schema.getLogicalType() != null ? 1 : 3; - case DOUBLE: - case FLOAT: - return 3; - case BOOLEAN: - case MAP: - case ARRAY: - return 1; - default: - return 2; - } - } - - private static class RankingComparator implements Comparator> { - private static final RankingComparator INSTANCE = new RankingComparator(); - - static RankingComparator getInstance() { - return INSTANCE; - } - - @Override - public int compare(Pair o1, Pair o2) { - int initialResult = o1.getRight().compareTo(o2.getRight()); - if (initialResult == 0) { - // favor the smaller list (less nested value) on ties - int sizeResult = Integer.compare(o2.getLeft().length, o1.getLeft().length); - if (sizeResult == 0) { - return Integer.compare(o2.getLeft()[0], o1.getLeft()[0]); - } - return sizeResult; - } - return initialResult; - } - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java index 4a19dee9989db..d0baa90391960 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java @@ -32,16 +32,11 @@ */ public abstract class BaseKeyGenerator extends KeyGenerator { - protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; - public static final String SPLIT_REGEX = ":"; - protected List recordKeyFields; protected List partitionPathFields; protected final boolean encodePartitionPath; protected final boolean hiveStylePartitioning; protected final boolean consistentLogicalTimestampEnabled; - protected final boolean autoGenerateRecordKeys; - protected AutoRecordKeyGenerator autoRecordKeyGenerator; protected BaseKeyGenerator(TypedProperties config) { super(config); @@ -51,8 +46,6 @@ protected BaseKeyGenerator(TypedProperties config) { Boolean.parseBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())); this.consistentLogicalTimestampEnabled = config.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); - this.autoGenerateRecordKeys = config.getBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(), - Boolean.parseBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue())); } /** @@ -88,10 +81,4 @@ public List getPartitionPathFields() { public boolean isConsistentLogicalTimestampEnabled() { return consistentLogicalTimestampEnabled; } - - protected void instantiateAutoRecordKeyGenerator() { - if (autoGenerateRecordKeys) { - autoRecordKeyGenerator = new AutoRecordKeyGenerator(config, getPartitionPathFields()); - } - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index 911fd66db46d5..99d40439b7c61 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -69,22 +69,6 @@ public class KeyGeneratorOptions extends HoodieConfig { + "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. " + "If enabled, then the timestamp value will be written in both the cases."); - public static final ConfigProperty AUTO_GENERATE_RECORD_KEYS = ConfigProperty - .key("hoodie.auto.generate.record.keys") - .defaultValue("false") - .sinceVersion("0.13.0") - .withDocumentation("When enabled, hudi will auto generate a deterministic key for a record based on the contents of the field. " - + "The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled." - + "The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing " - + "likelihood for uniqueness."); - - public static final ConfigProperty NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION = ConfigProperty - .key("hoodie.datasource.write.auto.recordkey.num.fields") - .defaultValue(5) - .withDocumentation("When enabling auto generation of record keys(hoodie.auto.generate.record.keys) , this sets the number of fields " - + "to use when computing the UUID for the record. Increasing the value will increase the randomness of the generated key but can " - + "impact performance."); - /** * @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods. */ diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java index 4cfa619d62746..caed61249a1d3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java @@ -77,8 +77,7 @@ public void testNullPartitionPathFields() { @Test public void testNullRecordKeyFields() { - ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()); - Assertions.assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord())); + Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp())); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java index 5e7a9f0b8ea80..77d1b34f1368e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java @@ -96,8 +96,7 @@ public void testNullPartitionPathFields() { @Test public void testNullRecordKeyFields() { - SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()); - assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord())); + assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp())); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala deleted file mode 100644 index 543b4f31c197d..0000000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi - - -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.keygen.TimestampBasedKeyGenerator -import org.apache.hudi.keygen.constant.KeyGeneratorOptions -import org.apache.hudi.testutils.SparkClientFunctionalTestHarness -import org.apache.spark.sql._ -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.CsvSource - -import scala.collection.JavaConversions._ - -/** - * Tests auto generation of record keys. - */ -class TestAutoRecordKeyGeneration extends SparkClientFunctionalTestHarness { - - var commonOpts: Map[String, String] = Map( - "hoodie.insert.shuffle.parallelism" -> "4", - "hoodie.upsert.shuffle.parallelism" -> "4", - "hoodie.bulkinsert.shuffle.parallelism" -> "4", - "hoodie.delete.shuffle.parallelism" -> "2", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", - DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", - HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" - ) - - @ParameterizedTest - @CsvSource(value = Array( - "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator", - "COPY_ON_WRITE|org.apache.hudi.keygen.ComplexKeyGenerator", - "COPY_ON_WRITE|org.apache.hudi.keygen.TimestampBasedKeyGenerator", - "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator", - "MERGE_ON_READ|org.apache.hudi.keygen.ComplexKeyGenerator", - "MERGE_ON_READ|org.apache.hudi.keygen.TimestampBasedKeyGenerator" - ), delimiter = '|') - def testRecordKeyGeneration(tableType: String, keyGenClass: String): Unit = { - var options: Map[String, String] = commonOpts + - (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) + - (DataSourceWriteOptions.TABLE_TYPE.key() -> tableType) + - (KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() -> "true") + - (HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true") - - if (keyGenClass == classOf[TimestampBasedKeyGenerator].getName) { - options ++= Map(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING", - KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd", - KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd") - } - - val dataGen = new HoodieTestDataGenerator(0xDEED) - val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) - // Bulk Insert Operation - val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList - val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0, 2)) - inputDf0.write.format("org.apache.hudi") - .options(options) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Overwrite) - .save(basePath) - assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) - - // Snapshot query - val snapshotDf0 = spark.read.format("org.apache.hudi") - .load(basePath) - assertEquals(100, snapshotDf0.count()) - - // Insert - val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList - val inputDf1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - inputDf1.write.format("org.apache.hudi") - .options(options) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Append) - .save(basePath) - assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "001")) - - // Snapshot query - val snapshotDf1 = spark.read.format("org.apache.hudi") - .load(basePath) - assertEquals(200, snapshotDf1.count()) - - // even though we generate updates, since auto generation of record keys are enabled, it should result in new records - val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 50)).toList - val updateDf = spark.read.json(spark.sparkContext.parallelize(records2, 2)) - updateDf.write.format("org.apache.hudi") - .options(options) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Append) - .save(basePath) - - val snapshotDf2 = spark.read.format("hudi") - .load(basePath) - assertEquals(250, snapshotDf2.count()) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala index 12c1179fde28e..1bb81f7f92eea 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala @@ -95,7 +95,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport { props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "partitionField") assertThrows(classOf[IllegalArgumentException]) { - new SimpleKeyGenerator(props).getRecordKey(baseRow) + new SimpleKeyGenerator(props) } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index e95d8bab34442..c2073c703d959 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -62,7 +62,6 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncClient; import org.apache.hudi.keygen.SimpleKeyGenerator; -import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.HoodieClusteringJob; @@ -2422,32 +2421,6 @@ public void testForceEmptyMetaSync() throws Exception { assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist"); } - @Test - public void testAutoGenerateRecordKeys() throws Exception { - boolean useSchemaProvider = false; - List transformerClassNames = null; - PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum; - int parquetRecordsCount = 100; - boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty(); - prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null); - prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, - PARQUET_SOURCE_ROOT, false, "partition_path", ""); - - String tableBasePath = basePath + "/test_parquet_table" + testNum; - HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), - transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false, - useSchemaProvider, 100000, false, null, null, "timestamp", null); - config.configs.add(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() + "=true"); - HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc); - deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); - - prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null); - deltaStreamer.sync(); - TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext); - testNum++; - } - class TestDeltaSync extends DeltaSync { public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,