Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
def post(event: StreamingQueryListener.Event) {
event match {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't see any reason to distinguish between these two types of events when posting.

case s: QueryStartedEvent =>
sparkListenerBus.post(s)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryStartedEvent will be sent twice to StreamingQueryListener. There is a thread-local variable in LiveListenerBus (`withinListenerThread`); you can use it to ignore the duplicated QueryStartedEvent when it is posted from the listener thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is hacky, it would be better to add some tests to make sure we don't break things in the future when the hacky code is removed.

// post to local listeners to trigger callbacks
postToAll(s)
case _ =>
sparkListenerBus.post(event)
Expand All @@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: StreamingQueryListener.Event =>
postToAll(e)
// SPARK-18144: QueryStartedEvent is broadcast synchronously to the listeners attached to
// this bus and asynchronously to the ones attached to LiveListenerBus. Therefore, we must
// ignore a QueryStartedEvent when this method is called from within the LiveListenerBus
// listener thread; otherwise the local listeners would receive it twice.
if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
postToAll(e)
}
case _ =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
// A StreamingQueryListener that gets the query status after the first completed trigger
val listener = new StreamingQueryListener {
@volatile var firstStatus: StreamingQueryStatus = null
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
@volatile var queryStartedEvent = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please add @volatile

override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
queryStartedEvent += 1
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
}
Expand All @@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
q.processAllAvailable()
eventually(timeout(streamingTimeout)) {
assert(listener.firstStatus != null)
// verify that the QueryStartedEvent callback is invoked exactly once
assert(listener.queryStartedEvent === 1)
}
listener.firstStatus
} finally {
Expand Down