[SPARK-6555] [SQL] Overrides equals() and hashCode() for MetastoreRelation #5289
HiveMetastoreCatalog.scala:

```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
```
|
```diff
@@ -459,7 +460,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation -> relation.output, parquetRelation, attributedRewrites)
+        (relation, parquetRelation, attributedRewrites)
 
       // Write path
       case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
```
|
```diff
@@ -470,7 +471,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation -> relation.output, parquetRelation, attributedRewrites)
+        (relation, parquetRelation, attributedRewrites)
 
       // Read path
       case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
```
|
```diff
@@ -479,33 +480,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation -> relation.output, parquetRelation, attributedRewrites)
+        (relation, parquetRelation, attributedRewrites)
     }
 
-    // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
-    // their output attributes as the key of the map. This is because MetastoreRelation.equals
-    // doesn't take output attributes into account, thus multiple MetastoreRelation instances
-    // pointing to the same table get collapsed into a single entry in the map. A proper fix for
-    // this should be overriding equals & hashCode in MetastoreRelation.
     val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
     val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
 
     // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
     // attribute IDs referenced in other nodes.
     plan.transformUp {
-      case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
-        val parquetRelation = relationMap(r -> r.output)
+      case r: MetastoreRelation if relationMap.contains(r) =>
+        val parquetRelation = relationMap(r)
         val alias = r.alias.getOrElse(r.tableName)
         Subquery(alias, parquetRelation)
 
       case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-        val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+        val parquetRelation = relationMap(r)
         InsertIntoTable(parquetRelation, partition, child, overwrite)
 
       case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-        val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+        val parquetRelation = relationMap(r)
         InsertIntoTable(parquetRelation, partition, child, overwrite)
 
       case other => other.transformExpressions {
```
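The removed "quick fix" comment describes the bug this PR now fixes at the source. A standalone sketch of the collapse it worked around, using simplified stand-in classes rather than the real Spark ones: Scala generates case-class equality from the first parameter list only, so an `output` member declared in a later list is invisible to `==` and `hashCode`, and a `Map` keyed on the relation alone silently merges distinct scans of the same table.

```scala
// Simplified, hypothetical model of SPARK-6450 (not the actual Spark classes).
case class Attr(name: String, exprId: Long)
case class Relation(db: String, table: String)(val output: Seq[Attr])

object MapKeyCollapse extends App {
  // Two scans of the same table, e.g. the two sides of a self-join, with
  // distinct attribute IDs assigned by the analyzer.
  val left  = Relation("default", "src")(Seq(Attr("key", 1L)))
  val right = Relation("default", "src")(Seq(Attr("key", 2L)))

  // Equal (and hash alike) despite different outputs: equality ignores the
  // second parameter list.
  assert(left == right)

  // Keyed on the relation alone, the second entry overwrites the first.
  assert(Map(left -> "leftScan", right -> "rightScan").size == 1)

  // The quick fix keyed the map on (relation, output) pairs instead; this PR
  // makes equals/hashCode cover the output so the plain key is safe again.
  assert(Map((left, left.output) -> "a", (right, right.output) -> "b").size == 2)
}
```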
|
|
```diff
@@ -701,6 +697,19 @@ private[hive] case class MetastoreRelation
 
   self: Product =>
 
+  override def equals(other: scala.Any): Boolean = other match {
+    case relation: MetastoreRelation =>
+      databaseName == relation.databaseName &&
+        tableName == relation.tableName &&
+        alias == relation.alias &&
+        output == relation.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(databaseName, tableName, alias, output)
+  }
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
```
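A note on the new Guava import: `com.google.common.base.Objects.hashCode` is a varargs helper that delegates to `java.util.Arrays.hashCode(Object[])`. Because the override passes exactly the four fields that `equals` compares, equal relations are guaranteed to hash alike, which the `relationMap` lookups above depend on. A quick sketch of the equivalence (values are illustrative only):

```scala
import com.google.common.base.Objects

// Objects.hashCode(a, b, ...) is defined as Arrays.hashCode(Array(a, b, ...)),
// so these two expressions always agree.
val viaGuava  = Objects.hashCode("default", "src", None, Nil)
val viaArrays = java.util.Arrays.hashCode(Array[AnyRef]("default", "src", None, Nil))
assert(viaGuava == viaArrays)
```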
|
|
```diff
@@ -780,10 +789,7 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def newInstance() = {
-    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
-    // The project here is an ugly hack to work around the fact that MetastoreRelation's
-    // equals method is broken. Please remove this when SPARK-6555 is fixed.
-    Project(newCopy.output, newCopy)
+    MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
   }
 }
```

Review comment from the author: Reverts workaround introduced in #5251.
HivePlanTest.scala:

```diff
@@ -22,10 +22,12 @@ import org.apache.spark.sql.hive.test.TestHive
 
 class HivePlanTest extends QueryTest {
   import TestHive._
+  import TestHive.implicits._
 
   test("udf constant folding") {
-    val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
-    val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
+    Seq.empty[Tuple1[Int]].toDF("a").registerTempTable("t")
+    val optimized = sql("SELECT cos(null) FROM t").queryExecution.optimizedPlan
+    val correctAnswer = sql("SELECT cast(null as double) FROM t").queryExecution.optimizedPlan
 
     comparePlans(optimized, correctAnswer)
   }
```

Review comment from the author: Actually the trouble maker is …

Review comment from a contributor: I think that's actually reasonable. Comparing plans is mostly about checking plan structure.

Review comment from a contributor: I'd even be okay adding a special rule to compare plans that always replaces `MetastoreRelation`s with `LocalRelation`.
|
|
||
Review comment: All changes above this line are about reverting the workaround introduced in #5183.