From b02d7edbf39f9ad4c4a2fdae6eeec1104097057e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 16 Jul 2018 02:47:29 +0800 Subject: [PATCH 1/9] add from_avro and to_avro --- .../spark/sql/avro/AvroDataToCatalyst.scala | 58 +++++++++++++++++ .../spark/sql/avro/CatalystDataToAvro.scala | 62 +++++++++++++++++++ .../org/apache/spark/sql/avro/package.scala | 27 ++++++++ 3 files changed, 147 insertions(+) create mode 100644 external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala create mode 100644 external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala new file mode 100644 index 000000000000..44f3bb5c9340 --- /dev/null +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.io.{BinaryDecoder, DecoderFactory} + +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} + +case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) + + override lazy val dataType: DataType = + SchemaConverters.toSqlType(avroType.value).dataType + + override def nullable: Boolean = true + + @transient private lazy val reader = new GenericDatumReader[Any](avroType.value) + + @transient private lazy val deserializer = new AvroDeserializer(avroType.value, dataType) + + @transient private var decoder: BinaryDecoder = _ + + @transient private var result: Any = _ + + override def nullSafeEval(input: Any): Any = { + val binary = input.asInstanceOf[Array[Byte]] + decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) + result = reader.read(result, decoder) + deserializer.deserialize(result) + } + + override def simpleString: String = { + s"from_avro(${child.sql}, ${dataType.simpleString})" + } + + override def sql: String = simpleString +} diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala new file mode 100644 index 000000000000..0d4f63b4fb8c --- /dev/null +++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream + +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.io.{BinaryEncoder, EncoderFactory} + +import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{BinaryType, DataType} + +case class CatalystDataToAvro(child: Expression) extends UnaryExpression with CodegenFallback { + + override lazy val dataType: DataType = BinaryType + + @transient private lazy val avroType = + SchemaConverters.toAvroType(child.dataType, child.nullable) + + @transient private lazy val serializer = + new AvroSerializer(child.dataType, avroType, child.nullable) + + @transient private lazy val writer = + new GenericDatumWriter[Any](avroType) + + @transient private var encoder: BinaryEncoder = _ + + @transient private lazy val out = new ByteArrayOutputStream + + override def nullSafeEval(input: Any): Any = { + out.reset() + encoder = EncoderFactory.get().directBinaryEncoder(out, encoder) + val avroData = serializer.serialize(input) + writer.write(avroData, encoder) + encoder.flush() + out.toByteArray + } + + + override def simpleString: String = { + s"to_avro(${child.sql}, ${child.dataType.simpleString})" + } + + override def sql: String = simpleString +} diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala index b3c8a669cf82..3942f6500cb3 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql +import org.apache.avro.Schema + +import org.apache.spark.annotation.Experimental + package object avro { /** * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using @@ -36,4 +40,27 @@ package object avro { @scala.annotation.varargs def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*) } + + /** + * Converts a binary column of avro format into its corresponding catalyst value. The specified + * schema must match the read data, otherwise the behavior is undefined: it may fail or return + * an arbitrary result. + * + * @param data the binary column. + * @param avroType the avro type. 
+ */ + @Experimental + def from_avro(data: Column, avroType: Schema): Column = { + new Column(AvroDataToCatalyst(data.expr, new SerializableSchema(avroType))) + } + + /** + * Converts a column into binary of avro format. + * + * @param data the data column. + */ + @Experimental + def to_avro(data: Column): Column = { + new Column(CatalystDataToAvro(data.expr)) + } } From 8aa2b31fcfa7d3a36e47f5c0939c575971e95df3 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 17 Jul 2018 22:35:27 +0800 Subject: [PATCH 2/9] add test suite --- .../AvroCatalystDataConversionSuite.scala | 175 ++++++++++++++++++ .../expressions/ExpressionEvalHelper.scala | 6 + 2 files changed, 181 insertions(+) create mode 100644 external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala new file mode 100644 index 000000000000..cc1033fe33c6 --- /dev/null +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{AvroDataToCatalyst, CatalystDataToAvro, RandomDataGenerator} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper { + + private def roundTripTest(data: Literal): Unit = { + val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable) + checkResult(data, avroType, data.eval()) + } + + private def checkResult(data: Literal, avroType: Schema, expected: Any): Unit = { + checkEvaluation( + AvroDataToCatalyst(CatalystDataToAvro(data), new SerializableSchema(avroType)), + prepareExpectedResult(expected)) + } + + private def assertFail(data: Literal, avroType: Schema): Unit = { + intercept[java.io.EOFException] { + AvroDataToCatalyst(CatalystDataToAvro(data), new SerializableSchema(avroType)).eval() + } + } + + private val testingTypes = Seq( + BooleanType, + ByteType, + ShortType, + IntegerType, + LongType, + FloatType, + DoubleType, + DecimalType(8, 0), // 32-bit decimal without fraction + DecimalType(8, 4), // 32-bit decimal + DecimalType(16, 0), // 64-bit decimal without fraction + DecimalType(16, 11), // 64-bit decimal + DecimalType(38, 0), + DecimalType(38, 38), + StringType, + BinaryType) + + protected def prepareExpectedResult(expected: Any): Any = expected match { + // Spark decimal is converted to avro string + case d: Decimal => UTF8String.fromString(d.toString) + // Spark byte and short both map to avro int + case b: Byte => b.toInt + case s: Short => s.toInt + case row: GenericInternalRow => InternalRow.fromSeq(row.values.map(prepareExpectedResult)) + case array: GenericArrayData => new GenericArrayData(array.array.map(prepareExpectedResult)) + case map: MapData => + val keys = new GenericArrayData( + map.keyArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + val values = new GenericArrayData( + map.valueArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + new ArrayBasedMapData(keys, values) + case other => other + } + + testingTypes.foreach { dt => + val seed = scala.util.Random.nextLong() + test(s"single $dt with seed $seed") { + val rand = new scala.util.Random(seed) + val data = RandomDataGenerator.forType(dt, rand = rand).get.apply() + val converter = CatalystTypeConverters.createToCatalystConverter(dt) + val input = Literal.create(converter(data), dt) + roundTripTest(input) + } + } + + for (_ <- 1 to 5) { + val seed = scala.util.Random.nextLong() + val rand = new scala.util.Random(seed) + val schema = RandomDataGenerator.randomSchema(rand, 5, testingTypes) + test(s"flat schema ${schema.catalogString} with seed $seed") { + val data = RandomDataGenerator.randomRow(rand, schema) + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val input = Literal.create(converter(data), schema) + roundTripTest(input) + } + } + + for (_ <- 1 to 5) { + val seed = scala.util.Random.nextLong() + val rand = new scala.util.Random(seed) + val schema = RandomDataGenerator.randomNestedSchema(rand, 10, testingTypes) + test(s"nested schema ${schema.catalogString} with seed $seed") { + val data = 
RandomDataGenerator.randomRow(rand, schema) + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val input = Literal.create(converter(data), schema) + roundTripTest(input) + } + } + + test("read int as string") { + val data = Literal(1) + val avroTypeJson = + s""" + |{ + | "type": "string", + | "name": "my_string" + |} + """.stripMargin + + // When reading an int as a string, the Avro reader cannot parse the binary and fails. + assertFail(data, new Schema.Parser().parse(avroTypeJson)) + } + + test("read string as int") { + val data = Literal("abc") + val avroTypeJson = + s""" + |{ + | "type": "int", + | "name": "my_int" + |} + """.stripMargin + + // When reading string data as an int, the Avro reader does not detect the type mismatch and reads + // the string length as the int value. + checkResult(data, new Schema.Parser().parse(avroTypeJson), 3) + } + + test("read float as double") { + val data = Literal(1.23f) + val avroTypeJson = + s""" + |{ + | "type": "double", + | "name": "my_double" + |} + """.stripMargin + + // When reading float data as a double, the Avro reader fails (it tries to read 8 bytes while the data + // has only 4 bytes), `AvroDataToCatalyst` catches the exception and returns null. + assertFail(data, new Schema.Parser().parse(avroTypeJson)) + } + + test("read double as float") { + val data = Literal(1.23) + val avroTypeJson = + s""" + |{ + | "type": "float", + | "name": "my_float" + |} + """.stripMargin + + // The Avro reader reads the first 4 bytes of a double as a float; the result is undefined. + checkResult(data, new Schema.Parser().parse(avroTypeJson), 5.848603E35f) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 14bfa212b549..d045267ef5d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -79,6 +79,12 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa java.util.Arrays.equals(result, expected) case (result: Double, expected: Spread[Double @unchecked]) => expected.asInstanceOf[Spread[Double]].isWithin(result) + case (result: InternalRow, expected: InternalRow) => + val st = dataType.asInstanceOf[StructType] + assert(result.numFields == st.length && expected.numFields == st.length) + st.zipWithIndex.forall { case (f, i) => + checkResult(result.get(i, f.dataType), expected.get(i, f.dataType), f.dataType) + } case (result: ArrayData, expected: ArrayData) => result.numElements == expected.numElements && { val et = dataType.asInstanceOf[ArrayType].elementType From 54861323bec556c66e94ca9919760ac9b3a3c11e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 18 Jul 2018 17:43:27 +0800 Subject: [PATCH 3/9] add AvroFunctionsSuite --- .../spark/sql/avro/AvroFunctionsSuite.scala | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala new file mode 100644 index 000000000000..e400802eab9f --- /dev/null +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.functions.struct +import org.apache.spark.sql.test.SharedSQLContext + +class AvroFunctionsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("roundtrip in to_avro and from_avro - int and string") { + val df = spark.range(10).select('id, 'id.cast("string").as("str")) + + val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b")) + val avroTypeLong = new Schema.Parser().parse( + s""" + |{ + | "type": "long", + | "name": "id" + |} + """.stripMargin) + val avroTypeStr = new Schema.Parser().parse( + s""" + |{ + | "type": "string", + | "name": "str" + |} + """.stripMargin) + checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df) + } + + test("roundtrip in to_avro and from_avro - struct") { + val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct")) + val avroStructDF = df.select(to_avro('struct).as("avro")) + val avroTypeStruct = new Schema.Parser().parse( + s""" + |{ + | "type": "record", + | "name": "struct", + | "fields": [ + | {"name": "col1", "type": "long"}, + | {"name": "col2", "type": "string"} + | ] + |} + """.stripMargin) + checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df) + } + + test("roundtrip in to_avro and from_avro - array with null") { + val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array") + val avroTypeArrStruct = new Schema.Parser().parse( + s""" + |[ { + | "type" : "array", + | "items" : [ { + | "type" : "record", + | "name" : "x", + | "fields" : [ { + | "name" : "y", + | "type" : "int" + | } ] + | }, "null" ] + |}, "null" ] + """.stripMargin) + val readBackOne = dfOne.select(to_avro($"array").as("avro")) + .select(from_avro($"avro", avroTypeArrStruct).as("array")) + checkAnswer(dfOne, readBackOne) + } +} From 76763da105ae6b6d05802c7ce846981ee9b29d3b Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 19 Jul 2018 15:28:01 +0800 Subject: [PATCH 4/9] address some comments --- .../scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala | 2 +- .../apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala index 0d4f63b4fb8c..08bc7d45eaf3 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{BinaryType, DataType} case class 
CatalystDataToAvro(child: Expression) extends UnaryExpression with CodegenFallback { - override lazy val dataType: DataType = BinaryType + override def dataType: DataType = BinaryType @transient private lazy val avroType = SchemaConverters.toAvroType(child.dataType, child.nullable) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index cc1033fe33c6..59122727d2d1 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -155,7 +155,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH """.stripMargin // When reading float data as a double, the Avro reader fails (it tries to read 8 bytes while the data - // has only 4 bytes), `AvroDataToCatalyst` catches the exception and returns null. + // has only 4 bytes). assertFail(data, new Schema.Parser().parse(avroTypeJson)) } From 81614c23de0b1b7717b5b61dc81ab15e51671f3e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 20 Jul 2018 00:37:44 +0800 Subject: [PATCH 5/9] codegen --- .../apache/spark/sql/avro/AvroDataToCatalyst.scala | 10 ++++++++-- .../apache/spark/sql/avro/CatalystDataToAvro.scala | 11 ++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index 44f3bb5c9340..dc092a0411f3 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -22,11 +22,11 @@ import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema} import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) - extends UnaryExpression with CodegenFallback with ExpectsInputTypes { + extends UnaryExpression with ExpectsInputTypes { override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) @@ -55,4 +55,10 @@ case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) } override def sql: String = simpleString + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val expr = ctx.addReferenceObj("this", this) + defineCodeGen(ctx, ev, input => + s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)") + } } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala index 08bc7d45eaf3..11d9a5e7ff6c 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -24,10 +24,10 @@ import org.apache.avro.io.{BinaryEncoder, EncoderFactory} import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} import 
org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.types.{BinaryType, DataType} -case class CatalystDataToAvro(child: Expression) extends UnaryExpression with CodegenFallback { +case class CatalystDataToAvro(child: Expression) extends UnaryExpression { override def dataType: DataType = BinaryType @@ -53,10 +53,15 @@ case class CatalystDataToAvro(child: Expression) extends UnaryExpression with Co out.toByteArray } - override def simpleString: String = { s"to_avro(${child.sql}, ${child.dataType.simpleString})" } override def sql: String = simpleString + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val expr = ctx.addReferenceObj("this", this) + defineCodeGen(ctx, ev, input => + s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)") + } } From e5c170281429e0a3b2981eef9c08a2a757c89d0d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 20 Jul 2018 03:01:54 +0800 Subject: [PATCH 6/9] address comments --- .../scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala | 2 +- .../scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala index 11d9a5e7ff6c..3c1843bdb59d 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -62,6 +62,6 @@ case class CatalystDataToAvro(child: Expression) extends UnaryExpression { override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val expr = ctx.addReferenceObj("this", this) defineCodeGen(ctx, ev, input => - s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)") + s"(byte[]) $expr.nullSafeEval($input)") } } diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index e400802eab9f..ebe318d48567 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -33,7 +33,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext { val avroTypeLong = new Schema.Parser().parse( s""" |{ - | "type": "long", + | "type": "int", | "name": "id" |} """.stripMargin) From 8421f445160eac368a90f21ef091bdf1101eb390 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 20 Jul 2018 15:45:31 +0800 Subject: [PATCH 7/9] address comments --- .../spark/sql/avro/AvroDataToCatalyst.scala | 16 ++-- .../spark/sql/avro/CatalystDataToAvro.scala | 2 +- .../org/apache/spark/sql/avro/package.scala | 10 ++- .../AvroCatalystDataConversionSuite.scala | 18 ++--- .../spark/sql/avro/AvroFunctionsSuite.scala | 74 +++++++++---------- 5 files changed, 61 insertions(+), 59 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index dc092a0411f3..7e741625958f 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -17,27 +17,29 @@ package org.apache.spark.sql +import org.apache.avro.Schema import org.apache.avro.generic.GenericDatumReader import org.apache.avro.io.{BinaryDecoder, DecoderFactory} -import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema} +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} -case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) +case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String) extends UnaryExpression with ExpectsInputTypes { override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) - override lazy val dataType: DataType = - SchemaConverters.toSqlType(avroType.value).dataType + override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType override def nullable: Boolean = true - @transient private lazy val reader = new GenericDatumReader[Any](avroType.value) + @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema) - @transient private lazy val deserializer = new AvroDeserializer(avroType.value, dataType) + @transient private lazy val reader = new GenericDatumReader[Any](avroSchema) + + @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType) @transient private var decoder: BinaryDecoder = _ @@ -51,7 +53,7 @@ case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) } override def simpleString: String = { - s"from_avro(${child.sql}, ${dataType.simpleString})" + s"from_avro(${child.sql}, ${dataType.catalogString})" } override def sql: String = simpleString diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala index 3c1843bdb59d..10ed731eef39 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -54,7 +54,7 @@ case class CatalystDataToAvro(child: Expression) extends UnaryExpression { } override def simpleString: String = { - s"to_avro(${child.sql}, ${child.dataType.simpleString})" + s"to_avro(${child.sql}, ${child.dataType.catalogString})" } override def sql: String = simpleString diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala index 3942f6500cb3..e82651d96a03 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala @@ -47,17 +47,21 @@ package object avro { * an arbitrary result. * * @param data the binary column. - * @param avroType the avro type. + * @param jsonFormatSchema the avro schema in JSON string format. + * + * @since 2.4.0 */ @Experimental - def from_avro(data: Column, avroType: Schema): Column = { - new Column(AvroDataToCatalyst(data.expr, new SerializableSchema(avroType))) + def from_avro(data: Column, jsonFormatSchema: String): Column = { + new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema)) } /** * Converts a column into binary of avro format. 
* * @param data the data column. + * + * @since 2.4.0 */ @Experimental def to_avro(data: Column): Column = { diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index 59122727d2d1..06d5477b2ea4 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -31,18 +31,18 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH private def roundTripTest(data: Literal): Unit = { val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable) - checkResult(data, avroType, data.eval()) + checkResult(data, avroType.toString, data.eval()) } - private def checkResult(data: Literal, avroType: Schema, expected: Any): Unit = { + private def checkResult(data: Literal, schema: String, expected: Any): Unit = { checkEvaluation( - AvroDataToCatalyst(CatalystDataToAvro(data), new SerializableSchema(avroType)), + AvroDataToCatalyst(CatalystDataToAvro(data), schema), prepareExpectedResult(expected)) } - private def assertFail(data: Literal, avroType: Schema): Unit = { + private def assertFail(data: Literal, schema: String): Unit = { intercept[java.io.EOFException] { - AvroDataToCatalyst(CatalystDataToAvro(data), new SerializableSchema(avroType)).eval() + AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval() } } @@ -126,7 +126,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH """.stripMargin // When reading an int as a string, the Avro reader cannot parse the binary and fails. - assertFail(data, new Schema.Parser().parse(avroTypeJson)) + assertFail(data, avroTypeJson) } test("read string as int") { @@ -141,7 +141,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH """.stripMargin // When reading string data as an int, the Avro reader does not detect the type mismatch and reads // the string length as the int value. - checkResult(data, new Schema.Parser().parse(avroTypeJson), 3) + checkResult(data, avroTypeJson, 3) } test("read float as double") { @@ -156,7 +156,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH """.stripMargin // When reading float data as a double, the Avro reader fails (it tries to read 8 bytes while the data // has only 4 bytes). - assertFail(data, new Schema.Parser().parse(avroTypeJson)) + assertFail(data, avroTypeJson) } test("read double as float") { @@ -170,6 +170,6 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH """.stripMargin // The Avro reader reads the first 4 bytes of a double as a float; the result is undefined. 
- checkResult(data, new Schema.Parser().parse(avroTypeJson), 5.848603E35f) + checkResult(data, avroTypeJson, 5.848603E35f) } } diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index ebe318d48567..90a4cd6ccf9d 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -30,56 +30,52 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext { val df = spark.range(10).select('id, 'id.cast("string").as("str")) val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b")) - val avroTypeLong = new Schema.Parser().parse( - s""" - |{ - | "type": "int", - | "name": "id" - |} - """.stripMargin) - val avroTypeStr = new Schema.Parser().parse( - s""" - |{ - | "type": "string", - | "name": "str" - |} - """.stripMargin) + val avroTypeLong = s""" + |{ + | "type": "int", + | "name": "id" + |} + """.stripMargin + val avroTypeStr = s""" + |{ + | "type": "string", + | "name": "str" + |} + """.stripMargin checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df) } test("roundtrip in to_avro and from_avro - struct") { val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct")) val avroStructDF = df.select(to_avro('struct).as("avro")) - val avroTypeStruct = new Schema.Parser().parse( - s""" - |{ - | "type": "record", - | "name": "struct", - | "fields": [ - | {"name": "col1", "type": "long"}, - | {"name": "col2", "type": "string"} - | ] - |} - """.stripMargin) + val avroTypeStruct = s""" + |{ + | "type": "record", + | "name": "struct", + | "fields": [ + | {"name": "col1", "type": "long"}, + | {"name": "col2", "type": "string"} + | ] + |} + """.stripMargin checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df) } test("roundtrip in to_avro and from_avro - array with null") { val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array") - val avroTypeArrStruct = new Schema.Parser().parse( - s""" - |[ { - | "type" : "array", - | "items" : [ { - | "type" : "record", - | "name" : "x", - | "fields" : [ { - | "name" : "y", - | "type" : "int" - | } ] - | }, "null" ] - |}, "null" ] - """.stripMargin) + val avroTypeArrStruct = s""" + |[ { + | "type" : "array", + | "items" : [ { + | "type" : "record", + | "name" : "x", + | "fields" : [ { + | "name" : "y", + | "type" : "int" + | } ] + | }, "null" ] + |}, "null" ] + """.stripMargin val readBackOne = dfOne.select(to_avro($"array").as("avro")) .select(from_avro($"avro", avroTypeArrStruct).as("array")) checkAnswer(dfOne, readBackOne) From 47caecee4efdfae4b74c05974f36254d2974c278 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 20 Jul 2018 16:24:45 +0800 Subject: [PATCH 8/9] override sql --- .../org/apache/spark/sql/avro/AvroDataToCatalyst.scala | 6 ++++-- .../org/apache/spark/sql/avro/CatalystDataToAvro.scala | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index 7e741625958f..6671b3fb8705 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -53,10 +53,12 @@ case class AvroDataToCatalyst(child: Expression, 
jsonFormatSchema: String) } override def simpleString: String = { - s"from_avro(${child.sql}, ${dataType.catalogString})" + s"from_avro(${child.sql}, ${dataType.simpleString})" } - override def sql: String = simpleString + override def sql: String = { + s"from_avro(${child.sql}, ${dataType.catalogString})" + } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val expr = ctx.addReferenceObj("this", this) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala index 10ed731eef39..a669388e8825 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -54,10 +54,12 @@ case class CatalystDataToAvro(child: Expression) extends UnaryExpression { } override def simpleString: String = { - s"to_avro(${child.sql}, ${child.dataType.catalogString})" + s"to_avro(${child.sql}, ${child.dataType.simpleString})" } - override def sql: String = simpleString + override def sql: String = { + s"to_avro(${child.sql}, ${child.dataType.catalogString})" + } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val expr = ctx.addReferenceObj("this", this) From f54adc9251936abecf1cd8403c7df6b0d9b5a71e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sun, 22 Jul 2018 20:31:13 +0800 Subject: [PATCH 9/9] add scalacheck and fix build failure --- external/avro/pom.xml | 5 +++ .../avro/src/test/resources/log4j.properties | 39 +++++-------------- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/external/avro/pom.xml b/external/avro/pom.xml index 42e865bc3882..ad7df1f49ac4 100644 --- a/external/avro/pom.xml +++ b/external/avro/pom.xml @@ -61,6 +61,11 @@ test-jar test + + org.scalacheck + scalacheck_${scala.binary.version} + test + org.apache.spark spark-tags_${scala.binary.version} diff --git a/external/avro/src/test/resources/log4j.properties b/external/avro/src/test/resources/log4j.properties index f80a5291bc07..75e3b53a093f 100644 --- a/external/avro/src/test/resources/log4j.properties +++ b/external/avro/src/test/resources/log4j.properties @@ -15,35 +15,14 @@ # limitations under the License. 
# -# Set everything to be logged to the file core/target/unit-tests.log -log4j.rootLogger=DEBUG, CA, FA +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n -#Console Appender -log4j.appender.CA=org.apache.log4j.ConsoleAppender -log4j.appender.CA.layout=org.apache.log4j.PatternLayout -log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n -log4j.appender.CA.Threshold = WARN +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN - -#File Appender -log4j.appender.FA=org.apache.log4j.FileAppender -log4j.appender.FA.append=false -log4j.appender.FA.file=target/unit-tests.log -log4j.appender.FA.layout=org.apache.log4j.PatternLayout -log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Set the logger level of File Appender to WARN -log4j.appender.FA.Threshold = INFO - -# Some packages are noisy for no good reason. -log4j.additivity.parquet.hadoop.ParquetRecordReader=false -log4j.logger.parquet.hadoop.ParquetRecordReader=OFF - -log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false -log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF - -log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false -log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF - -log4j.additivity.hive.ql.metadata.Hive=false -log4j.logger.hive.ql.metadata.Hive=OFF
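
Usage sketch for the functions these patches add: a minimal round trip against the final API from patch 7, where `from_avro` takes the Avro schema as a JSON string. It assumes an active `SparkSession` named `spark` and mirrors the struct round trip in `AvroFunctionsSuite`; the record and field names in the reader schema are illustrative only.

    import org.apache.spark.sql.avro._                 // from_avro / to_avro from the avro package object
    import org.apache.spark.sql.functions.struct
    import spark.implicits._                           // enables the $"..." column syntax

    // Pack two columns into a struct and encode it as Avro binary.
    val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
    val avroDF = df.select(to_avro($"struct").as("avro"))

    // The reader schema must match the written data; a mismatch is undefined
    // behavior (see the "read X as Y" tests in AvroCatalystDataConversionSuite).
    val avroTypeStruct =
      """{
        |  "type": "record",
        |  "name": "struct",
        |  "fields": [
        |    {"name": "col1", "type": "long"},
        |    {"name": "col2", "type": "string"}
        |  ]
        |}""".stripMargin

    // Decode the binary back into a struct<col1: bigint, col2: string> column.
    val decoded = avroDF.select(from_avro($"avro", avroTypeStruct).as("struct"))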