ParquetRelation.scala
@@ -53,11 +53,12 @@ private[sql] case class ParquetRelation(
self: Product =>

/** Schema derived from ParquetFile */
-  def parquetSchema: MessageType =
-    ParquetTypesConverter
-      .readMetaData(new Path(path), conf)
-      .getFileMetaData
-      .getSchema
+  def parquetSchema: MessageType = {
+    ParquetTypesConverter.readMetaData(new Path(path), conf)
+      .map(_.getFileMetaData.getSchema)
+      .getOrElse(
+        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
+  }

/** Attributes */
override val output =
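With readMetaData now returning an Option[ParquetMetadata] (see ParquetTypes.scala below), the relation keeps its old fail-fast behavior by rethrowing when no metadata is found. A minimal sketch of the two calling styles this enables, assuming the path and conf fields of ParquetRelation above and the imports already present in that file:

    // Fail fast, as parquetSchema does above:
    val schema: MessageType = ParquetTypesConverter.readMetaData(new Path(path), conf)
      .map(_.getFileMetaData.getSchema)
      .getOrElse(throw new IllegalArgumentException(
        s"Could not find Parquet metadata at path $path"))

    // Or tolerate a missing footer, as the Hive strategy further down does:
    val schemaIfAny: Option[MessageType] = ParquetTypesConverter
      .readMetaData(new Path(path), conf)
      .map(_.getFileMetaData.getSchema)   // None => no Parquet files under `path`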
ParquetTypes.scala
@@ -48,7 +48,7 @@ private[parquet] case class ParquetTypeInfo(
decimalMetadata: Option[DecimalMetadata] = None,
length: Option[Int] = None)

-private[parquet] object ParquetTypesConverter extends Logging {
+private[sql] object ParquetTypesConverter extends Logging {
def isPrimitiveType(ctype: DataType): Boolean =
classOf[PrimitiveType] isAssignableFrom ctype.getClass

@@ -425,7 +425,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
* @param configuration The Hadoop configuration to use.
* @return The `ParquetMetadata` containing among other things the schema.
*/
-  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
+  def readMetaData(
+      origPath: Path,
+      configuration: Option[Configuration]): Option[ParquetMetadata] = {
if (origPath == null) {
throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
}
@@ -450,13 +452,11 @@
// all data in a single Parquet file have the same schema, which is normally true.
children
// Try any non-"_metadata" file first...
-      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
+      .find(file => file.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE && !file.isDir)
// ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is
// empty, thus normally the "_metadata" file is expected to be fairly small).
.orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
.map(ParquetFileReader.readFooter(conf, _))
-      .getOrElse(
-        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
}

/**
@@ -473,15 +473,19 @@
origPath: Path,
conf: Option[Configuration],
isBinaryAsString: Boolean): Seq[Attribute] = {
+    val metaData = readMetaData(origPath, conf)
+    if (metaData.isEmpty) {
+      return Seq.empty
+    }

val keyValueMetadata: java.util.Map[String, String] =
-      readMetaData(origPath, conf)
-        .getFileMetaData
-        .getKeyValueMetaData
+      metaData.map(_.getFileMetaData.getKeyValueMetaData).getOrElse(Map.empty[String, String])

if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
} else {
val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
+        readMetaData(origPath, conf).get.getFileMetaData.getSchema, isBinaryAsString)
log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
attributes
}
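After this hunk, readAttributesFromMetaData degrades gracefully: an absent footer yields Seq.empty, a footer carrying Spark's own schema string under RowReadSupport.SPARK_METADATA_KEY is preferred, and conversion from raw Parquet types is the last resort. A condensed sketch of that decision, assuming the names in the hunks above (it also folds away the second readMetaData call that the patch still makes at the end):

    readMetaData(origPath, conf) match {
      case None => Seq.empty                       // empty table: no footers to read
      case Some(meta) =>
        val kv = meta.getFileMetaData.getKeyValueMetaData
        Option(kv.get(RowReadSupport.SPARK_METADATA_KEY))
          .map(convertFromString)                  // schema Spark wrote itself
          .getOrElse(convertToAttributes(meta.getFileMetaData.getSchema, isBinaryAsString))
    }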
ParquetQuerySuite.scala
@@ -174,7 +174,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA

// test default compression codec
rdd.saveAsParquetFile(path)
-    var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    var actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)

@@ -190,7 +191,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")

rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)

@@ -206,7 +208,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")

rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
assert(actualCodec === "UNCOMPRESSED" :: Nil)

@@ -222,7 +225,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")

rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)

@@ -238,7 +242,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")

rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)

@@ -341,7 +346,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
path,
TestSQLContext.sparkContext.hadoopConfiguration)
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
-    val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job)))
+    val metaData =
+      ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job))).get
assert(metaData != null)
ParquetTestData
.testData
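The five codec assertions above repeat one inspection chain; as a sketch, it could be extracted into a small helper along these lines (helper name hypothetical; the .get on the Option is safe because each call follows a saveAsParquetFile on the same path, so a footer must exist):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import scala.collection.JavaConversions._

    def writtenCodecs(path: String, conf: Configuration): Seq[String] =
      ParquetTypesConverter
        .readMetaData(new Path(path), Some(conf)).get   // freshly written, so footers exist
        .getBlocks.flatMap(_.getColumns).map(_.getCodec.name()).distinct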
HiveStrategies.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.parquet.{ParquetTypesConverter, ParquetRelation}
import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}

import scala.collection.JavaConversions._
@@ -104,6 +104,14 @@ private[hive] trait HiveStrategies {
case a: AttributeReference => UnresolvedAttribute(a.name)
})

+        val path = relation.hiveQlTable.getPath
+        val conf = hiveContext.sparkContext.hadoopConfiguration
+        val meta = ParquetTypesConverter.readMetaData(path, Some(conf))
+
+        if (meta.isEmpty) {
+          ParquetRelation.createEmpty(path.toString, relation.attributes, true, conf, hiveContext)
+        }

if (relation.hiveQlTable.isPartitioned) {
val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
// Translate the predicate so that it automatically casts the input values to the correct
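This guard is the user-visible fix for SPARK-4552: a metastore Parquet table that has never been written to has no footers under its path, so planning a scan over it used to throw IllegalArgumentException. In sketch form (control flow only; the positional argument order for createEmpty follows the hunk above):

    ParquetTypesConverter.readMetaData(path, Some(conf)) match {
      case Some(_) =>  // footers exist: plan the Parquet scan as before
      case None =>
        // Materialize an empty (schema-only) relation first, so the scan reads
        // valid metadata and count(*) returns 0 instead of throwing.
        ParquetRelation.createEmpty(path.toString, relation.attributes, true, conf, hiveContext)
    }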
HiveParquetSuite.scala
@@ -106,6 +106,13 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
}

test("SPARK-4552: query for empty parquet table get IllegalArgumentException") {
sql("CREATE TABLE parquet_test(key INT, value STRING)")
val result = sql("select count(*) from parquet_test limit 5").collect
assert(result.size == 1)
assert(result(0).getLong(0) == 0)
}

private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
var counter = 0
(rddOne, rddTwo).zipped.foreach {