@@ -21,13 +21,13 @@ import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
@@ -117,6 +117,14 @@ class SessionCatalog(
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
}

/**
* A cache of qualified table name to table relation plan.
*/
val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
// TODO: create a config instead of hardcode 1000 here.
Member: Hi, @cloud-fan. Why not make this a config in this PR? It seems easy.

Contributor Author: Yeah, it's easy, but I want to keep the code changes minimal so the PR is easier to review.

Member: Yep. Sure~

CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
}
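
As the review thread above notes, the hardcoded 1000 is expected to become configurable later. A minimal sketch of what that could look like, assuming a cacheSize value read from some future config entry (the name and plumbing are hypothetical, not part of this PR):

    import com.google.common.cache.{Cache, CacheBuilder}
    import org.apache.spark.sql.catalyst.QualifiedTableName
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Sketch only: cacheSize stands in for a future config value; this PR hardcodes 1000.
    def buildTableRelationCache(cacheSize: Long): Cache[QualifiedTableName, LogicalPlan] = {
      CacheBuilder.newBuilder()
        .maximumSize(cacheSize)
        .build[QualifiedTableName, LogicalPlan]()
    }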

/**
* This method is used to make the given path qualified before we
* store this path in the underlying external catalog. So, when a path
@@ -573,7 +581,7 @@ class SessionCatalog(
val relationAlias = alias.getOrElse(table)
if (db == globalTempViewManager.database) {
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
SubqueryAlias(relationAlias, viewDef, None)
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
@@ -586,12 +594,12 @@ class SessionCatalog(
desc = metadata,
output = metadata.schema.toAttributes,
child = parser.parsePlan(viewText))
SubqueryAlias(relationAlias, child, Option(name))
SubqueryAlias(relationAlias, child, Some(name.copy(table = table, database = Some(db))))
} else {
SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
}
} else {
SubqueryAlias(relationAlias, tempTables(table), Option(name))
SubqueryAlias(relationAlias, tempTables(table), None)
Member: Should we keep the existing way? This was introduced for the EXPLAIN command of views. See the PR: #14657

Contributor Author (@cloud-fan, Jan 18, 2017): The existing way is to set None, see https://github.com/apache/spark/pull/16621/files#diff-ca4533edbf148c89cc0c564ab6b0aeaaL75

This shows the evil of duplicated code: we have inconsistent behaviors with and without Hive support. I think we should only set the table identifier for persisted views. @hvanhovell, is that true?

Member: Ping @hvanhovell again. : )

Contributor (@hvanhovell, Jan 19, 2017): Sorry, I have been living under a rock for the past month or so. This is not really needed anymore. Let's remove it.

Member: Thank you!

}
}
}
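
To restate the rule this discussion converges on: after this change, only persisted views carry a qualified identifier in their SubqueryAlias; temporary tables, temporary views, and global temp views all get None. An illustrative restatement (not the PR's actual code):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Illustrative only: which alias identifier lookupRelation attaches after this PR.
    def aliasIdentifier(isPersistedView: Boolean, db: String, table: String): Option[TableIdentifier] =
      if (isPersistedView) Some(TableIdentifier(table, Some(db))) else None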
@@ -651,14 +659,21 @@ class SessionCatalog(
* Refresh the cache entry for a metastore table, if any.
*/
def refreshTable(name: TableIdentifier): Unit = synchronized {
val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
val tableName = formatTableName(name.table)

// Go through temporary tables and invalidate them.
// If the database is defined, this is definitely not a temp table.
// If the database is defined, this may be a global temporary view.
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(formatTableName(name.table)).foreach(_.refresh())
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
tempTables.get(tableName).foreach(_.refresh())
} else if (dbName == globalTempViewManager.database) {
globalTempViewManager.get(tableName).foreach(_.refresh())
}

// Also invalidate the table relation cache.
Member: How about keeping the previous comment here?

    // refreshTable does not eagerly reload the cache. It just invalidate the cache.
    // Next time when we use the table, it will be populated in the cache.
    // Since we also cache ParquetRelations converted from Hive Parquet tables and
    // adding converted ParquetRelations into the cache is not defined in the load function
    // of the cache (instead, we add the cache entry in convertToParquetRelation),
    // it is better at here to invalidate the cache to avoid confusing waring logs from the
    // cache loader (e.g. cannot find data source provider, which is only defined for
    // data source table.).

Contributor Author: Is it still valid?

Member: After an offline discussion, I am fine to remove it. Thanks!

val qualifiedTableName = QualifiedTableName(dbName, tableName)
tableRelationCache.invalidate(qualifiedTableName)
}
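
A usage sketch of the behavior above (the spark session and table name are assumed): refreshTable only drops the cache entry, and the relation is rebuilt lazily on the next lookup, which is what the SET LOCATION test later in this diff relies on.

    // Sketch: after DDL that changes table metadata (e.g. ALTER TABLE ... SET LOCATION),
    // refresh so the next lookup rebuilds the cached relation.
    spark.catalog.refreshTable("db.tbl")
    val reread = spark.table("db.tbl")  // repopulates the table relation cache on demand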

/**
@@ -60,9 +60,11 @@ case class TableIdentifier(table: String, database: Option[String])
override val identifier: String = table

def this(table: String) = this(table, None)

}

/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
Member: We have a case sensitivity issue here, right? Previously, we always lowercased both the database and table names. Database and table names are not case sensitive.

Contributor Author: If users don't change the case-sensitivity config at runtime, it will be OK.


object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
}
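
To make the case-sensitivity concern above concrete: lookups stay consistent as long as both parts of the name are normalized before the cache key is built, which is what SessionCatalog's formatDatabaseName and formatTableName do. An illustrative sketch (method shape assumed, not the PR's actual code):

    import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}

    // Illustrative only: with case-insensitive analysis (the default), "FOO.Bar" and
    // "foo.bar" normalize to the same cache key.
    def cacheKey(name: TableIdentifier, currentDb: String, caseSensitive: Boolean): QualifiedTableName = {
      def format(s: String): String = if (caseSensitive) s else s.toLowerCase
      QualifiedTableName(format(name.database.getOrElse(currentDb)), format(name.table))
    }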
@@ -436,7 +436,7 @@ class SessionCatalogSuite extends PlanTest {
== SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
== SubqueryAlias("tbl1", tempTable1, None))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
@@ -462,7 +462,7 @@ class SessionCatalogSuite extends PlanTest {
val tmpView = Range(1, 10, 2, 10)
catalog.createTempView("vw1", tmpView, overrideIfExists = false)
val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
assert(plan == SubqueryAlias("range", tmpView, None))
}

test("look up view relation") {
@@ -479,7 +479,7 @@ class SessionCatalogSuite extends PlanTest {
// Look up a view using current database of the session catalog.
sessionCatalog.setCurrentDatabase("db3")
comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
SubqueryAlias("view1", view, Some(TableIdentifier("view1"))))
SubqueryAlias("view1", view, Some(TableIdentifier("view1", Some("db3")))))
}

test("table exists") {
@@ -386,7 +386,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
relation.catalogTable.identifier
}
EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {

val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
Member: This change is to avoid overriding lookupRelation in HiveMetastoreCatalog, right?

Contributor Author: Yeah, lookupRelation now returns a SimpleCatalogRelation, and other analyzer rules will convert it to a LogicalRelation or MetastoreRelation.

EliminateSubqueryAliases(tableRelation) match {
// check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
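
The lookup pattern discussed above, going through the analyzer instead of calling lookupRelation directly, looks like this from caller code (a sketch; the spark session and table name are assumed):

    import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases

    // Sketch: resolve a table through the analyzer so that rules such as
    // FindDataSourceTable have already rewritten SimpleCatalogRelation.
    val analyzedPlan = spark.table("db.tbl").queryExecution.analyzed
    val relation = EliminateSubqueryAliases(analyzedPlan)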
@@ -40,7 +40,8 @@ case class AnalyzeColumnCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
val relation =
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)

// Compute total size
val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
@@ -41,7 +41,8 @@ case class AnalyzeTableCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
val relation =
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)

relation match {
case relation: CatalogRelation =>
@@ -450,7 +450,7 @@ case class DescribeTableCommand(
if (metadata.schema.isEmpty) {
// In older version(prior to 2.1) of Spark, the table schema can be empty and should be
// inferred at runtime. We should still support it.
describeSchema(catalog.lookupRelation(metadata.identifier).schema, result)
describeSchema(sparkSession.table(metadata.identifier).schema, result)
} else {
describeSchema(metadata.schema, result)
}
@@ -17,18 +17,17 @@

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.Callable

import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPa
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {


/**
* Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data
* source information.
* Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private def readDataSourceTable(
sparkSession: SparkSession,
simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
val table = simpleCatalogRelation.catalogTable
val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = Some(table.schema),
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
options = table.storage.properties ++ pathOption)

LogicalRelation(
dataSource.resolveRelation(),
expectedOutputAttributes = Some(simpleCatalogRelation.output),
catalogTable = Some(table))
private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val cache = sparkSession.sessionState.catalog.tableRelationCache
val withHiveSupport =
sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"

cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource =
DataSource(
sparkSession,
// In older version(prior to 2.1) of Spark, the table schema can be empty and should be
// inferred at runtime. We should still support it.
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
options = table.storage.properties ++ pathOption,
// TODO: improve `InMemoryCatalog` and remove this limitation.
catalogTable = if (withHiveSupport) Some(table) else None)

LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
Contributor Author: Note that previously we set expectedOutputAttributes here, which was added by #15182.

However, this doesn't work when the table schema needs to be inferred at runtime, and it turns out we don't need it at all. AnalyzeColumnCommand now gets attributes from the resolved table relation plan, so it's fine for the FindDataSourceTable rule to change outputs during analysis.

Contributor Author: cc @wzhfy

Contributor: OK, then we can revert it without changing the analyze part.

}
})
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
if DDLUtils.isDatasourceTable(s.metadata) =>
i.copy(table = readDataSourceTable(sparkSession, s))
i.copy(table = readDataSourceTable(s.metadata))

case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
readDataSourceTable(sparkSession, s)
readDataSourceTable(s.metadata)
}
}
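
For readers unfamiliar with the Guava pattern used in readDataSourceTable above: Cache.get(key, callable) computes and caches the value only when the key is absent. A tiny self-contained sketch of that compute-if-absent behavior (plain Guava, no Spark classes; the loader body is a placeholder):

    import java.util.concurrent.Callable
    import com.google.common.cache.{Cache, CacheBuilder}

    val cache: Cache[String, String] =
      CacheBuilder.newBuilder().maximumSize(1000).build[String, String]()

    // The Callable runs only on a cache miss; later lookups return the cached value.
    def lookup(key: String): String =
      cache.get(key, new Callable[String] {
        override def call(): String = "loaded:" + key  // placeholder for the expensive load
      })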

@@ -440,7 +440,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

// If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
val logicalPlan = sparkSession.table(tableIdent).queryExecution.analyzed
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
11 changes: 0 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1626,17 +1626,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(d.size == d.distinct.size)
}

test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
val tableName = "tbl"
withTable(tableName) {
spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
val expr = relation.resolve("i")
val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
qe.assertAnalyzed()
}
}

private def verifyNullabilityInFilterExec(
df: DataFrame,
expr: String,
@@ -1790,7 +1790,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("SET LOCATION for managed table") {
withTable("src") {
withTable("tbl") {
withTempDir { dir =>
sql("CREATE TABLE tbl(i INT) USING parquet")
sql("INSERT INTO tbl SELECT 1")
@@ -1799,6 +1799,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get

sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
spark.catalog.refreshTable("tbl")
Member: +1 :)

// SET LOCATION won't move data from previous table path to new table path.
assert(spark.table("tbl").count() == 0)
// the previous table path should be still there.