-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32057][SQL][test-hive1.2][test-hadoop2.7] ExecuteStatement: cancel and close should not transiently ERROR #28912
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
928cb20
be5e2fa
49f4335
03b6c93
d68e77b
db9aa3c
839f44c
4aaa34b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,25 @@ | |
|
|
||
| package org.apache.spark.sql.hive.thriftserver | ||
|
|
||
| import java.util | ||
| import java.util.concurrent.Semaphore | ||
|
|
||
| import scala.concurrent.duration._ | ||
|
|
||
| import org.apache.hadoop.hive.conf.HiveConf | ||
| import org.apache.hive.service.cli.OperationState | ||
| import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl} | ||
| import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS} | ||
| import org.mockito.invocation.InvocationOnMock | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.sql.{DataFrame, SQLContext} | ||
| import org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2EventManager | ||
| import org.apache.spark.sql.test.SharedSparkSession | ||
| import org.apache.spark.sql.types.{IntegerType, NullType, StringType, StructField, StructType} | ||
|
|
||
| class SparkExecuteStatementOperationSuite extends SparkFunSuite { | ||
| class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSparkSession { | ||
|
|
||
| test("SPARK-17112 `select null` via JDBC triggers IllegalArgumentException in ThriftServer") { | ||
| val field1 = StructField("NULL", NullType) | ||
| val field2 = StructField("(IF(true, NULL, NULL))", NullType) | ||
|
|
@@ -42,4 +57,68 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite { | |
| assert(columns.get(1).getType().getName == "INT") | ||
| assert(columns.get(1).getComment() == "") | ||
| } | ||
|
|
||
| // SPARK-32057: run the same scenario twice — once ending in CANCELED via cancel(), | ||
| // once ending in CLOSED via close(). `transition` applies the corresponding call. | ||
| Seq( | ||
| (OperationState.CANCELED, (_: SparkExecuteStatementOperation).cancel()), | ||
| (OperationState.CLOSED, (_: SparkExecuteStatementOperation).close()) | ||
| ).foreach { case (finalState, transition) => | ||
| test("SPARK-32057 SparkExecuteStatementOperation should not transiently become ERROR " + | ||
| s"before being set to $finalState") { | ||
| // Use a real HiveSessionImpl (not a mock) so session/operation bookkeeping runs for real. | ||
| val hiveSession = new HiveSessionImpl(ThriftserverShimUtils.testedProtocolVersions.head, | ||
| "username", "password", new HiveConf, "ip address") | ||
| hiveSession.open(new util.HashMap) | ||
|
|
||
| HiveThriftServer2.eventManager = mock(classOf[HiveThriftServer2EventManager]) | ||
|
|
||
| val spySqlContext = spy(sqlContext) | ||
|
|
||
| // When cancel() is called on the operation, cleanup causes an exception to be thrown inside | ||
| // of execute(). This should not cause the state to become ERROR. The exception here will be | ||
| // triggered in our custom cleanup(). | ||
| // collect() blocks on `signal` until cleanup() releases it, then throws — simulating a | ||
| // query interrupted mid-execution. | ||
| val signal = new Semaphore(0) | ||
| val dataFrame = mock(classOf[DataFrame], RETURNS_DEEP_STUBS) | ||
| when(dataFrame.collect()).thenAnswer((_: InvocationOnMock) => { | ||
| signal.acquire() | ||
| throw new RuntimeException("Operation was cancelled by test cleanup.") | ||
| }) | ||
| val statement = "stmt" | ||
| doReturn(dataFrame, Nil: _*).when(spySqlContext).sql(statement) | ||
|
|
||
| val executeStatementOperation = new MySparkExecuteStatementOperation(spySqlContext, | ||
| hiveSession, statement, signal, finalState) | ||
|
|
||
| // Run the statement on a separate thread so the test thread can cancel/close the | ||
| // operation while collect() is still blocked on the semaphore. | ||
| val run = new Thread() { | ||
| override def run(): Unit = executeStatementOperation.runInternal() | ||
| } | ||
| // Sanity-check the initial state before starting the worker thread. | ||
| assert(executeStatementOperation.getStatus.getState === OperationState.INITIALIZED) | ||
| run.start() | ||
| eventually(timeout(5.seconds)) { | ||
| assert(executeStatementOperation.getStatus.getState === OperationState.RUNNING) | ||
| } | ||
| // Trigger cancel()/close(); its cleanup() releases the semaphore so collect() throws. | ||
| transition(executeStatementOperation) | ||
| run.join() | ||
| // The exception propagated out of execute() must not have flipped the state to ERROR. | ||
| assert(executeStatementOperation.getStatus.getState === finalState) | ||
| } | ||
| } | ||
|
|
||
| // Test-only subclass: overrides cleanup() to (1) unblock the mocked collect() via | ||
| // `signal` and (2) verify that the state stays at `finalState` after the resulting | ||
| // exception propagates inside execute(). | ||
| private class MySparkExecuteStatementOperation( | ||
| sqlContext: SQLContext, | ||
| hiveSession: HiveSession, | ||
| statement: String, | ||
| signal: Semaphore, | ||
| finalState: OperationState) | ||
| extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement, | ||
| new util.HashMap, false) { | ||
|
|
||
| override def cleanup(): Unit = { | ||
| super.cleanup() | ||
| // Unblock the mocked collect(), which then throws inside execute(). | ||
| signal.release() | ||
| // At this point, operation should already be in finalState (set by either close() or | ||
| // cancel()). We want to check if it stays in finalState after the exception thrown by | ||
| // releasing the semaphore propagates. We hence need to sleep for a short while. | ||
| Thread.sleep(1000) | ||

Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the case where the state is already because we have not given the exception time to propagate in the other thread. Eventually will execute the first iteration (whose assert is true) then exit immediately. By sleeping, we check if ERROR does not surface even after having become
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alismess-db I was wondering about hte same place before as well. |
||
| // State should not be ERROR | ||
| assert(getStatus.getState === finalState) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
nit: add a single blank line.