diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 969d82f28cd8e..b04e3801e1533 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -49,6 +49,7 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; @@ -889,6 +890,10 @@ public String getKeyGeneratorClass() { return getString(KEYGENERATOR_CLASS_NAME); } + public boolean isConsistentLogicalTimestampEnabled() { + return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED); + } + public Boolean shouldAutoCommit() { return getBoolean(AUTO_COMMIT_ENABLE); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java index 375225f48ed69..fc87a83e36a81 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java @@ -40,11 +40,11 @@ public ComplexAvroKeyGenerator(TypedProperties props) { @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled()); } @Override public String getPartitionPath(GenericRecord record) { - return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath); + return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath, isConsistentLogicalTimestampEnabled()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java index 674e28647b945..10a57602386fc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java @@ -40,7 +40,7 @@ public GlobalAvroDeleteKeyGenerator(TypedProperties config) { @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled()); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 8038afe4e5687..d9de544d29b29 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -65,7 +65,7 @@ public static 
String getPartitionPathFromGenericRecord(GenericRecord genericReco /** * Extracts the record key fields in strings out of the given record key, - * this is the reverse operation of {@link #getRecordKey(GenericRecord, String)}. + * this is the reverse operation of {@link #getRecordKey(GenericRecord, String, boolean)}. * * @see SimpleAvroKeyGenerator * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator @@ -89,11 +89,11 @@ public static String[] extractRecordKeys(String recordKey) { } } - public static String getRecordKey(GenericRecord record, List recordKeyFields) { + public static String getRecordKey(GenericRecord record, List recordKeyFields, boolean consistentLogicalTimestampEnabled) { boolean keyIsNullEmpty = true; StringBuilder recordKey = new StringBuilder(); for (String recordKeyField : recordKeyFields) { - String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true); + String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled); if (recordKeyValue == null) { recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ","); } else if (recordKeyValue.isEmpty()) { @@ -112,14 +112,14 @@ public static String getRecordKey(GenericRecord record, List recordKeyFi } public static String getRecordPartitionPath(GenericRecord record, List partitionPathFields, - boolean hiveStylePartitioning, boolean encodePartitionPath) { + boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) { if (partitionPathFields.isEmpty()) { return ""; } StringBuilder partitionPath = new StringBuilder(); for (String partitionPathField : partitionPathFields) { - String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true); + String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled); if (fieldVal == null || fieldVal.isEmpty()) { partitionPath.append(hiveStylePartitioning ? 
partitionPathField + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH); @@ -135,8 +135,8 @@ public static String getRecordPartitionPath(GenericRecord record, List p return partitionPath.toString(); } - public static String getRecordKey(GenericRecord record, String recordKeyField) { - String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true); + public static String getRecordKey(GenericRecord record, String recordKeyField, boolean consistentLogicalTimestampEnabled) { + String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled); if (recordKey == null || recordKey.isEmpty()) { throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); } @@ -144,8 +144,8 @@ public static String getRecordKey(GenericRecord record, String recordKeyField) { } public static String getPartitionPath(GenericRecord record, String partitionPathField, - boolean hiveStylePartitioning, boolean encodePartitionPath) { - String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true); + boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) { + String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled); if (partitionPath == null || partitionPath.isEmpty()) { partitionPath = HUDI_DEFAULT_PARTITION_PATH; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java index 4384f171dc96c..3f6aaadf6e803 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java @@ -57,9 +57,9 @@ public String getRecordKey(GenericRecord record) { // 1. if there is only one record key field, the format of record key is just "" // 2. if there are multiple record key fields, the format is ":,:,..." 
if (getRecordKeyFieldNames().size() == 1) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0)); + return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled()); } - return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled()); } public String getEmptyPartition() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java index dae55c5f8afce..943091225a3e1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java @@ -47,11 +47,11 @@ public SimpleAvroKeyGenerator(TypedProperties props) { @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0)); + return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled()); } @Override public String getPartitionPath(GenericRecord record) { - return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath); + return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath, isConsistentLogicalTimestampEnabled()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java index 430cde81d2e32..bc84ece503487 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; +import java.sql.Timestamp; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @@ -125,7 +126,7 @@ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException @Override public String getPartitionPath(GenericRecord record) { - Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true); + Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true, isConsistentLogicalTimestampEnabled()); if (partitionVal == null) { partitionVal = getDefaultPartitionVal(); } @@ -191,6 +192,8 @@ public String getPartitionPath(Object partitionVal) { timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue()); } else if (partitionVal instanceof Long) { timeMs = convertLongTimeToMillis((Long) partitionVal); + } else if (partitionVal instanceof Timestamp && isConsistentLogicalTimestampEnabled()) { + timeMs = ((Timestamp) partitionVal).getTime(); } else if (partitionVal instanceof Integer) { timeMs = convertLongTimeToMillis(((Integer) partitionVal).longValue()); } else if (partitionVal instanceof BigDecimal) { @@ -225,5 +228,4 @@ private long convertLongTimeToMillis(Long partitionVal) { } return MILLISECONDS.convert(partitionVal, timeUnit); } - } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java index e9d7aec8c8e2a..4ec143bf06789 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java @@ -42,7 +42,7 @@ public HoodieTableMetadataKeyGenerator(TypedProperties config) { @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); + return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY, isConsistentLogicalTimestampEnabled()); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index c830925419b57..a7dc4a3c0fa73 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -124,7 +124,8 @@ protected Option> getPartitioner(Map st if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { return Option.of(new JavaCustomColumnsSortPartitioner( strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), - HoodieAvroUtils.addMetadataFields(schema))); + HoodieAvroUtils.addMetadataFields(schema), + getWriteConfig().isConsistentLogicalTimestampEnabled())); } else { return Option.empty(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java index bb7cd5e23a5b9..eb3d4ef312e99 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java @@ -39,18 +39,20 @@ public class JavaCustomColumnsSortPartitioner private final String[] sortColumnNames; private final Schema schema; + private final boolean consistentLogicalTimestampEnabled; - public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema) { + public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) { this.sortColumnNames = columnNames; this.schema = schema; + this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled; } @Override public List> repartitionRecords( List> records, int outputSparkPartitions) { return records.stream().sorted((o1, o2) -> { - Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema); - Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema); + Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled); + Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled); return values1.toString().compareTo(values2.toString()); }).collect(Collectors.toList()); } diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java index 5d6f21164589d..ee507b6045f6c 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java @@ -64,13 +64,13 @@ public void testCustomColumnSortPartitioner(String sortColumnString) throws Exce List records = generateTestRecordsForBulkInsert(1000); testBulkInsertInternalPartitioner( - new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA), + new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), records, true, generatePartitionNumRecords(records), Option.of(columnComparator)); } private Comparator getCustomColumnComparator(Schema schema, String[] sortColumns) { return Comparator.comparing( - record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema).toString()); + record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema, false).toString()); } private void verifyRecordAscendingOrder(List records, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 4824c757cd9df..c88b848ddf8cf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -108,7 +108,6 @@ public HoodieWriteMetadata> performClustering(final HoodieC return writeMetadata; } - /** * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. * The number of new file groups created is bounded by numOutputGroups. 
@@ -141,7 +140,7 @@ protected Option> getPartitioner(Map st getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema))); } else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), - HoodieAvroUtils.addMetadataFields(schema))); + HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled())); } else { return Option.empty(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index fb243f5696ef7..2fe6fe969c482 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -39,15 +39,18 @@ public class RDDCustomColumnsSortPartitioner private final String[] sortColumnNames; private final SerializableSchema serializableSchema; + private final boolean consistentLogicalTimestampEnabled; public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) { this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema())); this.sortColumnNames = getSortColumnName(config); + this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled(); } - public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) { + public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) { this.sortColumnNames = columnNames; this.serializableSchema = new SerializableSchema(schema); + this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled; } @Override @@ -55,9 +58,10 @@ public JavaRDD> repartitionRecords(JavaRDD> reco int outputSparkPartitions) { final String[] sortColumns = this.sortColumnNames; final SerializableSchema schema = this.serializableSchema; + final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled; return records.sortBy( record -> { - Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema); + Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema, consistentLogicalTimestampEnabled); // null values are replaced with empty string for null_first order if (recordValue == null) { return StringUtils.EMPTY_STRING; @@ -66,7 +70,6 @@ record -> { } }, true, outputSparkPartitions); - } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index f4c5622d64b6f..712f40568aae2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -147,9 +147,9 @@ public void testCustomColumnSortPartitioner() throws Exception { JavaRDD records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD records2 = generateTripleTestRecordsForBulkInsert(jsc); - testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, 
HoodieTestDataGenerator.AVRO_SCHEMA), + testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); - testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA), + testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); HoodieWriteConfig config = HoodieWriteConfig diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java index 6cf29df5140a4..879d9933978a0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java @@ -45,7 +45,7 @@ public void testBucketIdWithSimpleRecordKey() { String indexKeyField = "_row_key"; GenericRecord record = KeyGeneratorTestUtilities.getRecord(); HoodieRecord hoodieRecord = new HoodieRecord( - new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null); + new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null); int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); assert bucketId == BucketIdentifier.getBucketId( Arrays.asList(record.get(indexKeyField).toString()), 8); @@ -57,7 +57,7 @@ public void testBucketIdWithComplexRecordKey() { String indexKeyField = "_row_key"; GenericRecord record = KeyGeneratorTestUtilities.getRecord(); HoodieRecord hoodieRecord = new HoodieRecord( - new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null); + new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null); int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); assert bucketId == BucketIdentifier.getBucketId( Arrays.asList(record.get(indexKeyField).toString()), 8); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java index 88a54c8cadc5c..6f3c1a39f81ff 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java @@ -238,6 +238,40 @@ public void testScalar() throws IOException { assertEquals("2021-04-19", keyGen.getPartitionPath(baseRow)); } + @Test + public void testScalarWithLogicalType() throws IOException { + schema = SchemaTestUtil.getTimestampWithLogicalTypeSchema(); + structType = AvroConversionUtils.convertAvroSchemaToStructType(schema); + baseRecord = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "001", "f1"); + baseRecord.put("createTime", 1638513806000000L); + + properties = getBaseKeyConfig("SCALAR", "yyyy/MM/dd", "GMT", "MICROSECONDS"); + properties.setProperty(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true"); + TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + HoodieKey hk1 = keyGen.getKey(baseRecord); + assertEquals("2021/12/03", 
hk1.getPartitionPath()); + + // test w/ Row + baseRow = genericRecordToRow(baseRecord); + assertEquals("2021/12/03", keyGen.getPartitionPath(baseRow)); + internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow); + assertEquals("2021/12/03", keyGen.getPartitionPath(internalRow, baseRow.schema())); + + // timezone is GMT, createTime is null + baseRecord.put("createTime", null); + properties = getBaseKeyConfig("SCALAR", "yyyy/MM/dd", "GMT", "MICROSECONDS"); + properties.setProperty(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true"); + keyGen = new TimestampBasedKeyGenerator(properties); + HoodieKey hk2 = keyGen.getKey(baseRecord); + assertEquals("1970/01/01", hk2.getPartitionPath()); + + // test w/ Row + baseRow = genericRecordToRow(baseRecord); + assertEquals("1970/01/01", keyGen.getPartitionPath(baseRow)); + internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow); + assertEquals("1970/01/01", keyGen.getPartitionPath(internalRow, baseRow.schema())); + } + @Test public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException { baseRecord.put("createTime", "2020-04-01T13:01:33.428Z"); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index fdaa466c3cc91..7dfeb582ec40c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -59,6 +59,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -444,15 +445,15 @@ public static Schema generateProjectionSchema(Schema originalSchema, List getNullableValAsString(GenericRecord rec, String fi * @param fieldValue avro field value * @return field value either converted (for certain data types) or as it is. */ - public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) { + public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) { if (fieldSchema == null) { return fieldValue; } @@ -518,11 +519,11 @@ public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object if (fieldSchema.getType() == Schema.Type.UNION) { for (Schema schema : fieldSchema.getTypes()) { if (schema.getType() != Schema.Type.NULL) { - return convertValueForAvroLogicalTypes(schema, fieldValue); + return convertValueForAvroLogicalTypes(schema, fieldValue, consistentLogicalTimestampEnabled); } } } - return convertValueForAvroLogicalTypes(fieldSchema, fieldValue); + return convertValueForAvroLogicalTypes(fieldSchema, fieldValue, consistentLogicalTimestampEnabled); } /** @@ -538,9 +539,13 @@ public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object * @param fieldValue avro field value * @return field value either converted (for certain data types) or as it is. 
*/ - private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) { + private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) { if (fieldSchema.getLogicalType() == LogicalTypes.date()) { return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())); + } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis() && consistentLogicalTimestampEnabled) { + return new Timestamp(Long.parseLong(fieldValue.toString())); + } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros() && consistentLogicalTimestampEnabled) { + return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000); } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) { Decimal dc = (Decimal) fieldSchema.getLogicalType(); DecimalConversion decimalConversion = new DecimalConversion(); @@ -585,15 +590,15 @@ public static String sanitizeName(String name) { */ public static Object getRecordColumnValues(HoodieRecord record, String[] columns, - Schema schema) { + Schema schema, boolean consistentLogicalTimestampEnabled) { try { GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); if (columns.length == 1) { - return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], true); + return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], true, consistentLogicalTimestampEnabled); } else { StringBuilder sb = new StringBuilder(); for (String col : columns) { - sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true)); + sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true, consistentLogicalTimestampEnabled)); } return sb.toString(); @@ -613,7 +618,7 @@ public static Object getRecordColumnValues(HoodieRecord record, String[] columns, - SerializableSchema schema) { - return getRecordColumnValues(record, columns, schema.get()); + SerializableSchema schema, boolean consistentLogicalTimestampEnabled) { + return getRecordColumnValues(record, columns, schema.get(), consistentLogicalTimestampEnabled); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 07ab5df9c7dbd..3e7971b1b26f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.model; import org.apache.hudi.common.util.Option; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -87,7 +88,10 @@ public Option getInsertValue(Schema schema, Properties properties } private static Option updateEventTime(GenericRecord record, Properties properties) { - return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true)); + boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); + return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled)); } 
@Override @@ -110,10 +114,13 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue, * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path * and need to be dealt with separately. */ + boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue, - properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true); + properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled); Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord, - properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true); + properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled); return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java index 88092dacbb752..2afd7df3fb204 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java @@ -32,6 +32,7 @@ public abstract class BaseKeyGenerator extends KeyGenerator { protected List partitionPathFields; protected final boolean encodePartitionPath; protected final boolean hiveStylePartitioning; + protected final boolean consistentLogicalTimestampEnabled; protected BaseKeyGenerator(TypedProperties config) { super(config); @@ -39,6 +40,8 @@ protected BaseKeyGenerator(TypedProperties config) { Boolean.parseBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue())); this.hiveStylePartitioning = config.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), Boolean.parseBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())); + this.consistentLogicalTimestampEnabled = config.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); } /** @@ -78,4 +81,8 @@ public List getRecordKeyFields() { public List getPartitionPathFields() { return partitionPathFields; } + + public boolean isConsistentLogicalTimestampEnabled() { + return consistentLogicalTimestampEnabled; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index d8535a21450cc..6a1f761219221 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -56,6 +56,16 @@ public class KeyGeneratorOptions extends HoodieConfig { .withDocumentation("Partition path field. Value to be used at the partitionPath component of HoodieKey. 
" + "Actual value ontained by invoking .toString()"); + public static final ConfigProperty KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED = ConfigProperty + .key("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled") + .defaultValue("false") + .withDocumentation("When set to true, consistent value will be generated for a logical timestamp type column, " + + "like timestamp-millis and timestamp-micros, irrespective of whether row-writer is enabled. Disabled by default so " + + "as not to break the pipeline that deploy either fully row-writer path or non row-writer path. For example, " + + "if it is kept disabled then record key of timestamp type with value `2016-12-29 09:54:00` will be written as timestamp " + + "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. " + + "If enabled, then the timestamp value will be written in both the cases."); + /** * @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods. */ diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index b4304a4d5dce2..e64964ed94e9c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -225,15 +225,15 @@ public void testGetNestedFieldVal() { rec.put("non_pii_col", "val1"); rec.put("pii_col", "val2"); - Object rowKey = HoodieAvroUtils.getNestedFieldVal(rec, "_row_key", true); + Object rowKey = HoodieAvroUtils.getNestedFieldVal(rec, "_row_key", true, false); assertEquals("key1", rowKey); - Object rowKeyNotExist = HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", true); + Object rowKeyNotExist = HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", true, false); assertNull(rowKeyNotExist); // Field does not exist try { - HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false); + HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false); } catch (Exception e) { assertEquals("fake_key(Part -fake_key) field not found in record. 
Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]", e.getMessage()); @@ -241,7 +241,7 @@ public void testGetNestedFieldVal() { // Field exist while value not try { - HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false); + HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false); } catch (Exception e) { assertEquals("The value of timestamp can not be null", e.getMessage()); } @@ -255,7 +255,7 @@ public void testGetNestedFieldValWithDecimalFiled() { ByteBuffer byteBuffer = ByteBuffer.wrap(bigDecimal.unscaledValue().toByteArray()); rec.put("decimal_col", byteBuffer); - Object decimalCol = HoodieAvroUtils.getNestedFieldVal(rec, "decimal_col", true); + Object decimalCol = HoodieAvroUtils.getNestedFieldVal(rec, "decimal_col", true, false); assertEquals(bigDecimal, decimalCol); Object obj = rec.get(1); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java index 6016008e4d417..bd1e3b764e1bc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java @@ -202,6 +202,10 @@ public static Schema getTimestampEvolvedSchema() throws IOException { return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/timestamp-test-evolved.avsc")); } + public static Schema getTimestampWithLogicalTypeSchema() throws IOException { + return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/timestamp-logical-type.avsc")); + } + public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, String instantTime, String fileId) throws IOException { return generateAvroRecordFromJson(schema, recordNumber, instantTime, fileId, true); diff --git a/hudi-common/src/test/resources/timestamp-logical-type.avsc b/hudi-common/src/test/resources/timestamp-logical-type.avsc new file mode 100644 index 0000000000000..6720523be9927 --- /dev/null +++ b/hudi-common/src/test/resources/timestamp-logical-type.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "field1", "type": ["null", "string"], "default": null}, + {"name": "createTime", "type": ["null", {"type" : "long", "logicalType" : "timestamp-micros"}], "default": null} + ] +} \ No newline at end of file diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java index 64facf3b16f1b..fb850bace7d48 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -77,7 +77,7 @@ public HoodieRecordPayload createPayload(GenericRecord record) throws Excepti if (shouldCombine) { ValidationUtils.checkState(preCombineField != null); Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, - preCombineField, false); + preCombineField, false, false); return (HoodieRecordPayload) constructor.newInstance(record, orderingVal); } else { return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java index dde19b84825f3..8f36fd18bbb22 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java @@ -94,7 +94,7 @@ void testRowDataToAvroStringToRowData() { (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); StringToRowDataConverter stringToRowDataConverter = new StringToRowDataConverter(rowType.getChildren().toArray(new LogicalType[0])); - final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames()); + final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames(), false); final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey); Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index dbcc847fc1c1a..be05dd8e2b146 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -300,6 +300,8 @@ object DataSourceWriteOptions { .withInferFunction(keyGeneraterInferFunc) .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`") + val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED: ConfigProperty[String] = KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED + val ENABLE_ROW_WRITER: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.row.writer.enable") .defaultValue("true") diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index 560b590183cf5..a0204b256ed26 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.avro.generic.GenericRecord; @@ -68,7 +69,9 @@ public JavaRDD generateInputRecords(String tableName, String sourc Option.empty()); return genericRecords.toJavaRDD().map(gr -> { String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( - gr, props.getString("hoodie.datasource.write.precombine.field"), false); + gr, props.getString("hoodie.datasource.write.precombine.field"), false, props.getBoolean( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); try { return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), props.getString("hoodie.datasource.write.payload.class")); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d35cd49a482f4..5414b4d75404e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -237,7 +237,9 @@ object HoodieSparkSqlWriter { val hoodieAllIncomingRecords = genericRecords.map(gr => { val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns) val hoodieRecord = if (shouldCombine) { - val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false) + val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse( + DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean) .asInstanceOf[Comparable[_]] DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, keyGenerator.getKey(gr), diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 282de54f65259..9a940ebcebf02 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -84,6 +84,7 @@ object HoodieWriterUtils { hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER) hoodieConfig.setDefaultValue(RECONCILE_SCHEMA) hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS) + hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED) Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala index e069df97aff55..e44b838fe7fb0 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hudi.command -import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS} - import org.apache.avro.generic.GenericRecord import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.util.PartitionPathEncodeUtils @@ -27,7 +25,10 @@ import org.apache.hudi.keygen._ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, TimestampType} -import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} +import org.joda.time.format.DateTimeFormat + +import java.sql.Timestamp +import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS} /** * A complex key generator for sql command which do some process for the @@ -96,7 +97,11 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue) } else { - MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS) + if (isConsistentLogicalTimestampEnabled) { + Timestamp.valueOf(_partitionValue).getTime + } else { + MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS) + } } val timestampFormat = PartitionPathEncodeUtils.escapePathName( SqlKeyGenerator.timestampTimeFormat.print(timeMs)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index ae89e8af7547f..a130c3afd85bb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -141,16 +141,16 @@ public void testAvroRecordsFieldConversion() { record.put("event_cost3", genericFixed); assertEquals(LocalDate.ofEpochDay(18000).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date1", - true)); + true, false)); assertEquals(LocalDate.ofEpochDay(18001).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date2", - true)); + true, false)); assertEquals(LocalDate.ofEpochDay(18002).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date3", - true)); - assertEquals("Hudi Meetup", HoodieAvroUtils.getNestedFieldValAsString(record, "event_name", true)); - assertEquals("Hudi PMC", HoodieAvroUtils.getNestedFieldValAsString(record, "event_organizer", true)); - assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost1", true)); - assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost2", true)); - assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost3", true)); + true, false)); + assertEquals("Hudi Meetup", HoodieAvroUtils.getNestedFieldValAsString(record, "event_name", true, false)); + assertEquals("Hudi PMC", HoodieAvroUtils.getNestedFieldValAsString(record, "event_organizer", true, false)); + assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost1", true, false)); + assertEquals(bigDecimal.toString(), 
HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost2", true, false)); + assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost3", true, false)); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala index 3036d50315e45..d3be8c9b3e209 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala @@ -246,8 +246,8 @@ class TestDataSourceDefaults { var converterFn: Function1[Any, Any] = _ override def getKey(record: GenericRecord): HoodieKey = { - new HoodieKey(HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyProp, true), - HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathProp, true)) + new HoodieKey(HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyProp, true, false), + HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathProp, true, false)) } override def getRecordKey(row: Row): String = { @@ -579,12 +579,12 @@ class TestDataSourceDefaults { val props = new TypedProperties() props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "favoriteIntNumber"); - val basePayload = new OverwriteWithLatestAvroPayload(baseRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, baseOrderingVal).asInstanceOf[Comparable[_]]) + val basePayload = new OverwriteWithLatestAvroPayload(baseRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, baseOrderingVal, false).asInstanceOf[Comparable[_]]) val laterRecord = SchemaTestUtil .generateAvroRecordFromJson(schema, 2, "001", "f1") val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber") - val newerPayload = new OverwriteWithLatestAvroPayload(laterRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal).asInstanceOf[Comparable[_]]) + val newerPayload = new OverwriteWithLatestAvroPayload(laterRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal, false).asInstanceOf[Comparable[_]]) // it will provide the record with greatest combine value val preCombinedPayload = basePayload.preCombine(newerPayload) @@ -606,10 +606,10 @@ class TestDataSourceDefaults { val earlierOrderingVal: Object = earlierRecord.get("favoriteIntNumber") val laterPayload = new DefaultHoodieRecordPayload(laterRecord, - HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal).asInstanceOf[Comparable[_]]) + HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal, false).asInstanceOf[Comparable[_]]) val earlierPayload = new DefaultHoodieRecordPayload(earlierRecord, - HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, earlierOrderingVal).asInstanceOf[Comparable[_]]) + HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, earlierOrderingVal, false).asInstanceOf[Comparable[_]]) // it will provide the record with greatest combine value val preCombinedPayload = laterPayload.preCombine(earlierPayload) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala new file mode 100644 index 0000000000000..e86c540133b97 --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hudi + +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import java.sql.{Date, Timestamp} + +class TestGenericRecordAndRowConsistency extends HoodieClientTestBase { + + var spark: SparkSession = _ + val commonOpts = Map( + HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl", + "hoodie.insert.shuffle.parallelism" -> "1", + "hoodie.upsert.shuffle.parallelism" -> "1", + DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "str,eventTime", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "typeId", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "typeId", + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator", + DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key -> "true" + ) + + /** + * Setup method running before each test. 
+ */ + @BeforeEach override def setUp(): Unit = { + setTableName("hoodie_type_consistency_tbl") + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + } + + @AfterEach override def tearDown(): Unit = { + cleanupSparkContexts() + } + + @Test + def testTimestampTypeConsistency(): Unit = { + val _spark = spark + import _spark.implicits._ + + val df = Seq( + (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"), + (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"), + (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"), + (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def") + ).toDF("typeId", "eventTime", "str") + + testConsistencyBetweenGenericRecordAndRow(df) + } + + @Test + def testDateTypeConsistency(): Unit = { + val _spark = spark + import _spark.implicits._ + + val df = Seq( + (1, Date.valueOf("2014-01-01"), "abc"), + (1, Date.valueOf("2014-11-30"), "abc"), + (2, Date.valueOf("2016-12-29"), "def"), + (2, Date.valueOf("2016-05-09"), "def") + ).toDF("typeId", "eventTime", "str") + + testConsistencyBetweenGenericRecordAndRow(df) + } + + private def testConsistencyBetweenGenericRecordAndRow(df: DataFrame): Unit = { + val _spark = spark + import _spark.implicits._ + + // the upsert operation generates the recordKey from the GenericRecord + val tempRecordPath = basePath + "/record_tbl/" + df.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, "upsert") + .mode(org.apache.spark.sql.SaveMode.Overwrite) + .save(tempRecordPath) + + val data1 = spark.read.format("hudi") + .load(tempRecordPath) + .select("_hoodie_record_key") + .map(_.toString()).collect().sorted + + // the bulk_insert operation generates the recordKey from the Row + val tempRowPath = basePath + "/row_tbl/" + df.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, "bulk_insert") + .mode(org.apache.spark.sql.SaveMode.Overwrite) + .save(tempRowPath) + + val data2 = spark.read.format("hudi") + .load(tempRowPath) + .select("_hoodie_record_key") + .map(_.toString()).collect().sorted + + assert(data1 sameElements data2) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 35f4a61fe1bb2..bd520c91f4fa5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -25,9 +25,8 @@ import org.apache.hudi.common.config.HoodieConfig import org.apache.hudi.common.model._ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig} -import org.apache.hudi.exception.{ExceptionUtil, HoodieException} +import org.apache.hudi.exception.HoodieException import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode import org.apache.hudi.functional.TestBootstrap import org.apache.hudi.hive.HiveSyncConfig @@ -35,13 +34,12 @@ import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql._ import org.apache.spark.sql.functions.{expr, lit} import
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index ee1e2e6f42cff..c34ed79105bf5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hudi
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 
+import java.io.File
+
 class TestInsertTable extends TestHoodieSqlBase {
 
   test("Test Insert Into") {
@@ -238,29 +240,46 @@ class TestInsertTable extends TestHoodieSqlBase {
     )
     typeAndValue.foreach { case (partitionType, partitionValue) =>
       val tableName = generateTableName
-      // Create table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  dt $partitionType
-           |) using hudi
-           | tblproperties (primaryKey = 'id')
-           | partitioned by (dt)
-           | location '${tmp.getCanonicalPath}/$tableName'
-       """.stripMargin)
-      spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
-      spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
-      checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
-        Seq(1, "a1", 10, removeQuotes(partitionValue).toString),
-        Seq(2, "a2", 10, removeQuotes(partitionValue).toString)
-      )
+      validateDifferentTypesOfPartitionColumn(tmp, partitionType, partitionValue, tableName)
+    }
+  }
+
+  test("Test TimestampType Partition Column With Consistent Logical Timestamp Enabled") {
+    withTempDir { tmp =>
+      val typeAndValue = Seq(
+        ("timestamp", "'2021-05-20 00:00:00'"),
+        ("date", "'2021-05-20'")
+      )
+      typeAndValue.foreach { case (partitionType, partitionValue) =>
+        val tableName = generateTableName
+        spark.sql(s"set hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true")
+        validateDifferentTypesOfPartitionColumn(tmp, partitionType, partitionValue, tableName)
       }
     }
   }
 
+  private def validateDifferentTypesOfPartitionColumn(tmp: File, partitionType: String, partitionValue: Any, tableName: String) = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  dt $partitionType
+         |) using hudi
+         | tblproperties (primaryKey = 'id')
+         | partitioned by (dt)
+         | location '${tmp.getCanonicalPath}/$tableName'
+       """.stripMargin)
+    spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
+    spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
+    checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
+      Seq(1, "a1", 10, removeQuotes(partitionValue).toString),
+      Seq(2, "a2", 10, removeQuotes(partitionValue).toString)
+    )
+  }
+
 test("Test insert for uppercase table name") {
   withTempDir{ tmp =>
     val tableName = s"H_$generateTableName"
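
For context (not part of the patch): the new test exercises the flag exactly as an end user would from Spark SQL. A minimal usage sketch, assuming a SparkSession with the Hudi extensions and a Hudi table created as in validateDifferentTypesOfPartitionColumn (demo_tbl is an illustrative name):

    // Turn the flag on for the session, then write; timestamp/date partition
    // values now render as '2021-05-20 00:00:00'-style strings rather than
    // raw epoch numbers, matching the Row-based writer.
    spark.sql("set hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true")
    spark.sql("insert into demo_tbl partition(dt = '2021-05-20 00:00:00') select 1, 'a1', 10")
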
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 96a7c25de8819..6cb3ed1847750 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -55,6 +55,7 @@ import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.sync.common.AbstractSyncTool;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -451,7 +452,9 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
       JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
       JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
         HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-            (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false))
+            (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
+                KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+                Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
             : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
         return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
       });
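
For context (not part of the patch): DeltaStreamer resolves the flag from its writer properties with the option's declared default, so it honors the same switch as the datasource writers. A sketch of that lookup in Scala, assuming only java.util.Properties (Hudi's TypedProperties is Properties-backed); the helper name is illustrative, and the "false" default matches the option's declared default in KeyGeneratorOptions:

    import java.util.Properties

    // Use the explicit value if the user set it, otherwise the option's default.
    val flagKey = "hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled"
    def consistentLogicalTimestampEnabled(props: Properties): Boolean =
      props.getProperty(flagKey, "false").toBoolean
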