Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
case (INT, IntegerType) => (updater, ordinal, value) =>
updater.setInt(ordinal, value.asInstanceOf[Int])

case (INT, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, value.asInstanceOf[Int])

case (LONG, LongType) => (updater, ordinal, value) =>
updater.setLong(ordinal, value.asInstanceOf[Long])

Expand All @@ -100,6 +103,8 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
}

// Before Avro was upgraded to 1.8 (which added logical-type support), spark-avro
// converted Long to Date. For backward compatibility, we still keep this conversion.
case (LONG, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, (value.asInstanceOf[Long] / DateTimeUtils.MILLIS_PER_DAY).toInt)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
case BinaryType =>
(getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
case DateType =>
(getter, ordinal) => getter.getInt(ordinal) * DateTimeUtils.MILLIS_PER_DAY
(getter, ordinal) => getter.getInt(ordinal)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the write path, let's drop the previous conversion to Long

Copy link
Copy Markdown
Member

@HyukjinKwon HyukjinKwon Aug 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this cause a behaviour change compared to the third-party one?

Copy link
Copy Markdown
Member Author

@gengliangwang gengliangwang Aug 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is a real behavior change. The only concern is the case where an Avro file with a date-type column is written with this built-in package and then read by the third-party one using a user-specified schema. That case should be very rare, and we can ignore it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 kinds of compatibilities:

  1. the file written by old avro data source can be read by the new avro data source
  2. the file written by new avro data source can be read by the old avro data source

I think we should focus on 1) and ignore 2)

case TimestampType => avroType.getLogicalType match {
case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000
case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.avro
import scala.collection.JavaConverters._

import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.LogicalTypes.{Date, TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._

import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType
Expand All @@ -38,7 +38,10 @@ object SchemaConverters {
*/
def toSqlType(avroSchema: Schema): SchemaType = {
avroSchema.getType match {
case INT => SchemaType(IntegerType, nullable = false)
case INT => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false)
case _ => SchemaType(IntegerType, nullable = false)
}
case STRING => SchemaType(StringType, nullable = false)
case BOOLEAN => SchemaType(BooleanType, nullable = false)
case BYTES => SchemaType(BinaryType, nullable = false)
Expand Down Expand Up @@ -121,7 +124,10 @@ object SchemaConverters {
case BooleanType => builder.booleanType()
case ByteType | ShortType | IntegerType => builder.intType()
case LongType => builder.longType()
case DateType => builder.longType()
case DateType => builder
.intBuilder()
.prop(LogicalType.LOGICAL_TYPE_PROP, LogicalTypes.date().getName)
.endInt()
case TimestampType =>
val timestampType = outputTimestampType match {
case AvroOutputTimestampType.TIMESTAMP_MILLIS => LogicalTypes.timestampMillis()
Expand Down
Binary file added external/avro/src/test/resources/date.avro
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
Expand Down Expand Up @@ -67,6 +68,27 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
// writer.close()
val timestampAvro = testFile("timestamp.avro")

// The test file date.avro is generated via following Python code:
// import json
// import avro.schema
// from avro.datafile import DataFileWriter
// from avro.io import DatumWriter
//
// write_schema = avro.schema.parse(json.dumps({
// "namespace": "logical",
// "type": "record",
// "name": "test",
// "fields": [
// {"name": "date", "type": {"type": "int", "logicalType": "date"}}
// ]
// }))
//
// writer = DataFileWriter(open("date.avro", "wb"), DatumWriter(), write_schema)
// writer.append({"date": 7})
// writer.append({"date": 365})
// writer.close()
val dateAvro = testFile("date.avro")

override protected def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024)
Expand Down Expand Up @@ -350,9 +372,35 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val df = spark.createDataFrame(rdd, schema)
df.write.format("avro").save(dir.toString)
assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
assert(
spark.read.format("avro").load(dir.toString).select("date").collect().map(_(0)).toSet ==
Array(null, 1451865600000L, 1459987200000L).toSet)
checkAnswer(
spark.read.format("avro").load(dir.toString).select("date"),
Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
}
}

// Verifies that an Avro int column carrying the "date" logical type is read as
// Catalyst DateType — both with the schema inferred from the file and with a
// user-supplied Avro schema — and that writing the data back out round-trips.
test("Logical type: date") {
// User-supplied schema declaring "date" as an int with the "date" logical type.
val userProvidedSchema = s"""
{
"namespace": "logical",
"type": "record",
"name": "test",
"fields": [
{"name": "date", "type": {"type": "int", "logicalType": "date"}}
]
}
"""

// date.avro contains the days-since-epoch values 7 and 365 (see generator above).
val expectedRows = Seq(7, 365).map(days => Row(DateTimeUtils.toJavaDate(days)))

val dateDf = spark.read.format("avro").load(dateAvro)
checkAnswer(dateDf, expectedRows)

checkAnswer(
spark.read.format("avro").option("avroSchema", userProvidedSchema).load(dateAvro),
expectedRows)

// Round trip: the written file must deserialize to the same dates.
withTempPath { dir =>
dateDf.write.format("avro").save(dir.toString)
checkAnswer(spark.read.format("avro").load(dir.toString), expectedRows)
}
}

Expand Down