Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ case class AnalysisContext(
nestedViewDepth: Int = 0,
maxNestedViewDepth: Int = -1,
relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
referredTempViewNames: Seq[Seq[String]] = Seq.empty)
referredTempViewNames: Seq[Seq[String]] = Seq.empty,
referredTempFunctionNames: Seq[String] = Seq.empty)

object AnalysisContext {
private val value = new ThreadLocal[AnalysisContext]() {
Expand All @@ -139,7 +140,8 @@ object AnalysisContext {
originContext.nestedViewDepth + 1,
maxNestedViewDepth,
originContext.relationCache,
viewDesc.viewReferredTempViewNames)
viewDesc.viewReferredTempViewNames,
viewDesc.viewReferredTempFunctionNames)
set(context)
try f finally { set(originContext) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1484,16 +1484,36 @@ class SessionCatalog(
def lookupFunction(
name: FunctionIdentifier,
children: Seq[Expression]): Expression = synchronized {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
// Note: the implementation of this function is a little bit convoluted.
// We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
// (built-in, temp, and external).
if (name.database.isEmpty && functionRegistry.functionExists(name)) {
// This function has been already loaded into the function registry.
return functionRegistry.lookupFunction(name, children)
val referredTempFunctionNames = AnalysisContext.get.referredTempFunctionNames
val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
// Lookup the function as a temporary or a built-in function (i.e. without database) and
// 1. if we are not resolving view, we don't care about the function type and just return it.
// 2. if we are resolving view, only return a temp function if it's referred by this view.
if (!isResolvingView ||
!isTemporaryFunction(name) ||
referredTempFunctionNames.contains(name.funcName)) {
// This function has been already loaded into the function registry.
return functionRegistry.lookupFunction(name, children)
}
}

// Get the database from AnalysisContext if it's defined, otherwise, use current database
val currentDatabase = AnalysisContext.get.catalogAndNamespace match {
case Seq() => getCurrentDatabase
case Seq(_, db) => db
case Seq(catalog, namespace @ _*) =>
throw new AnalysisException(
s"V2 catalog does not support functions yet. " +
s"catalog: ${catalog}, namespace: '${namespace.quoted}'")
}

// If the name itself is not qualified, add the current database to it.
val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val database = formatDatabaseName(name.database.getOrElse(currentDatabase))
val qualifiedName = name.copy(database = Some(database))

if (functionRegistry.functionExists(qualifiedName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,33 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-33692: view should use captured catalog and namespace to lookup function") {
val avgFuncClass = "test.org.apache.spark.sql.MyDoubleAvg"
val sumFuncClass = "test.org.apache.spark.sql.MyDoubleSum"
val functionName = "test_udf"
withTempDatabase { dbName =>
withUserDefinedFunction(
s"default.$functionName" -> false,
s"$dbName.$functionName" -> false,
functionName -> true) {
// create a function in default database
sql("USE DEFAULT")
sql(s"CREATE FUNCTION $functionName AS '$avgFuncClass'")
// create a view using a function in 'default' database
val viewName = createView("v1", s"SELECT $functionName(col1) FROM VALUES (1), (2), (3)")
// create function in another database with the same function name
sql(s"USE $dbName")
sql(s"CREATE FUNCTION $functionName AS '$sumFuncClass'")
// create temporary function with the same function name
sql(s"CREATE TEMPORARY FUNCTION $functionName AS '$sumFuncClass'")
withView(viewName) {
// view v1 should still using function defined in `default` database
checkViewOutput(viewName, Seq(Row(102.0)))
}
}
}
}
}

class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
Expand Down