Analyzer.scala
@@ -146,7 +146,7 @@ class Analyzer(
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
-ResolveInlineTables ::
+ResolveInlineTables(conf) ::
Contributor:

Is it possible to fix this bug by re-ordering the analyzer rules?

Member Author:

I tried that before, but the resolution rules can introduce new timezone-aware expressions, so a rule that resolves timezone-aware expressions still has to run after the resolution rules.
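For illustration only, not part of this patch: a post-resolution rule of roughly this shape would still be needed even after reordering, because any resolution rule may emit fresh `TimeZoneAwareExpression`s with an empty `timeZoneId`. The rule name `FillSessionTimeZone` is hypothetical; `CatalystConf`, `TimeZoneAwareExpression`, and `withTimeZone` are the same APIs used in this diff.

```scala
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical sketch: fill in the session time zone on any expression that is
// still missing one once the resolution batch has finished.
case class FillSessionTimeZone(conf: CatalystConf) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
      e.withTimeZone(conf.sessionLocalTimeZone)
  }
}
```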

TypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
ResolveInlineTables.scala
@@ -19,16 +19,16 @@ package org.apache.spark.sql.catalyst.analysis

import scala.util.control.NonFatal

-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Cast
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{StructField, StructType}

/**
* An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
*/
-object ResolveInlineTables extends Rule[LogicalPlan] {
+case class ResolveInlineTables(conf: CatalystConf) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case table: UnresolvedInlineTable if table.expressionsResolved =>
validateInputDimension(table)
@@ -95,11 +95,15 @@ object ResolveInlineTables extends Rule[LogicalPlan] {
InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
val targetType = fields(ci).dataType
try {
-if (e.dataType.sameType(targetType)) {
-  e.eval()
+val castedExpr = if (e.dataType.sameType(targetType)) {
+  e
} else {
-  Cast(e, targetType).eval()
+  Cast(e, targetType)
}
+castedExpr.transform {
+  case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
+    e.withTimeZone(conf.sessionLocalTimeZone)
+}.eval()
} catch {
case NonFatal(ex) =>
table.failAnalysis(s"failed to evaluate expression ${e.sql}: ${ex.getMessage}")
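A quick illustration of why the time zone has to be filled in before `eval()` is called. This is a REPL-style sketch, not part of the patch; the time zone string is an arbitrary example.

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

val c = Cast(Literal("1991-12-06 00:00:00.0"), TimestampType)
// c.eval() would fail here: the Cast is timezone-aware but has no timeZoneId yet.
// Once a zone is assigned, eval() returns the timestamp as microseconds since the epoch.
val micros = c.withTimeZone("America/Los_Angeles").eval().asInstanceOf[Long]
```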
ResolveInlineTablesSuite.scala
@@ -20,82 +20,92 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Rand}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal, Rand}
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.types.{LongType, NullType}
+import org.apache.spark.sql.types.{LongType, NullType, TimestampType}

/**
* Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in
* end-to-end tests (in sql/core module) for verifying the correct error messages are shown
* in negative cases.
*/
-class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter {
+class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {

private def lit(v: Any): Literal = Literal(v)

test("validate inputs are foldable") {
-ResolveInlineTables.validateInputEvaluable(
+ResolveInlineTables(conf).validateInputEvaluable(
UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1)))))

// nondeterministic (rand) should not work
intercept[AnalysisException] {
-ResolveInlineTables.validateInputEvaluable(
+ResolveInlineTables(conf).validateInputEvaluable(
UnresolvedInlineTable(Seq("c1"), Seq(Seq(Rand(1)))))
}

// aggregate should not work
intercept[AnalysisException] {
-ResolveInlineTables.validateInputEvaluable(
+ResolveInlineTables(conf).validateInputEvaluable(
UnresolvedInlineTable(Seq("c1"), Seq(Seq(Count(lit(1))))))
}

// unresolved attribute should not work
intercept[AnalysisException] {
-ResolveInlineTables.validateInputEvaluable(
+ResolveInlineTables(conf).validateInputEvaluable(
UnresolvedInlineTable(Seq("c1"), Seq(Seq(UnresolvedAttribute("A")))))
}
}

test("validate input dimensions") {
-ResolveInlineTables.validateInputDimension(
+ResolveInlineTables(conf).validateInputDimension(
UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2)))))

// num alias != data dimension
intercept[AnalysisException] {
-ResolveInlineTables.validateInputDimension(
+ResolveInlineTables(conf).validateInputDimension(
UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1)), Seq(lit(2)))))
}

// num alias == data dimension, but data themselves are inconsistent
intercept[AnalysisException] {
-ResolveInlineTables.validateInputDimension(
+ResolveInlineTables(conf).validateInputDimension(
UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(21), lit(22)))))
}
}

test("do not fire the rule if not all expressions are resolved") {
val table = UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A"))))
-assert(ResolveInlineTables(table) == table)
+assert(ResolveInlineTables(conf)(table) == table)
}

test("convert") {
val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L))))
-val converted = ResolveInlineTables.convert(table)
+val converted = ResolveInlineTables(conf).convert(table)

assert(converted.output.map(_.dataType) == Seq(LongType))
assert(converted.data.size == 2)
assert(converted.data(0).getLong(0) == 1L)
assert(converted.data(1).getLong(0) == 2L)
}

test("convert TimeZoneAwareExpression") {
val table = UnresolvedInlineTable(Seq("c1"),
Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
val converted = ResolveInlineTables(conf).convert(table)
val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
.withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
assert(converted.output.map(_.dataType) == Seq(TimestampType))
assert(converted.data.size == 1)
assert(converted.data(0).getLong(0) == correct)
}

test("nullability inference in convert") {
val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L))))
-val converted1 = ResolveInlineTables.convert(table1)
+val converted1 = ResolveInlineTables(conf).convert(table1)
assert(!converted1.schema.fields(0).nullable)

val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(Literal(null, NullType))))
-val converted2 = ResolveInlineTables.convert(table2)
+val converted2 = ResolveInlineTables(conf).convert(table2)
assert(converted2.schema.fields(0).nullable)
}
}
sql/core/src/test/resources/sql-tests/inputs/inline-table.sql (3 additions, 0 deletions)
@@ -46,3 +46,6 @@ select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b)

-- error reporting: aggregate expression
select * from values ("one", count(1)), ("two", 2) as data(a, b);

+-- string to timestamp
+select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b);
sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17


-- !query 0
@@ -143,3 +143,11 @@ struct<>
-- !query 15 output
org.apache.spark.sql.AnalysisException
cannot evaluate expression count(1) in inline table definition; line 1 pos 29


+-- !query 16
+select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b)
+-- !query 16 schema
+struct<a:timestamp,b:array<timestamp>>
+-- !query 16 output
+1991-12-06 00:00:00 [1991-12-06 01:00:00.0,1991-12-06 12:00:00.0]