Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.streaming

import java.io.{InterruptedIOException, IOException}
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -37,6 +38,12 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

/** States for [[StreamExecution]]'s lifecycle. */
trait State
case object INITIALIZING extends State
case object ACTIVE extends State
case object TERMINATED extends State

/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
Expand Down Expand Up @@ -298,7 +305,14 @@ class StreamExecution(
// `stop()` is already called. Let `finally` finish the cleanup.
}
} catch {
case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
// interrupted by stop()
updateStatusMessage("Stopped")
case e: IOException if e.getMessage != null
&& e.getMessage.startsWith(classOf[InterruptedException].getName)
&& state.get == TERMINATED =>
// This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
// to `new IOException(ie.toString())` before Hadoop 2.8.
updateStatusMessage("Stopped")
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
Expand Down Expand Up @@ -709,10 +723,6 @@ class StreamExecution(
}
}

trait State
case object INITIALIZING extends State
case object ACTIVE extends State
case object TERMINATED extends State
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark.sql.streaming

import java.io.{InterruptedIOException, IOException}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}

import scala.reflect.ClassTag
import scala.util.control.ControlThrowable

Expand Down Expand Up @@ -350,20 +353,56 @@ class StreamSuite extends StreamTest {
}
}
}
}

/**
* A fake StreamSourceProvider thats creates a fake Source that cannot be reused.
*/
class FakeDefaultSource extends StreamSourceProvider {
test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") {
// This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the
// streaming thread is interrupted. We should handle it properly by not failing the query.
ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new CountDownLatch(1)
val query = spark
.readStream
.format(classOf[ThrowingIOExceptionLikeHadoop12074].getName)
.load()
.writeStream
.format("console")
.start()
assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch
.await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
"ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before timeout")
query.stop()
assert(query.exception.isEmpty)
}

test("handle InterruptedIOException when the streaming thread is interrupted (Hadoop 2.8+)") {
// This test uses a fake source to throw the same InterruptedIOException as Hadoop 2.8+ when the
// streaming thread is interrupted. We should handle it properly by not failing the query.
ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1)
val query = spark
.readStream
.format(classOf[ThrowingInterruptedIOException].getName)
.load()
.writeStream
.format("console")
.start()
assert(ThrowingInterruptedIOException.createSourceLatch
.await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
"ThrowingInterruptedIOException.createSource wasn't called before timeout")
query.stop()
assert(query.exception.isEmpty)
}
}

abstract class FakeSource extends StreamSourceProvider {
private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)

override def sourceSchema(
spark: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
}

/** A fake StreamSourceProvider that creates a fake Source that cannot be reused. */
class FakeDefaultSource extends FakeSource {

override def createSource(
spark: SQLContext,
Expand Down Expand Up @@ -395,3 +434,63 @@ class FakeDefaultSource extends StreamSourceProvider {
}
}
}

/** A fake source that throws the same IOException like pre Hadoop 2.8 when it's interrupted. */
class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
import ThrowingIOExceptionLikeHadoop12074._

override def createSource(
spark: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
createSourceLatch.countDown()
try {
Thread.sleep(30000)
throw new TimeoutException("sleep was not interrupted in 30 seconds")
} catch {
case ie: InterruptedException =>
throw new IOException(ie.toString)
}
}
}

object ThrowingIOExceptionLikeHadoop12074 {
/**
* A latch to allow the user to wait until [[ThrowingIOExceptionLikeHadoop12074.createSource]] is
* called.
*/
@volatile var createSourceLatch: CountDownLatch = null
}

/** A fake source that throws InterruptedIOException like Hadoop 2.8+ when it's interrupted. */
class ThrowingInterruptedIOException extends FakeSource {
import ThrowingInterruptedIOException._

override def createSource(
spark: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
createSourceLatch.countDown()
try {
Thread.sleep(30000)
throw new TimeoutException("sleep was not interrupted in 30 seconds")
} catch {
case ie: InterruptedException =>
val iie = new InterruptedIOException(ie.toString)
iie.initCause(ie)
throw iie
}
}
}

object ThrowingInterruptedIOException {
/**
* A latch to allow the user to wait until [[ThrowingInterruptedIOException.createSource]] is
* called.
*/
@volatile var createSourceLatch: CountDownLatch = null
}