Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
Expand All @@ -54,7 +55,7 @@
*/
public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
/**
 * Timestamp types accepted by this key generator.
 *
 * <p>In the constructor excerpt shown in this diff, {@code EPOCHMILLISECONDS} selects
 * {@code TimeUnit.MILLISECONDS}, {@code EPOCHMICROSECONDS} selects {@code TimeUnit.MICROSECONDS}
 * and {@code UNIX_TIMESTAMP} selects {@code TimeUnit.SECONDS}. How {@code DATE_STRING},
 * {@code MIXED} and {@code SCALAR} are handled is not visible here.
 */
public enum TimestampType implements Serializable {
// NOTE(review): this is a diff rendering -- the first constant list below appears to be the
// pre-change line and the second the post-change line (adds EPOCHMICROSECONDS); confirm against the file.
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, EPOCHMICROSECONDS, SCALAR
}

private final TimeUnit timeUnit;
Expand Down Expand Up @@ -93,6 +94,9 @@ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException
case EPOCHMILLISECONDS:
timeUnit = MILLISECONDS;
break;
case EPOCHMICROSECONDS:
timeUnit = MICROSECONDS;
break;
case UNIX_TIMESTAMP:
timeUnit = SECONDS;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class TimestampKeyGeneratorConfig extends HoodieConfig {
.withAlternatives(OLD_TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.type")
.markAdvanced()
.withDocumentation("Timestamp type of the field, which should be one of the timestamp types "
+ "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`, `EPOCHMILLISECONDS`, `SCALAR`.");
+ "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`, `EPOCHMILLISECONDS`,"
+ " `EPOCHMICROSECONDS`, `SCALAR`.");

public static final ConfigProperty<String> INPUT_TIME_UNIT = ConfigProperty
.key(TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.scalar.time.unit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
val tableConfig = metaClient.getTableConfig
if (null != tableConfig.getKeyGeneratorClassName
&& tableConfig.getKeyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
&& tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()).matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS")) {
// For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP or EPOCHMILLISECONDS,
&& tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
.matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS|EPOCHMICROSECONDS")) {
// For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP,
// EPOCHMILLISECONDS, or EPOCHMICROSECONDS,
// we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting in most cases.
// But the output for these cases is in a string format, so we can pass partitionPath as UTF8String
Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,31 @@ public void testTimestampBasedKeyGenerator() throws IOException {
assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals(UTF8String.fromString("1970-01-01 12:00:00"), keyGen.getPartitionPath(internalRow, baseRow.schema()));

// Timestamp field is in long type, with `EPOCHMICROSECONDS` timestamp type in the key generator
baseRecord.put("createTime", 1578283932123456L);
properties = getBaseKeyConfig("createTime", "EPOCHMICROSECONDS", "yyyy-MM-dd hh", "GMT+8:00", null);
keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey key = keyGen.getKey(baseRecord);
assertEquals("2020-01-06 12", key.getPartitionPath());
baseRow = genericRecordToRow(baseRecord);
assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals(UTF8String.fromString("2020-01-06 12"), keyGen.getPartitionPath(internalRow, baseRow.schema()));

// Timestamp field is in decimal type, with `EPOCHMICROSECONDS` timestamp type in the key generator
decimal = new BigDecimal("1578283932123456.0001");
resolvedNullableSchema = AvroSchemaUtils.resolveNullableSchema(
schema.getField("createTimeDecimal").schema());
avroDecimal = conversion.toFixed(decimal, resolvedNullableSchema, LogicalTypes.decimal(20, 4));
baseRecord.put("createTimeDecimal", avroDecimal);
properties = getBaseKeyConfig(
"createTimeDecimal", "EPOCHMICROSECONDS", "yyyy-MM-dd hh", "GMT+8:00", null);
keyGen = new TimestampBasedKeyGenerator(properties);
bigDecimalKey = keyGen.getKey(baseRecord);
assertEquals("2020-01-06 12", bigDecimalKey.getPartitionPath());
baseRow = genericRecordToRow(baseRecord);
assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
}

@Test
Expand Down