[SPARK-37705][SQL] Rebase timestamps in the session time zone saved in Parquet/Avro metadata #34973
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #146440 has finished for PR 34973 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
@cloud-fan @sadikovi Could you have a look at this PR? I am still working on it, but the existing tests have passed already. I just need to add more tests for the cases when the JVM and the session time zone are different in the
Kubernetes integration test status failure
Test build #146474 has finished for PR 34973 at commit
Test build #146481 has finished for PR 34973 at commit
Thanks, I will take a look later. I noticed that we are changing an OSS Spark method signature. Would it be possible to somehow avoid breaking binary compatibility between OSS classes/methods? At least, it would be good to assess the changes from this point of view.
Test build #146538 has finished for PR 34973 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Test build #146551 has finished for PR 34973 at commit
Kubernetes integration test status failure
Test build #146561 has finished for PR 34973 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #146566 has finished for PR 34973 at commit
```
      "2_4_5" -> failInRead _,
-     "2_4_6" -> successInRead _
+     "2_4_6" -> successInRead _,
+     "3_2_2" -> successInRead _
```
3.2.2? We have not released 3.2.1 yet.
I used branch-3.2 to generate the golden files. See the version in the PR description:
```
$ java -jar parquet-tools-1.12.0.jar meta sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v3_2_2.snappy.parquet
file:     file:/Users/maximgekk/proj/parquet-rebase-save-tz/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v3_2_2.snappy.parquet
creator:  parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:    org.apache.spark.version = 3.2.2
```
I checked out branch-3.2 and ran:
```
$ ./build/mvn -Phive -Phive-thriftserver -Dskip -DskipTests package
```
and got core/target/extra-resources/spark-version-info.properties:
```
$ cat ./core/target/extra-resources/spark-version-info.properties
version=3.2.2-SNAPSHOT
user=maximgekk
revision=d1cd110c20817eb1ccd716e099be5712df1f670c
branch=branch-3.2
date=2021-12-23T19:12:40Z
url=https://github.com/apache/spark.git
```
It should be 3.2.1-SNAPSHOT, shouldn't it? cc @gengliangwang @cloud-fan
Let me take the commit 5d45a41 and generate the golden files using it.
Yes, it should be 3.2.1-SNAPSHOT. The RC script aborted a few times; this must have been a mistake when I reran the script.
@gengliangwang can we fix it? Otherwise, we will have a problem when releasing 3.2.1.
```
   * in Proleptic Gregorian calendar. It can be negative.
   * @return The rebased microseconds since the epoch in Julian calendar.
   */
  def rebaseGregorianToJulianMicros(micros: Long): Long = {
```
Is it used in tests only?
Not only, see spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala, line 328 in f7dabd8:
```
val julianMicros = rebaseGregorianToJulianMicros(timestamp)
```
```
   * in the Julian calendar. It can be negative.
   * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
   */
  def rebaseJulianToGregorianMicros(micros: Long): Long = {
```
ditto
Actually, it should be used everywhere except rebasing in Parquet/Avro, for example in timestamp formatting.
How many places do we have? We should understand which places rely on the JVM time zone.
`rebaseJulianToGregorianMicros` is used from:
- `fromJavaTimestamp()`, which is used (55 times) from:
  - `CatalystTypeConverter.TimestampConverter`
  - `CatalogColumnStat.fromExternalString()`
  - `Literal.apply()`
  - `LegacyFastTimestampFormatter.format()`
  - `LegacySimpleTimestampFormatter.parse()`
  - `HiveInspectors`
  - `HadoopTableReader`
  - `KafkaRecordToRowConverter.toInternalRowWithoutHeaders` & `toInternalRowWithHeaders`
  - `JdbcUtils.makeGetter()`
  - `OrcAtomicColumnVector`
  - `ParquetFilters.timestampToMicros` <-- NEED TO RE-CHECK THIS ONE
  - many tests
- `LegacyFastTimestampFormatter.parse()`, which is used (20 times) from:
  - `CSVInferSchema`
  - `UnivocityGenerator`
  - `UnivocityParser`
  - `JacksonGenerator`
  - `JacksonParser`
  - `JsonInferSchema`
`rebaseGregorianToJulianMicros` is used from:
- `toJavaTimestamp`, which is used (33 times) from:
  - `CatalystTypeConverter.TimestampConverter`
  - `Iso8601TimestampFormatter.format()`
  - `LegacySimpleTimestampFormatter.format`
  - `HiveInspectors`
  - `BaseScriptTransformationExec.outputFieldWriters`
  - `JdbcUtils.makeSetter`
  - `OrcFilters.castLiteralValue`
  - `OrcSerializer.newConverter`
- `LegacyFastTimestampFormatter.format()` (the same as for `parse()` above).
Then we are fine:
- when converting between java.sql.Timestamp, we must rely on the JVM time zone (see the sketch below)
- when reading/writing JSON/CSV with legacy mode, we need to follow the legacy behavior and rebase the timestamp. We should have used the session time zone to rebase, but that ship has already sailed and we have to keep using the JVM time zone now.
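To illustrate the first point, here is a minimal, self-contained sketch (not Spark's `DateTimeUtils` code; the helper `microsOf` is made up for illustration) showing why `java.sql.Timestamp` conversions are tied to the default JVM time zone: the same wall-clock literal yields different microseconds since the epoch under different JVM zones.

```scala
import java.sql.Timestamp
import java.util.TimeZone

// Parse a wall-clock literal with Timestamp.valueOf, which always interprets it in the
// JVM default time zone, and convert the result to microseconds since the epoch
// (essentially the same arithmetic as fromJavaTimestamp).
def microsOf(literal: String, jvmZone: String): Long = {
  val saved = TimeZone.getDefault
  try {
    TimeZone.setDefault(TimeZone.getTimeZone(jvmZone))
    val ts = Timestamp.valueOf(literal)
    ts.getTime * 1000L + (ts.getNanos / 1000) % 1000
  } finally {
    TimeZone.setDefault(saved)
  }
}

// The same literal gives different micros depending on the JVM time zone.
Seq("UTC", "America/Los_Angeles").foreach { tz =>
  println(s"$tz -> ${microsOf("1001-01-01 00:00:00", tz)}")
}
```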
> ParquetFilters.timestampToMicros <-- NEED TO RE-CHECK THIS ONE

This is fine too since the rebasing happens locally on the same JVM. Also, users can enable Java 8 for filters, see #28696.
```
  for (int i = 0; i < total; i += 1) {
-   c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
+   c.putLong(
+     rowId + i,
```
nit: 2 spaces indentation
```
- // "SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps"
- ignore("SPARK-31855: generate test files for checking compatibility with Spark 2.4") {
+ // "SPARK-31183, SPARK-37705: compatibility with Spark 2.4/3.2 in reading dates/timestamps"
+ ignore("SPARK-31855: generate test files for checking compatibility with Spark 2.4/3.2") {
```
How do you use this test to generate the new test data files?
Just replace ignore -> test, and run it. For example, I checked out branch-3.2 and ran the test to generate the golden files. After that, I copied the new files to the PR's branch.
Hmm, looking at the code, it seems the version is hard-coded as 2_4_6.
Yep, I made it more flexible by using `SPARK_VERSION_SHORT`.
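For illustration, a hypothetical sketch of what "more flexible" can look like in the generator test: deriving the file-name suffix from the running Spark version instead of the hard-coded `2_4_6` (the `versionSuffix`/`fileName` names below are made up).

```scala
import org.apache.spark.SPARK_VERSION_SHORT

// "3.2.2" -> "3_2_2": golden files generated on branch-3.2 pick up that branch's version.
val versionSuffix: String = SPARK_VERSION_SHORT.replaceAll("\\.", "_")
val fileName = s"before_1582_timestamp_micros_v$versionSuffix.snappy.parquet"
```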
```
      "2_4_5" -> failInRead _,
-     "2_4_6" -> successInRead _).foreach { case (version, checkDefaultRead) =>
+     "2_4_6" -> successInRead _,
+     "3_2_2" -> successInRead _).foreach { case (version, checkDefaultRead) =>
```
Ditto, should it be 3_2_1?
In fact, the generated files have 3.2.2 in the metadata. I would keep it as is. What is more important for us is that the files have the keys:
```
extra: org.apache.spark.legacyINT96 =
extra: org.apache.spark.legacyDateTime =
```
but not any time zone.
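For reference, a hedged sketch using Parquet's standard Java API for inspecting which `extra` keys a footer actually carries; the file path is just the golden file discussed above.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

val path = "sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v3_2_2.snappy.parquet"
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), new Configuration()))
try {
  // Key/value metadata written by Spark, e.g. org.apache.spark.legacyDateTime and
  // org.apache.spark.legacyINT96; files written before this PR carry no org.apache.spark.timeZone.
  val kv = reader.getFooter.getFileMetaData.getKeyValueMetaData.asScala
  kv.foreach { case (k, v) => println(s"$k = $v") }
} finally {
  reader.close()
}
```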
Thanks, merging to master!
@MaxGekk can you open a backport PR for 3.2? There are conflicts.
…n Parquet/Avro metadata

In the PR, I propose to add the new metadata key `org.apache.spark.timeZone` which Spark writes to Parquet/Avro metadata while performing datetime rebasing in the `LEGACY` mode (see the SQL configs:
- `spark.sql.parquet.datetimeRebaseModeInWrite`,
- `spark.sql.parquet.int96RebaseModeInWrite` and
- `spark.sql.avro.datetimeRebaseModeInWrite`).

The writers use the current session time zone (see the SQL config `spark.sql.session.timeZone`) in rebasing of Parquet/Avro timestamp columns. At the reader side, Spark tries to get info about the writer's time zone from the new metadata property:
```
$ java -jar ~parquet-tools-1.12.0.jar meta ./part-00000-b0d90bf0-ce60-4b4f-b453-b33f61ab2b2a-c000.snappy.parquet
...
extra: org.apache.spark.timeZone = America/Los_Angeles
extra: org.apache.spark.legacyDateTime =
```
and uses it in rebasing timestamps to the Proleptic Gregorian calendar. In the case when the reader cannot retrieve the original time zone from Parquet/Avro metadata, it uses the default JVM time zone for backward compatibility.

Before the changes, Spark assumed that a writer uses the default JVM time zone while rebasing dates/timestamps. If a reader and the writer have different JVM time zone settings, the reader cannot load such columns in the `LEGACY` mode correctly. After the changes, the reader has full info about the writer's settings.

Yes. After the changes, Parquet/Avro writers use the session time zone while rebasing timestamps in the `LEGACY` mode instead of the default JVM time zone. Note that the session time zone is set to the JVM time zone by default.

1. By running new tests:
```
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV2Suite"
$ build/sbt "test:testOnly *AvroV1Suite"
$ build/sbt "test:testOnly *AvroV2Suite"
```
2. And related existing test suites:
```
$ build/sbt "test:testOnly *DateTimeUtilsSuite"
$ build/sbt "test:testOnly *RebaseDateTimeSuite"
$ build/sbt "test:testOnly *TimestampFormatterSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroCatalystDataConversionSuite"
$ build/sbt "test:testOnly *AvroRowReaderSuite"
$ build/sbt "test:testOnly *AvroSerdeSuite"
$ build/sbt "test:testOnly *ParquetVectorizedSuite"
```
3. Also modified the test `SPARK-31159: rebasing timestamps in write` to check loading timestamps in the LEGACY mode when the session time zone and JVM time zone are different.
4. Generated parquet files by Spark 3.2.0 (the commit apache@5d45a41) using the test `"SPARK-31806: generate test files for checking compatibility with Spark 2.4"`. The parquet files don't contain info about the original time zone:
```
$ java -jar ~/Downloads/parquet-tools-1.12.0.jar meta sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v3_2_0.snappy.parquet
file:        file:/Users/maximgekk/proj/parquet-rebase-save-tz/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v3_2_0.snappy.parquet
creator:     parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)
extra:       org.apache.spark.version = 3.2.0
extra:       org.apache.spark.legacyINT96 =
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"dict","type":"timestamp","nullable":true,"metadata":{}},{"name":"plain","type":"timestamp","nullable":true,"metadata":{}}]}
extra:       org.apache.spark.legacyDateTime =

file schema: spark_schema
--------------------------------------------------------------------------------
dict:        OPTIONAL INT64 L:TIMESTAMP(MICROS,true) R:0 D:1
plain:       OPTIONAL INT64 L:TIMESTAMP(MICROS,true) R:0 D:1
```
By running the test `"SPARK-31159, SPARK-37705: compatibility with Spark 2.4/3.2 in reading dates/timestamps"`, check loading of mixed parquet files generated by Spark 2.4.5/2.4.6 and 3.2.0/master.
5. Generated avro files by Spark 3.2.0 (the commit apache@5d45a41) using the test `"SPARK-31855: generate test files for checking compatibility with Spark 2.4"`. The avro files don't contain info about the original time zone:
```
$ java -jar ~/Downloads/avro-tools-1.9.2.jar getmeta external/avro/src/test/resources/before_1582_timestamp_micros_v3_2_0.avro
avro.schema	{"type":"record","name":"topLevelRecord","fields":[{"name":"dt","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]}]}
org.apache.spark.version	3.2.0
avro.codec	snappy
org.apache.spark.legacyDateTime
```
By running the test `"SPARK-31159, SPARK-37705: compatibility with Spark 2.4/3.2 in reading dates/timestamps"`, check loading of mixed avro files generated by Spark 2.4.5/2.4.6 and 3.2.0/master.

Closes apache#34973 from MaxGekk/parquet-rebase-save-tz.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit ef3a470)
Signed-off-by: Max Gekk <[email protected]>
What changes were proposed in this pull request?

In the PR, I propose to add the new metadata key `org.apache.spark.timeZone` which Spark writes to Parquet/Avro metadata while performing datetime rebasing in the `LEGACY` mode (see the SQL configs `spark.sql.parquet.datetimeRebaseModeInWrite`, `spark.sql.parquet.int96RebaseModeInWrite` and `spark.sql.avro.datetimeRebaseModeInWrite`). The writers use the current session time zone (see the SQL config `spark.sql.session.timeZone`) in rebasing of Parquet/Avro timestamp columns. At the reader side, Spark tries to get info about the writer's time zone from the new metadata property and uses it in rebasing timestamps to the Proleptic Gregorian calendar. In the case when the reader cannot retrieve the original time zone from Parquet/Avro metadata, it uses the default JVM time zone for backward compatibility.
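A simplified sketch of the reader-side fallback described above (the helper name and the plain `Map` are illustrative, not the exact Spark internals): prefer the time zone recorded by the writer and fall back to the JVM default for files written before this change.

```scala
import java.util.TimeZone

// Resolve the time zone to use for LEGACY rebasing on the read path.
def writerZoneId(fileMetadata: Map[String, String]): String =
  fileMetadata.getOrElse("org.apache.spark.timeZone", TimeZone.getDefault.getID)

// A file written with this change carries the writer's session time zone ...
val newFile = Map(
  "org.apache.spark.timeZone" -> "America/Los_Angeles",
  "org.apache.spark.legacyDateTime" -> "")
// ... while an older file does not, so the reader falls back to its own JVM zone.
val oldFile = Map("org.apache.spark.legacyDateTime" -> "")

println(writerZoneId(newFile)) // America/Los_Angeles
println(writerZoneId(oldFile)) // the reader's JVM default time zone
```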
Why are the changes needed?

Before the changes, Spark assumed that a writer uses the default JVM time zone while rebasing dates/timestamps. If a reader and the writer have different JVM time zone settings, the reader cannot load such columns in the `LEGACY` mode correctly. After the changes, the reader has full info about the writer's settings.

Does this PR introduce any user-facing change?
Yes. After the changes, Parquet/Avro writers use the session time zone while rebasing timestamps in the `LEGACY` mode instead of the default JVM time zone. Note that the session time zone is set to the JVM time zone by default.
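A hedged usage sketch of the user-facing behavior (the output path is illustrative): with the session time zone set independently of the JVM zone, the `LEGACY` rebase on write now follows the session zone and records it in the file metadata.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// The session time zone may differ from the JVM default; LEGACY rebasing on write follows it.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

// A pre-1582 timestamp is rebased in America/Los_Angeles, and that zone is stored in the
// Parquet footer under the org.apache.spark.timeZone key for readers to use.
spark.range(1)
  .selectExpr("timestamp'1001-01-01 01:02:03' AS ts")
  .write.mode("overwrite").parquet("/tmp/legacy_rebased_ts")
```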
How was this patch tested?

By running the new and the related existing test suites (the full list of sbt commands is in the commit message above), and also:

- Modified the test `SPARK-31159: rebasing timestamps in write` to check loading timestamps in the LEGACY mode when the session time zone and JVM time zone are different.
- Generated parquet files with Spark 3.2.0 (the commit 5d45a41) using the test `"SPARK-31806: generate test files for checking compatibility with Spark 2.4"`. The parquet files don't contain info about the original time zone. By running the test `"SPARK-31159, SPARK-37705: compatibility with Spark 2.4/3.2 in reading dates/timestamps"`, check loading of mixed parquet files generated by Spark 2.4.5/2.4.6 and 3.2.0/master.
- Generated avro files with Spark 3.2.0 (the commit 5d45a41) using the test `"SPARK-31855: generate test files for checking compatibility with Spark 2.4"`. The avro files don't contain info about the original time zone. By running the test `"SPARK-31159, SPARK-37705: compatibility with Spark 2.4/3.2 in reading dates/timestamps"`, check loading of mixed avro files generated by Spark 2.4.5/2.4.6 and 3.2.0/master.