Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic._
import org.apache.avro.util.Utf8
Expand Down Expand Up @@ -71,7 +72,15 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
private def newWriter(
avroType: Schema,
catalystType: DataType,
path: List[String]): (CatalystDataUpdater, Int, Any) => Unit =
path: List[String]): (CatalystDataUpdater, Int, Any) => Unit = {
(avroType.getLogicalType, catalystType) match {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this like:

      case (LONG, TimestampType) => avroType.getLogicalType match {
        case _: TimestampMillis => (updater, ordinal, value) =>
          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
        case _: TimestampMicros => (updater, ordinal, value) =>
          updater.setLong(ordinal, value.asInstanceOf[Long])
        case _ => (updater, ordinal, value) =>
          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
      }

? Looks they have Avro long type anyway. Thought it's better to read and actually safer and correct.

case (_: TimestampMillis, TimestampType) => return (updater, ordinal, value) =>
updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
case (_: TimestampMicros, TimestampType) => return (updater, ordinal, value) =>
updater.setLong(ordinal, value.asInstanceOf[Long])
case _ =>
}

(avroType.getType, catalystType) match {
case (NULL, NullType) => (updater, ordinal, _) =>
updater.setNullAt(ordinal)
Expand Down Expand Up @@ -246,6 +255,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
s"Source Avro schema: $rootAvroType.\n" +
s"Target Catalyst type: $rootCatalystType")
}
}

private def getRecordWriter(
avroType: Schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.sql.avro

import scala.collection.JavaConverters._

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.{LogicalType, Schema, SchemaBuilder}
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._

import org.apache.spark.sql.types._
Expand All @@ -35,6 +36,12 @@ object SchemaConverters {
* This function takes an avro schema and returns a sql schema.
*/
def toSqlType(avroSchema: Schema): SchemaType = {
avroSchema.getLogicalType match {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

case _: TimestampMillis | _: TimestampMicros =>
return SchemaType(TimestampType, nullable = false)
case _ =>
}

avroSchema.getType match {
case INT => SchemaType(IntegerType, nullable = false)
case STRING => SchemaType(StringType, nullable = false)
Expand Down Expand Up @@ -114,7 +121,10 @@ object SchemaConverters {
case ByteType | ShortType | IntegerType => builder.intType()
case LongType => builder.longType()
case DateType => builder.longType()
case TimestampType => builder.longType()
case TimestampType =>
// To be consistent with the previous behavior of writing Timestamp type with Avro 1.7,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the previous behavior is: we can't write out timestamp data, isn't it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also we should follow parquet and have a config spark.sql.avro.outputTimestampType to control it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we write timestamp as Long and divide the value by 1000(millisecond precision).
Maybe I need to revise the comment.
+1 on the new config.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I think writing out timestamp micros should be good

// the default output Avro Timestamp type is with millisecond precision.
builder.longBuilder().prop(LogicalType.LOGICAL_TYPE_PROP, "timestamp-millis").endLong()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a better API for it? hardcoding a string is hacky.

case FloatType => builder.floatType()
case DoubleType => builder.doubleType()
case _: DecimalType | StringType => builder.stringType()
Expand Down
Binary file added external/avro/src/test/resources/timestamp.avro
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.spark.sql.types._
class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val episodesAvro = testFile("episodes.avro")
val testAvro = testFile("test.avro")
val timestampAvro = testFile("timestamp.avro")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least we should provide how the binary file is generated, or just do roundtrip test: Spark write avro files and then read it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema and data is stated in https://github.com/apache/spark/pull/21935/files#diff-9364b0610f92b3cc35a4bc43a80751bfR397
It should be easy to get from test cases.
The other test file episodesAvro also doesn't provide how it is generated.


override protected def beforeAll(): Unit = {
super.beforeAll()
Expand Down Expand Up @@ -331,6 +332,63 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("Logical type: timestamp_millis") {
val sparkSession = spark
import sparkSession.implicits._

val expected =
Seq(1L, 666L).toDF("timestamp_millis").select('timestamp_millis.cast(TimestampType)).collect()
val df = spark.read.format("avro").load(timestampAvro).select('timestamp_millis)

checkAnswer(df, expected)

withTempPath { dir =>
df.write.format("avro").save(dir.toString)
checkAnswer(spark.read.format("avro").load(dir.toString), expected)
}
}

test("Logical type: timestamp_micros") {
val sparkSession = spark
import sparkSession.implicits._

val expected =
Seq(2L, 999L).toDF("timestamp_micros").select('timestamp_micros.cast(TimestampType)).collect()
val df = spark.read.format("avro").load(timestampAvro).select('timestamp_micros)

checkAnswer(df, expected)

withTempPath { dir =>
df.write.format("avro").save(dir.toString)
checkAnswer(spark.read.format("avro").load(dir.toString), expected)
}
}

test("Logical type: user specified schema") {
val sparkSession = spark
import sparkSession.implicits._

val expected = Seq((1L, 2L), (666L, 999L))
.toDF("timestamp_millis", "timestamp_micros")
.select('timestamp_millis.cast(TimestampType), 'timestamp_micros.cast(TimestampType))
.collect()

val avroSchema = s"""
{
"namespace": "logical",
"type": "record",
"name": "test",
"fields": [
{"name": "timestamp_millis", "type": {"type": "long","logicalType": "timestamp-millis"}},
{"name": "timestamp_micros", "type": {"type": "long","logicalType": "timestamp-micros"}}
]
}
"""
val df = spark.read.format("avro").option("avroSchema", avroSchema).load(timestampAvro)

checkAnswer(df, expected)
}

test("Array data types") {
withTempPath { dir =>
val testSchema = StructType(Seq(
Expand Down Expand Up @@ -511,7 +569,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

// TimesStamps are converted to longs
val times = spark.read.format("avro").load(avroDir).select("Time").collect()
assert(times.map(_(0)).toSet == Set(666, 777, 42))
assert(times.map(_(0)).toSet ==
Set(new Timestamp(666), new Timestamp(777), new Timestamp(42)))

// DecimalType should be converted to string
val decimals = spark.read.format("avro").load(avroDir).select("Decimal").collect()
Expand Down