Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType

/**
* Options for Avro Reader and Writer stored in case insensitive manner.
Expand Down Expand Up @@ -80,14 +79,4 @@ class AvroOptions(
val compression: String = {
parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
}

/**
* Avro timestamp type used when Spark writes data to Avro files.
* Currently supported types are `TIMESTAMP_MICROS` and `TIMESTAMP_MILLIS`.
* TIMESTAMP_MICROS is a logical timestamp type in Avro, which stores number of microseconds
* from the Unix epoch. TIMESTAMP_MILLIS is also logical, but with millisecond precision,
* which means Spark has to truncate the microsecond portion of its timestamp value.
* The related configuration is set via SQLConf, and it is not exposed as an option.
*/
val outputTimestampType: AvroOutputTimestampType.Value = SQLConf.get.avroOutputTimestampType
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,11 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:

private def newStructConverter(
catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
if (avroStruct.getType != RECORD) {
if (avroStruct.getType != RECORD || avroStruct.getFields.size() != catalystStruct.length) {
throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
s"Avro type $avroStruct.")
}
val avroFields = avroStruct.getFields
assert(avroFields.size() == catalystStruct.length)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check redundant? Why remove it now?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think he moved this condition above.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah — strictly speaking it sounds unrelated, though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed it. :)

val fieldConverters = catalystStruct.zip(avroFields.asScala).map {
val fieldConverters = catalystStruct.zip(avroStruct.getFields.asScala).map {
case (f1, f2) => newConverter(f1.dataType, resolveNullableType(f2.schema(), f1.nullable))
}
val numFields = catalystStruct.length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ package org.apache.spark.sql.avro
import scala.collection.JavaConverters._
import scala.util.Random

import com.fasterxml.jackson.annotation.ObjectIdGenerators.UUIDGenerator
import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.LogicalTypes.{Date, Decimal, TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator
import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.Decimal.{maxPrecisionForBytes, minBytesForPrecision}

Expand Down Expand Up @@ -126,8 +123,7 @@ object SchemaConverters {
catalystType: DataType,
nullable: Boolean = false,
recordName: String = "topLevelRecord",
prevNameSpace: String = "",
outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS)
prevNameSpace: String = "")
: Schema = {
val builder = SchemaBuilder.builder()

Expand All @@ -138,13 +134,7 @@ object SchemaConverters {
case DateType =>
LogicalTypes.date().addToSchema(builder.intType())
case TimestampType =>
val timestampType = outputTimestampType match {
case AvroOutputTimestampType.TIMESTAMP_MILLIS => LogicalTypes.timestampMillis()
case AvroOutputTimestampType.TIMESTAMP_MICROS => LogicalTypes.timestampMicros()
case other =>
throw new IncompatibleSchemaException(s"Unexpected output timestamp type $other.")
}
timestampType.addToSchema(builder.longType())
LogicalTypes.timestampMicros().addToSchema(builder.longType())

case FloatType => builder.floatType()
case DoubleType => builder.doubleType()
Expand All @@ -162,10 +152,10 @@ object SchemaConverters {
case BinaryType => builder.bytesType()
case ArrayType(et, containsNull) =>
builder.array()
.items(toAvroType(et, containsNull, recordName, prevNameSpace, outputTimestampType))
.items(toAvroType(et, containsNull, recordName, prevNameSpace))
case MapType(StringType, vt, valueContainsNull) =>
builder.map()
.values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace, outputTimestampType))
.values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace))
case st: StructType =>
val nameSpace = prevNameSpace match {
case "" => recordName
Expand All @@ -175,7 +165,7 @@ object SchemaConverters {
val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
st.foreach { f =>
val fieldAvroType =
toAvroType(f.dataType, f.nullable, f.name, nameSpace, outputTimestampType)
toAvroType(f.dataType, f.nullable, f.name, nameSpace)
fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
}
fieldsAssembler.endRecord()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,34 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU
}
}

test("Logical type: specify different output timestamp types") {
test("Logical type: user specified output schema with different timestamp types") {
withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
val df =
spark.read.format("avro").load(timestampAvro).select('timestamp_millis, 'timestamp_micros)

val expected = timestampInputData.map(t => Row(new Timestamp(t._1), new Timestamp(t._2)))

Seq("TIMESTAMP_MILLIS", "TIMESTAMP_MICROS").foreach { timestampType =>
withSQLConf(SQLConf.AVRO_OUTPUT_TIMESTAMP_TYPE.key -> timestampType) {
withTempPath { path =>
df.write.format("avro").save(path.toString)
checkAnswer(spark.read.format("avro").load(path.toString), expected)
}
}
val userSpecifiedTimestampSchema = s"""
{
"namespace": "logical",
"type": "record",
"name": "test",
"fields": [
{"name": "timestamp_millis",
"type": [{"type": "long","logicalType": "timestamp-micros"}, "null"]},
{"name": "timestamp_micros",
"type": [{"type": "long","logicalType": "timestamp-millis"}, "null"]}
]
}
"""

withTempPath { path =>
df.write
.format("avro")
.option("avroSchema", userSpecifiedTimestampSchema)
.save(path.toString)
checkAnswer(spark.read.format("avro").load(path.toString), expected)
}
}
}
Expand All @@ -179,7 +192,7 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU
}
}

test("Logical type: user specified schema") {
test("Logical type: user specified read schema") {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Logical type: user specified read schema with different timestamp types.

withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
val expected = timestampInputData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1444,21 +1444,6 @@ object SQLConf {
.intConf
.createWithDefault(20)

object AvroOutputTimestampType extends Enumeration {
val TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
}

val AVRO_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.avro.outputTimestampType")
.doc("Sets which Avro timestamp type to use when Spark writes data to Avro files. " +
"TIMESTAMP_MICROS is a logical timestamp type in Avro, which stores number of " +
"microseconds from the Unix epoch. TIMESTAMP_MILLIS is also logical, but with " +
"millisecond precision, which means Spark has to truncate the microsecond portion of its " +
"timestamp value.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(AvroOutputTimestampType.values.map(_.toString))
.createWithDefault(AvroOutputTimestampType.TIMESTAMP_MICROS.toString)

val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
.doc("Compression codec used in writing of AVRO files. Supported codecs: " +
"uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
Expand Down Expand Up @@ -1882,9 +1867,6 @@ class SQLConf extends Serializable with Logging {

def replEagerEvalTruncate: Int = getConf(SQLConf.REPL_EAGER_EVAL_TRUNCATE)

def avroOutputTimestampType: AvroOutputTimestampType.Value =
AvroOutputTimestampType.withName(getConf(SQLConf.AVRO_OUTPUT_TIMESTAMP_TYPE))

def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)

def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
Expand Down