base: branch-0.x
[MINOR] Fix logical type issue for timestamp columns #17601
Conversation
lokeshj1703 left a comment:
@linliu-code Thanks for working on this! The PR contains a few changes which are not part of https://github.com/apache/hudi/pull/14161/files. Can we add a description of how the fix works for older Hudi tables? Also, the original PR mentions a limitation:
> However, we used the InternalSchema system to do various operations such as fix null ordering, reorder, and add columns. At the time, InternalSchema only had a single Timestamp type. When converting back to avro, this was assumed to be micros.
Is this limitation fixed in older Hudi tables?
```xml
<slf4j.version>2.0.7</slf4j.version>
<skip.hudi-spark2.unit.tests>true</skip.hudi-spark2.unit.tests>
<skipITs>true</skipITs>
<spark32orEarlier>false</spark32orEarlier>
```
The property is not used anywhere. We will also need to set it to false in the other profiles.
Is the schema repair only done for Spark 3.4+ versions?
yes
```java
public void testRepairLongWithoutLogicalTypeToLocalTimestampMicros() {
  Schema requestedSchema = Schema.create(Schema.Type.LONG);
  Schema tableSchema = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));

  Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, tableSchema);

  assertNotSame(requestedSchema, result, "Should create a new schema with logical type");
  assertEquals(Schema.Type.LONG, result.getType());
  assertEquals(LogicalTypes.localTimestampMicros(), result.getLogicalType());
}
```
Should we be repairing in this scenario? What if the actual file logical type is millis but it is not present in the schema?
```java
public static boolean hasTimestampMillisField(Schema tableSchema) {
  switch (tableSchema.getType()) {
    case RECORD:
      for (Schema.Field field : tableSchema.getFields()) {
        if (hasTimestampMillisField(field.schema())) {
          return true;
        }
      }
      return false;

    case ARRAY:
      return hasTimestampMillisField(tableSchema.getElementType());

    case MAP:
      return hasTimestampMillisField(tableSchema.getValueType());

    case UNION:
      return hasTimestampMillisField(AvroSchemaUtils.getNonNullTypeFromUnion(tableSchema));

    default:
      return tableSchema.getType() == Schema.Type.LONG
          && (tableSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis || tableSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis);
  }
}

/**
 * Check if LogicalTypes.LocalTimestampMillis is supported in the current Avro version
 *
 * @return true if LocalTimestampMillis is available, false otherwise
 */
public static boolean isLocalTimestampMillisSupported() {
  try {
    return Arrays.stream(LogicalTypes.class.getDeclaredClasses())
        .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis"));
  } catch (Exception e) {
    return false;
  }
}
```
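For illustration, a minimal sketch of my own (not code from the PR; it assumes the helper above is in scope, e.g. statically imported) showing what the traversal reports for a nested record:

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class HasTimestampMillisFieldExample {
  public static void main(String[] args) {
    // A timestamp-millis column nested one record deep.
    Schema millisTs = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    Schema nested = SchemaBuilder.record("outer").fields()
        .name("inner").type(SchemaBuilder.record("inner_rec").fields()
            .name("ts").type(millisTs).noDefault()
            .endRecord()).noDefault()
        .endRecord();

    // The switch above recurses through RECORD/ARRAY/MAP/UNION and returns true
    // as soon as it reaches the millis-typed LONG leaf.
    System.out.println(hasTimestampMillisField(nested)); // true (helper assumed in scope)
  }
}
```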
It seems like these APIs are not used. Should we remove these?
```java
public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldName) {
  if (StringUtils.isNullOrEmpty(fieldName)) {
    return Option.empty();
  }
  String[] parts = fieldName.split("\\.");
  for (String part : parts) {
    Schema.Field foundField = getNonNullTypeFromUnion(schema).getField(part);
    if (foundField == null) {
      throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
    }
    schema = foundField.schema();
  }
  return Option.of(getNonNullTypeFromUnion(schema));
}

public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
  return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
}
```
These APIs are not used anywhere.
```java
// NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
// Only add conversions if they're available
```
Should we validate the fix and the added tests with Spark 2? I am not sure if CI covers it by default.
Right now we only make the conversion for Spark 3.4+.
```java
// private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
// private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();

// NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
```
I think we might not need changes in this class since it is more about adding support for logical types?
```java
case BYTES:
case STRING:
  return oldValue;
case LONG:
```
Even these changes can be revisited. They may not be required.
```diff
  private int readRecords = 0;

- private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content) throws IOException {
+ private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content, boolean enableLogicalTimestampFieldRepair) throws IOException {
```
This is always true. Can be omitted.
```java
  return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}

public static Instant nanosToInstant(long nanosFromEpoch) {
```
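For context, a minimal sketch of my own (not the PR's implementation) of the seconds/nanos split that a nanosToInstant-style helper typically performs:

```java
import java.time.Instant;

public class NanosToInstantExample {
  public static void main(String[] args) {
    // Nanoseconds since the epoch, split into whole seconds plus a nano remainder.
    long nanosFromEpoch = 1_700_000_000_123_456_789L;
    long epochSeconds = Math.floorDiv(nanosFromEpoch, 1_000_000_000L);
    long nanoAdjustment = Math.floorMod(nanosFromEpoch, 1_000_000_000L);
    System.out.println(Instant.ofEpochSecond(epochSeconds, nanoAdjustment));
    // 2023-11-14T22:13:20.123456789Z
  }
}
```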
These are unused
```diff
- } else if (
-     logical instanceof LogicalTypes.TimeMillis
-     || logical instanceof LogicalTypes.TimeMicros) {
+ } else if (logical instanceof LogicalTypes.TimeMillis) {
```
Changes in this class can also be revisited, since they may not be required for the 0.x branch.
Change Logs
PR #9743 added more schema evolution functionality and schema processing. However, it used the InternalSchema system to do various operations such as fixing null ordering, reordering, and adding columns. At the time, InternalSchema only had a single Timestamp type, and when converting back to Avro this was assumed to be micros. Therefore, if the schema provider had any millis columns, the processed schema would end up with those columns as micros.
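As an illustration (a sketch of my own with assumed field names, not code from the PR), the mislabel looks like this in Avro terms:

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TimestampMislabelExample {
  public static void main(String[] args) {
    // Schema as declared by the schema provider: a timestamp-millis column.
    Schema provided = SchemaBuilder.record("event").fields()
        .name("ts").type(LogicalTypes.timestampMillis()
            .addToSchema(Schema.create(Schema.Type.LONG))).noDefault()
        .endRecord();

    // Schema after an InternalSchema round trip (illustrative): the single
    // Timestamp type is converted back to Avro as timestamp-micros.
    Schema roundTripped = SchemaBuilder.record("event").fields()
        .name("ts").type(LogicalTypes.timestampMicros()
            .addToSchema(Schema.create(Schema.Type.LONG))).noDefault()
        .endRecord();

    // The underlying long values are unchanged; only the label differs.
    System.out.println(provided.getField("ts").schema().getLogicalType());     // timestamp-millis
    System.out.println(roundTripped.getField("ts").schema().getLogicalType()); // timestamp-micros
  }
}
```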
PR #13711, which updated column stats with better support for logical types, fixed the schema issues as well as additional issues with handling and converting timestamps during ingestion.
This PR adds functionality to the Spark and Hive readers and writers to automatically repair affected tables.
After switching to the 1.1 binary, the affected columns will undergo evolution from timestamp-micros to timestamp-millis. This would normally be a lossy evolution that is not supported, but it is safe here because the data is actually still timestamp-millis; it is just mislabeled as micros in the Parquet and table schemas.
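To make the mislabel concrete, here is a small sketch (values illustrative, not from the PR) contrasting the two interpretations of the same stored long; fixing the label does not touch the stored values, which is why the evolution is lossless in practice:

```java
import java.time.Instant;

public class MislabeledTimestampExample {
  public static void main(String[] args) {
    // The writer stored epoch milliseconds, but the column is labeled timestamp-micros.
    long stored = 1_700_000_000_123L;

    // Read under the wrong micros label: the value lands in January 1970.
    Instant asMicros = Instant.EPOCH.plusNanos(stored * 1_000L);

    // Read as millis, which is what the data really is.
    Instant asMillis = Instant.ofEpochMilli(stored);

    System.out.println(asMicros); // 1970-01-20T16:13:20.000123Z
    System.out.println(asMillis); // 2023-11-14T22:13:20.123Z
  }
}
```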
Impact
When reading from a Hudi table using the Spark or Hive reader, if the table schema has a column as millis but the data schema is micros, we assume that the column is affected and read it as a millis value instead of a micros value. This correction is also applied to all readers used by the default write paths, so as a table is rewritten the Parquet files become correct. A table's latest snapshot can be fixed immediately by writing one commit with the 1.1 binary and then clustering the entire table.
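A minimal sketch of that read-side decision, with hypothetical names (the actual logic lives in AvroSchemaRepair in this PR and handles more cases):

```java
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Hypothetical helper illustrating the per-field decision described above.
public final class TimestampReadRepairSketch {
  static Schema chooseReadSchema(Schema tableFieldSchema, Schema fileFieldSchema) {
    LogicalType tableType = tableFieldSchema.getLogicalType();
    LogicalType fileType = fileFieldSchema.getLogicalType();
    // Table says millis but the file says micros: the file label is the known
    // mislabel, so trust the table schema and read the stored longs as millis.
    if (tableType instanceof LogicalTypes.TimestampMillis
        && fileType instanceof LogicalTypes.TimestampMicros) {
      return tableFieldSchema;
    }
    return fileFieldSchema;
  }
}
```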
Risk level (write none, low medium or high below)
High. Extensive testing was done and functional tests were added.
Documentation Update
#14100
Contributor's checklist