Closed
@@ -18,19 +18,21 @@
package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.sql.{DriverManager, SQLException, Statement, Timestamp}
import java.util.Locale
import java.sql.{DriverManager, Statement, Timestamp}
import java.util.{Locale, MissingFormatArgumentException}

import scala.util.{Random, Try}
import scala.util.control.NonFatal

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.HiveSQLException
import org.scalatest.Ignore

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, SQLQueryTestSuite}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.util.fileToString
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -43,12 +45,12 @@ import org.apache.spark.sql.types._
* 2. Support DESC command.
* 3. Support SHOW command.
*/
@Ignore
class ThriftServerQueryTestSuite extends SQLQueryTestSuite {

private var hiveServer2: HiveThriftServer2 = _

override def beforeEach(): Unit = {
override def beforeAll(): Unit = {
super.beforeAll()
Contributor

In an earlier PR I commented that the flakiness may be caused by async issues.
I meant that calling startThriftServer starts some components asynchronously in the background, so the call can return before the server is ready to accept connections.
Moving it to beforeAll instead of beforeEach should reduce the flakiness, because the race then happens only once at the start of the suite rather than before every test. I think adding a sleep of 3 to 5 seconds after startup would make sure it never happens.
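
As an alternative to a fixed sleep, the readiness race described above could be handled by polling the listening port until it accepts a TCP connection. The following is a minimal sketch, not code from this PR: `PortReadiness`, `waitForPort`, and its parameters are hypothetical names, and a successful TCP connect is only assumed to be a good enough proxy for the server being ready.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Spark or of this PR.
object PortReadiness {
  /**
   * Polls host:port until a TCP connection succeeds.
   * Returns true on the first successful connect, false if timeoutMs elapses first.
   */
  def waitForPort(
      host: String,
      port: Int,
      timeoutMs: Long,
      intervalMs: Long = 100L): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ready = false
    while (!ready && deadline - System.nanoTime() > 0) {
      ready = Try {
        val socket = new Socket()
        // connect throws if the port refuses the connection or the attempt times out
        try socket.connect(new InetSocketAddress(host, port), intervalMs.toInt)
        finally socket.close()
      }.isSuccess
      if (!ready) Thread.sleep(intervalMs)
    }
    ready
  }
}
```

In a suite like this one, such a helper could be called at the end of beforeAll with the chosen listeningPort, failing the suite early if it returns false.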

Contributor

I seem to be hitting it in the first test of the suite quite often:

[info] ThriftServerQueryTestSuite:
[info] - query_regex_column.sql *** FAILED *** (184 milliseconds)
[info]   java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:15421: java.net.ConnectException: Connection refused (Connection refused)

Member Author

pgSQL/text.sql should fail if spark.sql.hive.thriftServer.async is enabled: #25567 (comment)

Member Author

Anyway, let's try to enable it: #26172

// Chooses a random port between 10000 and 19999
var listeningPort = 10000 + Random.nextInt(10000)

@@ -65,10 +67,19 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
logInfo("HiveThriftServer2 started successfully")
}

override def afterEach(): Unit = {
hiveServer2.stop()
override def afterAll(): Unit = {
try {
hiveServer2.stop()
} finally {
super.afterAll()
}
}

override def sparkConf: SparkConf = super.sparkConf
// Hive Thrift server should not execute SQL queries asynchronously
// because we may set session configuration.
.set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC, false)
Member

@zsxwing Sep 24, 2019

Does this mean async execution is broken and users should also turn it off? If so, should we change the default value? Otherwise, our tests are actually testing a mode that's rarely used.

Hive Thrift server should not execute SQL queries asynchronously because we may set session configuration.

Could you clarify what the exact issue is? Is it that the background thread is missing some thread-local variables because threads are reused? Can we copy them from the parent thread here? https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L186

Member

Er, this suite is not meant to be a concurrency stress test. It only targets executing SQL statements one by one.

Member

But it exposes some bugs in the default mode. It seems worth fixing them.

Member

To me, that doesn't imply a bug in the default mode. It only means we want to run queries one by one in this test suite.

Member

@dongjoon-hyun Sep 25, 2019

Of course, if there is a bug, we should definitely fix it. But let's not enable that in this test suite. That is a completely separate issue, isn't it? It should be handled with a separate unit test and patch.

Contributor

With HiveUtils.HIVE_THRIFT_SERVER_ASYNC enabled, the Thriftserver will still execute queries one by one. The difference is that it will not block the request:

  • With HIVE_THRIFT_SERVER_ASYNC=false, the client sends a query in a TExecuteStatementReq. The query executes, and only after it finishes does the server respond with a TExecuteStatementResp. Then the client calls TGetOperationStatusReq to see whether the result was a success or a failure, and then potentially continues fetching results...
  • With HIVE_THRIFT_SERVER_ASYNC=true, the client sends a query in a TExecuteStatementReq, and the server starts it in a background thread and immediately returns a handle in the response. Then the client periodically polls with TGetOperationStatusReq until the query is finished, and then potentially continues fetching results...

In both cases, the Hive JDBC driver executes one query at a time and there is no concurrency.

I think this setting does not need to be set.
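
The two modes described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `StatementModes`, `runQuery`, `executeSync`, `executeAsync`, and `pollUntilDone` are hypothetical stand-ins, not Thrift, Hive, or Spark APIs, and a `Future` plays the role of the operation handle that the client polls.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Toy model only; none of these names exist in Hive or Spark.
object StatementModes {
  // Hypothetical stand-in for actually running a query.
  private def runQuery(sql: String): String = s"result of: $sql"

  /** Blocking mode: the call returns only after the query has finished. */
  def executeSync(sql: String): String = runQuery(sql)

  /** Non-blocking mode: returns a handle immediately; work runs in the background. */
  def executeAsync(sql: String)(implicit ec: ExecutionContext): Future[String] =
    Future(runQuery(sql))

  /** Client-side loop, analogous to repeated status requests on the handle. */
  def pollUntilDone(handle: Future[String], intervalMs: Long = 10L): String = {
    while (!handle.isCompleted) Thread.sleep(intervalMs)
    handle.value.get.get // rethrows if the background query failed
  }

  /** Submits statements one at a time in either mode. */
  def runAll(statements: Seq[String], async: Boolean): Seq[String] = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try statements.toVector.map { sql =>
      // The next statement is submitted only after the previous one completed.
      if (async) pollUntilDone(executeAsync(sql)) else executeSync(sql)
    } finally pool.shutdown()
  }
}
```

In this model the client never has more than one statement in flight in either mode, which mirrors the point that the async setting changes how the request blocks, not how many queries run at once. It does not model session state or thread-local variables, which is where the concern raised earlier in the thread would show up.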


override val isTestWithConfigSets = false

/** List of test cases to ignore, in lower case. */
@@ -79,22 +90,17 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
"pgSQL/case.sql",
// SPARK-28624
"date.sql",
// SPARK-28619
"pgSQL/aggregates_part1.sql",
"group-by.sql",
// SPARK-28620
"pgSQL/float4.sql",
// SPARK-28636
"decimalArithmeticOperations.sql",
"literals.sql",
"subquery/scalar-subquery/scalar-subquery-predicate.sql",
"subquery/in-subquery/in-limit.sql",
"subquery/in-subquery/in-group-by.sql",
"subquery/in-subquery/simple-in.sql",
"subquery/in-subquery/in-order-by.sql",
"subquery/in-subquery/in-set-operations.sql",
// SPARK-28637
"cast.sql",
"ansi/interval.sql"
"subquery/in-subquery/in-set-operations.sql"
)

override def runQueries(
@@ -166,19 +172,42 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
|| d.sql.toUpperCase(Locale.ROOT).startsWith("DESC\n")
|| d.sql.toUpperCase(Locale.ROOT).startsWith("DESCRIBE ")
|| d.sql.toUpperCase(Locale.ROOT).startsWith("DESCRIBE\n") =>

// Skip show command, see HiveResult.hiveResultString
case s if s.sql.toUpperCase(Locale.ROOT).startsWith("SHOW ")
|| s.sql.toUpperCase(Locale.ROOT).startsWith("SHOW\n") =>
// AnalysisException should exactly match.
// SQLException should not exactly match. We only assert the result contains Exception.
case _ if output.output.startsWith(classOf[SQLException].getName) =>

case _ if output.output.startsWith(classOf[NoSuchTableException].getPackage.getName) =>
assert(expected.output.startsWith(classOf[NoSuchTableException].getPackage.getName),
s"Exception did not match for query #$i\n${expected.sql}, " +
s"expected: ${expected.output}, but got: ${output.output}")

case _ if output.output.startsWith(classOf[SparkException].getName) &&
output.output.contains("overflow") =>
assert(expected.output.contains(classOf[ArithmeticException].getName) &&
expected.output.contains("overflow"),
s"Exception did not match for query #$i\n${expected.sql}, " +
s"expected: ${expected.output}, but got: ${output.output}")

case _ if output.output.startsWith(classOf[RuntimeException].getName) =>
assert(expected.output.contains("Exception"),
s"Exception did not match for query #$i\n${expected.sql}, " +
s"expected: ${expected.output}, but got: ${output.output}")
// A HiveSQLException usually indicates a feature that our ThriftServer cannot support.
// Please add the SQL file to blackList.
case _ if output.output.startsWith(classOf[HiveSQLException].getName) =>
assert(false, s"${output.output} for query #$i\n${expected.sql}")

case _ if output.output.startsWith(classOf[ArithmeticException].getName) &&
output.output.contains("causes overflow") =>
assert(expected.output.contains(classOf[ArithmeticException].getName) &&
expected.output.contains("causes overflow"),
s"Exception did not match for query #$i\n${expected.sql}, " +
s"expected: ${expected.output}, but got: ${output.output}")

case _ if output.output.startsWith(classOf[MissingFormatArgumentException].getName) &&
output.output.contains("Format specifier") =>
assert(expected.output.contains(classOf[MissingFormatArgumentException].getName) &&
expected.output.contains("Format specifier"),
s"Exception did not match for query #$i\n${expected.sql}, " +
s"expected: ${expected.output}, but got: ${output.output}")

case _ =>
assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") {
output.output
@@ -248,8 +277,9 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
val msg = if (a.plan.nonEmpty) a.getSimpleMessage else a.getMessage
Seq(a.getClass.getName, msg.replaceAll("#\\d+", "#x")).sorted
case NonFatal(e) =>
val rootCause = ExceptionUtils.getRootCause(e)
// If there is an exception, put the exception class followed by the message.
Seq(e.getClass.getName, e.getMessage)
Seq(rootCause.getClass.getName, rootCause.getMessage)
}
}
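
The last hunk reports the root cause of a failure instead of the outermost wrapper. A minimal standard-library sketch of that idea follows; `RootCause`, `rootCauseOf`, and `describe` are hypothetical names that stand in for ExceptionUtils.getRootCause so the example needs no extra dependency. Some commons-lang3 releases are documented to return null from getRootCause when the throwable has no cause; whether that applies here depends on the version on the classpath, which the diff does not show, so this sketch falls back to the exception itself.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for ExceptionUtils.getRootCause, using only the standard library.
object RootCause {
  /**
   * Walks the cause chain and returns the innermost throwable.
   * Returns the input itself when it has no cause. Only a direct
   * self-reference is guarded against, not longer cause cycles.
   */
  @tailrec
  def rootCauseOf(e: Throwable): Throwable = {
    val cause = e.getCause
    if (cause == null || (cause eq e)) e else rootCauseOf(cause)
  }

  /** Exception class name followed by its message, taken from the root cause. */
  def describe(e: Throwable): Seq[String] = {
    val root = rootCauseOf(e)
    Seq(root.getClass.getName, root.getMessage)
  }
}
```

For example, a RuntimeException wrapping an IllegalStateException that wraps an ArithmeticException would be described by the ArithmeticException's class name and message, which is what lets the overflow cases earlier in the diff match on ArithmeticException.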