diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java index 1990b2dab44ef..ea2e0911d3010 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java @@ -41,6 +41,7 @@ import java.util.TimeZone; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER; @@ -54,7 +55,7 @@ */ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { public enum TimestampType implements Serializable { - UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR + UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, EPOCHMICROSECONDS, SCALAR } private final TimeUnit timeUnit; @@ -93,6 +94,9 @@ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException case EPOCHMILLISECONDS: timeUnit = MILLISECONDS; break; + case EPOCHMICROSECONDS: + timeUnit = MICROSECONDS; + break; case UNIX_TIMESTAMP: timeUnit = SECONDS; break; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java index 46b66371b3112..367b7593ca6af 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java @@ -42,7 +42,8 @@ public class TimestampKeyGeneratorConfig extends HoodieConfig { .withAlternatives(OLD_TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.type") .markAdvanced() .withDocumentation("Timestamp type of the field, which should be one of the timestamp types " - + "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`, `EPOCHMILLISECONDS`, `SCALAR`."); + + "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`, `EPOCHMILLISECONDS`," + + " `EPOCHMICROSECONDS`, `SCALAR`."); public static final ConfigProperty INPUT_TIME_UNIT = ConfigProperty .key(TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.scalar.time.unit") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index c6d24974b0799..e394c030df02d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -371,8 +371,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession, val tableConfig = metaClient.getTableConfig if (null != tableConfig.getKeyGeneratorClassName && tableConfig.getKeyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName) - && tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()).matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS")) { - // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP or EPOCHMILLISECONDS, + && tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()) + .matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS|EPOCHMICROSECONDS")) { + // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP, + // EPOCHMILLISECONDS, or EPOCHMICROSECONDS, // we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting in most cases. // But the output for these cases is in a string format, so we can pass partitionPath as UTF8String Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java index ec93ea229accb..d5618fbe8abd6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java @@ -209,6 +209,31 @@ public void testTimestampBasedKeyGenerator() throws IOException { assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow)); internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow); assertEquals(UTF8String.fromString("1970-01-01 12:00:00"), keyGen.getPartitionPath(internalRow, baseRow.schema())); + + // Timestamp field is in long type, with `EPOCHMICROSECONDS` timestamp type in the key generator + baseRecord.put("createTime", 1578283932123456L); + properties = getBaseKeyConfig("createTime", "EPOCHMICROSECONDS", "yyyy-MM-dd hh", "GMT+8:00", null); + keyGen = new TimestampBasedKeyGenerator(properties); + HoodieKey key = keyGen.getKey(baseRecord); + assertEquals("2020-01-06 12", key.getPartitionPath()); + baseRow = genericRecordToRow(baseRecord); + assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow)); + internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow); + assertEquals(UTF8String.fromString("2020-01-06 12"), keyGen.getPartitionPath(internalRow, baseRow.schema())); + + // Timestamp field is in decimal type, with `EPOCHMICROSECONDS` timestamp type in the key generator + decimal = new BigDecimal("1578283932123456.0001"); + resolvedNullableSchema = AvroSchemaUtils.resolveNullableSchema( + schema.getField("createTimeDecimal").schema()); + avroDecimal = conversion.toFixed(decimal, resolvedNullableSchema, LogicalTypes.decimal(20, 4)); + baseRecord.put("createTimeDecimal", avroDecimal); + properties = getBaseKeyConfig( + "createTimeDecimal", "EPOCHMICROSECONDS", "yyyy-MM-dd hh", "GMT+8:00", null); + keyGen = new TimestampBasedKeyGenerator(properties); + bigDecimalKey = keyGen.getKey(baseRecord); + assertEquals("2020-01-06 12", bigDecimalKey.getPartitionPath()); + baseRow = genericRecordToRow(baseRecord); + assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow)); } @Test