diff --git a/pom.xml b/pom.xml
index 445e65c0459b..f41109753f78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
- <parquet.version>1.7.0</parquet.version>
+ <parquet.version>1.8.1</parquet.version>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jblas.version>1.2.4</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index a958373eb769..430964c2f5b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema._
import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
@@ -112,10 +112,30 @@ private[parquet] object CatalystReadSupport {
*/
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
- Types
- .buildMessage()
- .addFields(clippedParquetFields: _*)
- .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+
+ if (clippedParquetFields.isEmpty) {
+ // !! HACK ALERT !!
+ //
+ // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType,
+ // which prevents us from requesting an empty schema (zero columns) for queries like
+ // `SELECT COUNT(*) FROM t`.
+ // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+ //
+ // To work around this problem, we first construct a `MessageType` with a single dummy
+ // field, and then remove the field to obtain an empty `MessageType`.
+ //
+ // TODO Revert this change after upgrading to parquet-mr 1.8.2+
+ val messageType = Types
+ .buildMessage()
+ .addField(Types.required(INT32).named("dummy"))
+ .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ messageType.getFields.clear()
+ messageType
+ } else {
+ Types
+ .buildMessage()
+ .addFields(clippedParquetFields: _*)
+ .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ }
}
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
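The dummy-field trick in the hunk above is a bit opaque in diff form. Below is a minimal standalone sketch of the same idea (not part of the patch), assuming parquet-mr 1.8.1 on the classpath; the schema name "spark_schema" stands in for `CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME`:

```scala
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.parquet.schema.Types

object EmptyMessageTypeSketch {
  // parquet-mr 1.8.1 refuses to build a group with zero fields (PARQUET-278 / PARQUET-363),
  // so build the message with a throwaway INT32 column first, then drop it again.
  def emptyMessageType(name: String): MessageType = {
    val messageType = Types
      .buildMessage()
      .addField(Types.required(INT32).named("dummy"))
      .named(name)
    // GroupType.getFields returns the backing list, so clearing it leaves `message <name> {}`.
    messageType.getFields.clear()
    messageType
  }

  def main(args: Array[String]): Unit =
    println(emptyMessageType("spark_schema"))
}
```

Relying on the mutability of `getFields` is exactly why the comment marks this as a hack to revert once parquet-mr 1.8.2+ allows empty groups again.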
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 07714329370a..72c786d669d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,8 +22,6 @@ import java.io.Serializable
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.OriginalType
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
@@ -53,18 +51,15 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
- // Binary.fromString and Binary.fromByteArray don't accept null values
+ // Binary.fromString and Binary.fromConstantByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
- Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+ Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
- Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- */
+ Option(v).map(b => Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -79,17 +74,15 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
+ // Binary.fromString and Binary.fromConstantByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
- Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+ Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
- Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- */
+ Option(v).map(b => Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -101,17 +94,12 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.lt(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -123,17 +111,12 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.ltEq(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -145,17 +128,12 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.gt(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -167,17 +145,12 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
- FilterApi.gtEq(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.gtEq(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -193,18 +166,14 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
+ SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
case BinaryType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
- */
+ SetInFilter(v.map(e => Binary.fromConstantByteArray(e.asInstanceOf[Array[Byte]]))))
}
/**
@@ -213,8 +182,6 @@ private[sql] object ParquetFilters {
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
- relaxParquetValidTypeMap
-
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -271,35 +238,4 @@ private[sql] object ParquetFilters {
case _ => None
}
}
-
- // !! HACK ALERT !!
- //
- // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
- // parquet-mr 1.8.1 or higher versions.
- //
- // In Parquet, not all types of columns can be used for filter push-down optimization. The set
- // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
- // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
- // pushed down.
- //
- // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
- // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
- // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
- // legal except that it fails the `ValidTypeMap` check.
- //
- // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
- private lazy val relaxParquetValidTypeMap: Unit = {
- val constructor = Class
- .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
- .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
-
- constructor.setAccessible(true)
- val enumTypeDescriptor = constructor
- .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
- .asInstanceOf[AnyRef]
-
- val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
- addMethod.setAccessible(true)
- addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
- }
}
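Two things happen in this file: the string/binary pushdown cases that SPARK-11153 had commented out are restored using the non-deprecated 1.8.x `Binary` factories (`fromString`, `fromConstantByteArray`), and the reflective `ValidTypeMap` workaround for PARQUET-201 is deleted because 1.8.1 accepts `BINARY (ENUM)` columns natively. A hedged sketch of the restored predicate construction, with made-up column names:

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.api.Binary

object BinaryPushdownSketch {
  // `column = value` for a UTF-8 string column; Binary.fromString replaces the
  // deprecated Binary.fromByteArray(value.getBytes("utf-8")) idiom from parquet-mr 1.7.0.
  def stringEquals(column: String, value: String): FilterPredicate =
    FilterApi.eq(FilterApi.binaryColumn(column), Binary.fromString(value))

  // `column = bytes` for a binary column; the "constant" variant signals that the
  // caller will not reuse or mutate the array afterwards.
  def binaryEquals(column: String, bytes: Array[Byte]): FilterPredicate =
    FilterApi.eq(FilterApi.binaryColumn(column), Binary.fromConstantByteArray(bytes))

  def main(args: Array[String]): Unit = {
    println(stringEquals("name", "Spark"))                   // hypothetical column "name"
    println(binaryEquals("payload", Array[Byte](1, 2, 3)))   // hypothetical column "payload"
  }
}
```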
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index 36b929ee1f40..e90a56aa316e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.io.File
import java.nio.ByteBuffer
import java.util.{List => JList, Map => JMap}
@@ -27,6 +26,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.parquet.test.avro._
@@ -35,14 +35,14 @@ import org.apache.spark.sql.test.SharedSQLContext
class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
private def withWriter[T <: IndexedRecord]
(path: String, schema: Schema)
- (f: AvroParquetWriter[T] => Unit): Unit = {
+ (f: ParquetWriter[T] => Unit): Unit = {
logInfo(
s"""Writing Avro records with the following Avro schema into Parquet file:
|
|${schema.toString(true)}
""".stripMargin)
- val writer = new AvroParquetWriter[T](new Path(path), schema)
+ val writer = AvroParquetWriter.builder[T](new Path(path)).withSchema(schema).build()
try f(writer) finally writer.close()
}
@@ -163,8 +163,56 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared
}
}
- ignore("nullable arrays (parquet-avro 1.7.0 does not properly support this)") {
- // TODO Complete this test case after upgrading to parquet-mr 1.8+
+ test("nullable arrays") {
+ withTempPath { dir =>
+ import ParquetCompatibilityTest._
+
+ // This Parquet schema is translated from the following Avro schema, with Hadoop configuration
+ // `parquet.avro.write-old-list-structure` set to `false`:
+ //
+ // record AvroArrayOfOptionalInts {
+ //   array<union { null, int }> f;
+ // }
+ val schema =
+ """message AvroArrayOfOptionalInts {
+ | required group f (LIST) {
+ | repeated group list {
+ | optional int32 element;
+ | }
+ | }
+ |}
+ """.stripMargin
+
+ writeDirect(dir.getCanonicalPath, schema, { rc =>
+ rc.message {
+ rc.field("f", 0) {
+ rc.group {
+ rc.field("list", 0) {
+ rc.group {
+ rc.field("element", 0) {
+ rc.addInteger(0)
+ }
+ }
+
+ rc.group { /* null */ }
+
+ rc.group {
+ rc.field("element", 0) {
+ rc.addInteger(1)
+ }
+ }
+
+ rc.group { /* null */ }
+ }
+ }
+ }
+ }
+ })
+
+ checkAnswer(
+ sqlContext.read.parquet(dir.getCanonicalPath),
+ Row(Array(0: Integer, null, 1: Integer, null)))
+ }
}
test("SPARK-10136 array of primitive array") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 60fa81b1ab81..056b9e27ee1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1385,21 +1385,25 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin)
- testSchemaClipping(
- "empty requested schema",
-
- parquetSchema =
- """message root {
- | required group f0 {
- | required int32 f00;
- | required int64 f01;
- | }
- |}
- """.stripMargin,
-
- catalystSchema = new StructType(),
-
- expectedSchema = "message root {}")
+ // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType.
+ // Re-enable this test case after upgrading to parquet-mr 1.8.2 or a later version.
+ ignore("empty requested schema") {
+ testSchemaClipping(
+ "empty requested schema",
+
+ parquetSchema =
+ """message root {
+ | required group f0 {
+ | required int32 f00;
+ | required int64 f01;
+ | }
+ |}
+ """.stripMargin,
+
+ catalystSchema = new StructType(),
+
+ expectedSchema = "message root {}")
+ }
testSchemaClipping(
"disjoint field sets",