Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
(relation -> relation.output, parquetRelation, attributedRewrites)

// Write path
case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
Expand All @@ -470,7 +470,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
(relation -> relation.output, parquetRelation, attributedRewrites)

// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
Expand All @@ -479,33 +479,35 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
(relation -> relation.output, parquetRelation, attributedRewrites)
}

// Quick fix for SPARK-6450: note that the map is keyed by both the MetastoreRelation instance
// and its output attributes. MetastoreRelation.equals doesn't take output attributes into
// account, so keying by the relation alone would collapse multiple MetastoreRelation instances
// that point to the same table into a single map entry. A proper fix would be to override
// equals & hashCode in MetastoreRelation.
val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))

// Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
// attribute IDs referenced in other nodes.
plan.transformUp {
case r: MetastoreRelation if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
val withAlias =
r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
Subquery(r.tableName, parquetRelation))
case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
val parquetRelation = relationMap(r -> r.output)
val alias = r.alias.getOrElse(r.tableName)
Subquery(alias, parquetRelation)

withAlias
}
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
if relationMap.contains(r -> r.output) =>
val parquetRelation = relationMap(r -> r.output)
InsertIntoTable(parquetRelation, partition, child, overwrite)
}

case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
if relationMap.contains(r -> r.output) =>
val parquetRelation = relationMap(r -> r.output)
InsertIntoTable(parquetRelation, partition, child, overwrite)
}

case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,31 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

sql("DROP TABLE IF EXISTS test_insert_parquet")
}

// Regression test for SPARK-6450: a query that references the same Parquet-backed metastore
// table twice must end up with one converted ParquetRelation2 per reference, instead of the
// references being collapsed into a single relation during conversion.
test("SPARK-6450 regression test") {
  sql(
    """CREATE TABLE IF NOT EXISTS ms_convert (key INT)
      |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
      |STORED AS
      | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
      | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    """.stripMargin)

  try {
    // Analyzing a self-union of the table shouldn't throw AnalysisException.
    val analyzed = sql(
      """SELECT key FROM ms_convert
        |UNION ALL
        |SELECT key FROM ms_convert
      """.stripMargin).queryExecution.analyzed

    // Both branches of the UNION ALL must have been rewritten to a ParquetRelation2.
    assertResult(2) {
      analyzed.collect {
        case r @ LogicalRelation(_: ParquetRelation2) => r
      }.size
    }
  } finally {
    // Always clean up, even when the analysis or the assertion above fails, so that a
    // failing run doesn't leak the table into subsequent tests.
    sql("DROP TABLE IF EXISTS ms_convert")
  }
}
}

class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
Expand Down