[SPARK-24773] Avro: support logical timestamp type with different precisions #21935
gengliangwang wants to merge 8 commits into apache:master from
Conversation
@gengliangwang, thanks! I am a bot who has found some folks who might be able to help with the review: @cloud-fan, @gatorsmile and @HyukjinKwon

Test build #93838 has finished for PR 21935 at commit
```diff
  case DateType => builder.longType()
- case TimestampType => builder.longType()
+ case TimestampType =>
+   // To be consistent with the previous behavior of writing Timestamp type with Avro 1.7,
```
Isn't the previous behavior that we can't write out timestamp data at all?
Also, we should follow Parquet and have a config `spark.sql.avro.outputTimestampType` to control it.
Previously we wrote the timestamp as Long and divided the value by 1000 (millisecond precision).
Maybe I need to revise the comment.
+1 on the new config.
For now I think writing out timestamp micros should be good
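The precision handling under discussion reduces to simple scaling between Spark's internal microsecond representation and Avro's millisecond logical type. A self-contained sketch (the helper names are illustrative, not from the PR):

```scala
// Spark stores TimestampType internally as microseconds since the Unix epoch.
// Writing Avro timestamp-millis divides by 1000 (truncating sub-millisecond
// detail); reading timestamp-millis multiplies back up to microseconds.
def microsToMillis(micros: Long): Long = micros / 1000 // write path
def millisToMicros(millis: Long): Long = millis * 1000 // read path

val micros = 1533081600123456L // an instant with microsecond detail
// The last three digits are lost after a millis roundtrip.
assert(millisToMicros(microsToMillis(micros)) == 1533081600123000L)
```

This is why defaulting to `TIMESTAMP_MICROS` is lossless for Spark data, while `TIMESTAMP_MILLIS` truncates.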
```scala
case TimestampType =>
  // To be consistent with the previous behavior of writing Timestamp type with Avro 1.7,
  // the default output Avro Timestamp type is with millisecond precision.
  builder.longBuilder().prop(LogicalType.LOGICAL_TYPE_PROP, "timestamp-millis").endLong()
```
Is there a better API for it? Hardcoding a string is hacky.
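Avro (1.8+) does expose a typed factory for this through `LogicalTypes`, which avoids the raw string. A sketch, assuming the Avro library is on the classpath:

```scala
import org.apache.avro.{LogicalTypes, Schema}

// Attach the logical type via Avro's LogicalTypes factory methods instead of
// setting the "timestamp-millis" property string by hand.
val millisSchema: Schema =
  LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
val microsSchema: Schema =
  LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))
```

`addToSchema` also validates that the logical type is compatible with the underlying Avro type, which the raw property string does not.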
Test build #93859 has finished for PR 21935 at commit
```diff
  catalystType: DataType,
- path: List[String]): (CatalystDataUpdater, Int, Any) => Unit =
+ path: List[String]): (CatalystDataUpdater, Int, Any) => Unit = {
+   (avroType.getLogicalType, catalystType) match {
```
Can we do this like:

```scala
case (LONG, TimestampType) => avroType.getLogicalType match {
  case _: TimestampMillis => (updater, ordinal, value) =>
    updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
  case _: TimestampMicros => (updater, ordinal, value) =>
    updater.setLong(ordinal, value.asInstanceOf[Long])
  case _ => (updater, ordinal, value) =>
    updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
}
```

They have the Avro long type anyway, so this is easier to read, and actually safer and more correct.
```scala
 * This function takes an avro schema and returns a sql schema.
 */
def toSqlType(avroSchema: Schema): SchemaType = {
  avroSchema.getLogicalType match {
```
```scala
case _: TimestampMicros => (updater, ordinal, value) =>
  updater.setLong(ordinal, value.asInstanceOf[Long])
case _ => (updater, ordinal, value) =>
  updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
```
Let's add a comment saying this is for backward compatibility. Also, we should only do it when the logical type is null; for other logical types, we should fail here.
```scala
(getter, ordinal) => avroType.getLogicalType match {
  case _: TimestampMillis => getter.getLong(ordinal) / 1000
  case _: TimestampMicros => getter.getLong(ordinal)
  case _ => getter.getLong(ordinal)
```
```diff
- case LONG => SchemaType(LongType, nullable = false)
+ case LONG => avroSchema.getLogicalType match {
+   case _: TimestampMillis | _: TimestampMicros =>
+     return SchemaType(TimestampType, nullable = false)
```
```diff
- case TimestampType => builder.longType()
+ case TimestampType =>
+   val timestampType = outputTimestampType match {
+     case "TIMESTAMP_MILLIS" => LogicalTypes.timestampMillis()
```
Don't hardcode the strings; we can write:

```scala
if (outputTimestampType == AvroOutputTimestampType.TIMESTAMP_MICROS.toString) ...
```
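A self-contained sketch of dispatching on enumeration values rather than on raw strings; the enumeration here is a stand-in mirroring the PR's `AvroOutputTimestampType`:

```scala
// Stand-in for the PR's AvroOutputTimestampType enumeration.
object AvroOutputTimestampType extends Enumeration {
  val TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
}

// Matching on enumeration values avoids typo-prone raw string comparisons.
def logicalTypeName(t: AvroOutputTimestampType.Value): String = t match {
  case AvroOutputTimestampType.TIMESTAMP_MILLIS => "timestamp-millis"
  case AvroOutputTimestampType.TIMESTAMP_MICROS => "timestamp-micros"
}
```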
|
Test build #93884 has finished for PR 21935 at commit
|
```diff
- prevNameSpace: String = ""): Schema = {
+ prevNameSpace: String = "",
+ outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS
+ ): Schema = {
```
Not sure if the indent here is correct.
I believe

```scala
    outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS)
  : Schema = {
```

is more correct per https://github.com/databricks/scala-style-guide#spacing-and-indentation
Test build #93899 has finished for PR 21935 at commit

retest this please
```scala
 * from the Unix epoch. TIMESTAMP_MILLIS is also logical, but with millisecond precision,
 * which means Spark has to truncate the microsecond portion of its timestamp value.
 */
val outputTimestampType: AvroOutputTimestampType.Value = {
```
Hm, I wouldn't expose this as an option for now; that at least matches Parquet's behavior.
I'm OK with it; I think Parquet should also follow this.
```diff
  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
- import org.apache.spark.sql.types._
+ import org.apache.spark.sql.types.{StructType, _}
```
```scala
  // For backward compatibility, if the Avro type is Long and it is not logical type,
  // the value is processed as timestamp type with millisecond precision.
  updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
}
```
We should add a default case and throw IncompatibleSchemaException, in case Avro adds more logical types for the long type in the future.
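A self-contained sketch of that defensive default case; the string names stand in for Avro's logical-type classes, and the exception is a minimal stand-in for spark-avro's `IncompatibleSchemaException`:

```scala
// Minimal stand-in for spark-avro's IncompatibleSchemaException.
class IncompatibleSchemaException(msg: String) extends Exception(msg)

// Resolve a reader for long-backed timestamps; unknown logical types fail
// fast instead of being silently misread as millisecond timestamps.
def timestampReader(logicalTypeName: String): Long => Long = logicalTypeName match {
  case "timestamp-millis" => millis => millis * 1000 // scale up to micros
  case "timestamp-micros" => micros => micros
  case null => millis => millis * 1000 // backward compatibility: plain long
  case other =>
    throw new IncompatibleSchemaException(s"Unexpected logical type: $other")
}
```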
```diff
  (getter, ordinal) => getter.getInt(ordinal) * DateTimeUtils.MILLIS_PER_DAY
  case TimestampType =>
-   (getter, ordinal) => getter.getLong(ordinal) / 1000
+   (getter, ordinal) => avroType.getLogicalType match {
```
Do not do the pattern match per record; we should write:

```scala
avroType.getLogicalType match {
  case _: TimestampMillis => (getter, ordinal) => ...
```
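The point is to resolve the logical type once, when the converter is built, so the returned closure does no matching on the per-record hot path. A self-contained sketch with simplified stand-in types:

```scala
// The match runs once at converter-construction time; the returned closure
// is match-free when invoked for each record.
def makeTimestampWriter(logicalTypeName: String): Long => Long =
  logicalTypeName match {
    case "timestamp-millis" => micros => micros / 1000
    case _ => micros => micros
  }

val toMillis = makeTimestampWriter("timestamp-millis")
```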
```scala
case _: TimestampMicros => getter.getLong(ordinal)
// For backward compatibility, if the Avro type is Long and it is not logical type,
// output the timestamp value as with millisecond precision.
case null => getter.getLong(ordinal) / 1000
```
ditto, add a default case.
```diff
  recordName: String = "topLevelRecord",
- prevNameSpace: String = ""): Schema = {
+ prevNameSpace: String = "",
+ outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS
```
Do we really need the default value? There seems to be only one call site, excluding the recursive ones.
It is also used in CatalystDataToAvro
```scala
    updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
  case _: TimestampMicros => (updater, ordinal, value) =>
    updater.setLong(ordinal, value.asInstanceOf[Long])
  case null => (updater, ordinal, value) =>
```
Test build #93920 has finished for PR 21935 at commit
```scala
class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
  val episodesAvro = testFile("episodes.avro")
  val testAvro = testFile("test.avro")
  val timestampAvro = testFile("timestamp.avro")
```
At least we should document how the binary file is generated, or just do a roundtrip test: Spark writes Avro files and then reads them back.
The schema and data are stated in https://github.com/apache/spark/pull/21935/files#diff-9364b0610f92b3cc35a4bc43a80751bfR397
It should be easy to reproduce from the test cases.
The other test file episodesAvro also doesn't document how it was generated.
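A hedged sketch of the roundtrip test the reviewer suggests, in Spark's test-suite style; `withTempPath`, `spark`, and `checkAnswer` are assumed to come from the surrounding `SharedSQLContext`/`SQLTestUtils` harness, so this is not self-contained:

```scala
// Sketch: write-then-read roundtrip, removing the need for an opaque
// checked-in binary fixture. Helper methods come from the test harness.
test("roundtrip Avro timestamp logical types") {
  withTempPath { dir =>
    import spark.implicits._
    val df = Seq(java.sql.Timestamp.valueOf("2018-08-01 01:02:03.123456")).toDF("ts")
    df.write.format("avro").save(dir.getCanonicalPath)
    val readBack = spark.read.format("avro").load(dir.getCanonicalPath)
    checkAnswer(readBack, df)
  }
}
```

This is essentially what the follow-up PR apache#22091 ended up doing by generating the binary files inside the suite.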
Test build #93947 has finished for PR 21935 at commit

Test build #93959 has finished for PR 21935 at commit

retest this please.

LGTM

Test build #93985 has finished for PR 21935 at commit

Test build #93998 has finished for PR 21935 at commit

Test build #94004 has finished for PR 21935 at commit

retest this please

Test build #94020 has finished for PR 21935 at commit

retest this please

Test build #94052 has finished for PR 21935 at commit

Merged to master.
In PR apache#21984 and apache#21935, the related test cases use binary files created by Python scripts. Generate the binary files in the test suite instead, to make it more transparent. Also move the related test cases to a new file `AvroLogicalTypeSuite.scala`. Tested with unit tests. Closes apache#22091 from gengliangwang/logicalType_suite. Authored-by: Gengliang Wang <gengliang.wang@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?

Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29

To specify the output timestamp type, use the Dataframe option `outputTimestampType` or the SQL config `spark.sql.avro.outputTimestampType`. The supported values are `TIMESTAMP_MICROS` and `TIMESTAMP_MILLIS`. The default output type is `TIMESTAMP_MICROS`.

How was this patch tested?

Unit test
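A hedged usage sketch of the feature described above; the DataFrame `df` and the output path are placeholders, and the option and config names follow this PR's description:

```scala
// Write with millisecond-precision Avro timestamps via the per-write option.
df.write
  .format("avro")
  .option("outputTimestampType", "TIMESTAMP_MILLIS")
  .save("/tmp/avro_out")

// Or set the default globally through the SQL config.
spark.conf.set("spark.sql.avro.outputTimestampType", "TIMESTAMP_MILLIS")
```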