HiveResult.scala
@@ -21,10 +21,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}
 
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand}
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -37,43 +36,27 @@ object HiveResult {
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
    */
-  def hiveResultString(ds: Dataset[_]): Seq[String] = {
-    val executedPlan = ds.queryExecution.executedPlan
-    executedPlan match {
-      case ExecutedCommandExec(_: DescribeCommandBase) =>
-        // If it is a describe command for a Hive table, we want to have the output format
-        // be similar with Hive.
-        executedPlan.executeCollectPublic().map {
-          case Row(name: String, dataType: String, comment) =>
-            Seq(name, dataType,
-              Option(comment.asInstanceOf[String]).getOrElse(""))
-              .map(s => String.format(s"%-20s", s))
-              .mkString("\t")
-        }
-      // SHOW TABLES in Hive only output table names,
-      // while ours output database, table name, isTemp.
-      case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
-        command.executeCollect().map(_.getString(1))
-      case _ =>
-        val sessionWithJava8DatetimeEnabled = {
-          val cloned = ds.sparkSession.cloneSession()
-          cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
-          cloned
-        }
-        sessionWithJava8DatetimeEnabled.withActive {
-          // We cannot collect the original dataset because its encoders could be created
-          // with disabled Java 8 date-time API.
-          val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan)
-            .queryExecution
-            .executedPlan
-            .executeCollectPublic().map(_.toSeq).toSeq
-          // We need the types so we can output struct field names
-          val types = executedPlan.output.map(_.dataType)
-          // Reformat to match hive tab delimited output.
-          result.map(_.zip(types).map(e => toHiveString(e)))
-            .map(_.mkString("\t"))
-        }
-    }
-  }
+  def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
+    case ExecutedCommandExec(_: DescribeCommandBase) =>
+      // If it is a describe command for a Hive table, we want to have the output format
+      // be similar with Hive.
+      executedPlan.executeCollectPublic().map {
+        case Row(name: String, dataType: String, comment) =>
+          Seq(name, dataType,
+            Option(comment.asInstanceOf[String]).getOrElse(""))
+            .map(s => String.format(s"%-20s", s))
+            .mkString("\t")
+      }
+    // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
+    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+      command.executeCollect().map(_.getString(1))
+    case other =>
+      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
+      // We need the types so we can output struct field names
+      val types = executedPlan.output.map(_.dataType)
+      // Reformat to match hive tab delimited output.
+      result.map(_.zip(types).map(e => toHiveString(e)))
+        .map(_.mkString("\t"))
+  }
 
   private lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
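
After this change, callers derive the physical plan themselves before asking for Hive-style strings. A minimal usage sketch (not part of the diff; it assumes an active `SparkSession` in scope as `spark`, and the query is illustrative):

    import org.apache.spark.sql.execution.HiveResult

    // Build a Dataset, take its executed (physical) plan, and format the
    // collected rows as tab-delimited, Hive-compatible strings.
    val df = spark.sql("SELECT CAST('2018-12-28' AS DATE)")
    val lines: Seq[String] = HiveResult.hiveResultString(df.queryExecution.executedPlan)
    lines.foreach(println)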
SQLQuerySuite.scala
@@ -189,7 +189,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper {
         example.split(" > ").toList.foreach(_ match {
           case exampleRe(sql, output) =>
             val df = clonedSpark.sql(sql)
-            val actual = unindentAndTrim(hiveResultString(df).mkString("\n"))
+            val actual = unindentAndTrim(
+              hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
             val expected = unindentAndTrim(output)
             assert(actual === expected)
           case _ =>
SQLQueryTestSuite.scala
@@ -511,7 +511,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
       val schema = df.schema.catalogString
       // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
       val answer = SQLExecution.withNewExecutionId(df.queryExecution, Some(sql)) {
-        hiveResultString(df).map(replaceNotIncludedMsg)
+        hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg)
       }
 
       // If the output is not pre-sorted, sort it.
HiveResultSuite.scala
@@ -25,10 +25,11 @@ class HiveResultSuite extends SharedSparkSession {
test("date formatting in hive result") {
val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
val result = HiveResult.hiveResultString(df)
val executedPlan1 = df.queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan1)
assert(result == dates)
val df2 = df.selectExpr("array(b)")
val result2 = HiveResult.hiveResultString(df2)
val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
val result2 = HiveResult.hiveResultString(executedPlan2)
assert(result2 == dates.map(x => s"[$x]"))
}

@@ -39,10 +40,11 @@ class HiveResultSuite extends SharedSparkSession {
       "1582-10-14 01:02:03",
       "1582-10-15 01:02:03")
     val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
-    val result = HiveResult.hiveResultString(df)
+    val executedPlan1 = df.queryExecution.executedPlan
+    val result = HiveResult.hiveResultString(executedPlan1)
     assert(result == timestamps)
-    val df2 = df.selectExpr("array(b)")
-    val result2 = HiveResult.hiveResultString(df2)
+    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
+    val result2 = HiveResult.hiveResultString(executedPlan2)
     assert(result2 == timestamps.map(x => s"[$x]"))
   }
 
@@ -55,14 +57,15 @@ class HiveResultSuite extends SharedSparkSession {
   test("decimal formatting in hive result") {
     val df = Seq(new java.math.BigDecimal("1")).toDS()
     Seq(2, 6, 18).foreach { scale =>
-      val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scale))")
-      val result = HiveResult.hiveResultString(decimalDf)
+      val executedPlan =
+        df.selectExpr(s"CAST(value AS decimal(38, $scale))").queryExecution.executedPlan
+      val result = HiveResult.hiveResultString(executedPlan)
       assert(result.head.split("\\.").last.length === scale)
     }
 
-    val df2 = Seq(java.math.BigDecimal.ZERO).toDS()
-      .selectExpr(s"CAST(value AS decimal(38, 8))")
-    val result = HiveResult.hiveResultString(df2)
+    val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS()
+      .selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan
+    val result = HiveResult.hiveResultString(executedPlan)
     assert(result.head === "0.00000000")
   }
 }
SparkExecuteStatementOperation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
-import java.time.{Instant, LocalDate}
 import java.util.{Arrays, Map => JMap, UUID}
 import java.util.concurrent.RejectedExecutionException
 
@@ -179,14 +178,7 @@ private[hive] class SparkExecuteStatementOperation(
           }
           curCol += 1
         }
-        // Convert date-time instances to types that are acceptable by Hive libs
-        // used in conversions to strings.
-        val resultRow = row.map {
-          case i: Instant => Timestamp.from(i)
-          case ld: LocalDate => Date.valueOf(ld)
-          case other => other
-        }.toArray.asInstanceOf[Array[Object]]
-        resultRowSet.addRow(resultRow)
+        resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
         curRow += 1
         resultOffset += 1
       }
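
The conversion removed above mapped `java.time` values to the `java.sql` types Hive's libraries expect. If a consumer still receives such values (for example with `spark.sql.datetime.java8API.enabled` set), a standalone helper mirroring the removed logic might look like this (a sketch only; the name `toHiveCompatible` is hypothetical, not part of this change):

    import java.sql.{Date, Timestamp}
    import java.time.{Instant, LocalDate}

    // Coerce java.time values to the java.sql types accepted by Hive's
    // RowSet code; pass everything else through unchanged.
    def toHiveCompatible(row: Seq[Any]): Array[Object] = row.map {
      case i: Instant => Timestamp.from(i)
      case ld: LocalDate => Date.valueOf(ld)
      case other => other
    }.toArray.asInstanceOf[Array[Object]]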
SparkSQLDriver.scala
@@ -60,10 +60,9 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext)
       // TODO unify the error code
       try {
         context.sparkContext.setJobDescription(command)
-        val df = context.sql(command)
-        val execution = df.queryExecution
+        val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
         hiveResponse = SQLExecution.withNewExecutionId(execution) {
-          hiveResultString(df)
+          hiveResultString(execution.executedPlan)
         }
         tableSchema = getResultSetSchema(execution)
         new CommandProcessorResponse(0)
HiveComparisonTest.scala
@@ -346,9 +346,7 @@ abstract class HiveComparisonTest
     val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
       val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
       def getResult(): Seq[String] = {
-        SQLExecution.withNewExecutionId(query) {
-          hiveResultString(Dataset.ofRows(query.sparkSession, query.logical))
-        }
+        SQLExecution.withNewExecutionId(query)(hiveResultString(query.executedPlan))
       }
       try { (query, prepareAnswer(query, getResult())) } catch {
         case e: Throwable =>