Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

testQuietly("single listener, check trigger events are generated correctly") {
testSingleListenerBasic(new EventCollectorV1)
testSingleListenerBasic(new EventCollectorV2)
}

private def testSingleListenerBasic(listener: EventCollector): Unit = {
val clock = new StreamManualClock
val inputData = new MemoryStream[Int](0, sqlContext)
val df = inputData.toDS().as[Long].map { 10 / _ }
val listener = new EventCollector

case class AssertStreamExecThreadToWaitForClock()
extends AssertOnQuery(q => {
Expand Down Expand Up @@ -155,7 +159,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
val df = MemoryStream[Int].toDS().as[Long]
val listeners = (1 to 5).map(_ => new EventCollector)
val listeners = (1 to 5).map(_ => new EventCollectorV2)
try {
listeners.foreach(listener => spark.streams.addListener(listener))
testStream(df, OutputMode.Append)(
Expand All @@ -182,7 +186,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

test("continuous processing listeners should receive QueryTerminatedEvent") {
val df = spark.readStream.format("rate").load()
val listeners = (1 to 5).map(_ => new EventCollector)
val listeners = (1 to 5).map(_ => new EventCollectorV2)
try {
listeners.foreach(listener => spark.streams.addListener(listener))
testStream(df, OutputMode.Append)(
Expand Down Expand Up @@ -218,8 +222,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

try {
val listener1 = new EventCollector
val listener2 = new EventCollector
val listener1 = new EventCollectorV1
val listener2 = new EventCollectorV2

spark.streams.addListener(listener1)
assert(isListenerActive(listener1))
Expand All @@ -236,7 +240,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

test("event ordering") {
val listener = new EventCollector
val listener = new EventCollectorV2
withListenerAdded(listener) {
for (i <- 1 to 50) {
listener.reset()
Expand Down Expand Up @@ -348,8 +352,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("listener only posts events from queries started in the related sessions") {
val session1 = spark.newSession()
val session2 = spark.newSession()
val collector1 = new EventCollector
val collector2 = new EventCollector
val collector1 = new EventCollectorV2
val collector2 = new EventCollectorV2

def runQuery(session: SparkSession): Unit = {
collector1.reset()
Expand Down Expand Up @@ -434,7 +438,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
.observe(
name = "other_event",
avg($"value").cast("int").as("avg_val"))
val listener = new EventCollector
val listener = new EventCollectorV2
def checkMetrics(f: java.util.Map[String, Row] => Unit): StreamAction = {
AssertOnQuery { _ =>
eventually(Timeout(streamingTimeout)) {
Expand Down Expand Up @@ -576,7 +580,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

/** Collects events from the StreamingQueryListener for testing */
class EventCollector extends StreamingQueryListener {
abstract class EventCollector extends StreamingQueryListener {
// to catch errors in the async listener events
@volatile private var asyncTestWaiter = new Waiter

Expand Down Expand Up @@ -606,32 +610,61 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
asyncTestWaiter.await(timeout(streamingTimeout))
}

override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
// Shared handler for QueryStartedEvent, invoked by the V1/V2 subclasses' overrides.
// Runs inside the ScalaTest Waiter block so any failure on the listener-bus thread
// is propagated to the test thread when checkAsyncErrors/await is called.
protected def handleOnQueryStarted(queryStarted: QueryStartedEvent): Unit = {
asyncTestWaiter {
// Record the start event; later handlers assert it is non-null to verify ordering.
startEvent = queryStarted
}
}

override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
// Shared handler for QueryProgressEvent, invoked by the V1/V2 subclasses' overrides.
// Asserts event ordering (start must precede progress) and accumulates the progress
// object for later inspection by the test.
protected def handleOnQueryProgress(queryProgress: QueryProgressEvent): Unit = {
asyncTestWaiter {
assert(startEvent != null, "onQueryProgress called before onQueryStarted")
// Guard the buffer: progress events arrive on the listener-bus thread while the
// test thread may be reading the collected events concurrently.
_progressEvents.synchronized {
_progressEvents += queryProgress.progress
}
}
}

override def onQueryIdle(queryIdle: QueryIdleEvent): Unit = {
// Shared handler for QueryIdleEvent (introduced in Spark 3.5), invoked only by the
// V2 subclass — the V1 collector deliberately has no onQueryIdle override.
protected def handleOnQueryIdle(queryIdle: QueryIdleEvent): Unit = {
asyncTestWaiter {
assert(startEvent != null, "onQueryIdle called before onQueryStarted")
// Only the most recent idle event is retained — presumably sufficient for these
// tests; NOTE(review): confirm no test needs the full idle-event history.
idleEvent = queryIdle
}
}

override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
// Shared handler for QueryTerminatedEvent, invoked by the V1/V2 subclasses' overrides.
// Termination is the last event in a query's lifecycle, so it also releases the
// Waiter via dismiss(), unblocking any test thread waiting in await.
protected def handleOnQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
asyncTestWaiter {
assert(startEvent != null, "onQueryTerminated called before onQueryStarted")
terminationEvent = queryTerminated
}
// Signal that the expected event sequence is complete; must happen outside the
// assertion block so the waiter is released even when no assertions ran.
asyncTestWaiter.dismiss()
}
}

/**
* V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
* `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
*/
// Emulates a listener written against the pre-Spark-3.5 (V1) interface: it overrides
// only onQueryStarted/onQueryProgress/onQueryTerminated. It intentionally does NOT
// override onQueryIdle, so the tests exercise the default no-op path for legacy
// listeners when an idle event is posted.
class EventCollectorV1 extends EventCollector {
override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event)

override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event)

override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
handleOnQueryTerminated(event)
}

/**
* V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
*/
// Emulates a listener written against the Spark 3.5+ (V2) interface: overrides the
// full set of callbacks, including onQueryIdle. Each override simply delegates to the
// shared handleOnQueryXxx logic in the abstract EventCollector base class.
class EventCollectorV2 extends EventCollector {
override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event)

override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event)

override def onQueryIdle(event: QueryIdleEvent): Unit = handleOnQueryIdle(event)

override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
handleOnQueryTerminated(event)
}
}