@@ -201,7 +201,7 @@ trait ProgressReporter extends Logging {

if (hasExecuted) {
// Reset noDataEventTimestamp if we processed any data
lastNoExecutionProgressEventTime = Long.MinValue
lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
Contributor Author:

Resetting `lastNoExecutionProgressEventTime` to `Long.MinValue` makes the check `now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime` always true when there is no new data, so the progress reporter would report an empty progress on every no-data trigger (see the sketch after this hunk).

updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
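For context, the no-data reporting path this change touches can be modeled by the small self-contained sketch below. It is illustrative only, not the actual `ProgressReporter` source: the object and method names are invented for this example, and the 10 s value stands in for the assumed default of `spark.sql.streaming.noDataProgressEventInterval`.

```scala
// Illustrative model of ProgressReporter's no-data throttling (not Spark source).
object NoDataProgressSketch {
  // Previously reset to Long.MinValue after every executed batch, which made the
  // no-data guard below trivially true and produced an empty progress per trigger.
  var lastNoExecutionProgressEventTime: Long = Long.MinValue
  val noDataProgressEventInterval: Long = 10000L // assumed default, in milliseconds

  /** Returns true if a progress event should be reported for this trigger. */
  def shouldReport(hasExecuted: Boolean, now: Long): Boolean = {
    if (hasExecuted) {
      // A batch actually ran: report progress and remember when it happened,
      // so later no-data triggers are throttled relative to this point.
      lastNoExecutionProgressEventTime = now
      true
    } else if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
      // No data, but the configured interval has elapsed: report one empty progress.
      lastNoExecutionProgressEventTime = now
      true
    } else {
      false
    }
  }
}
```

With the old reset to `Long.MinValue`, `shouldReport(hasExecuted = false, now)` would return true on every trigger; with the new reset to the trigger time, empty progress events are limited to at most one per interval.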
@@ -281,7 +281,12 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
if (flag) assertNumStateRows(total = 1, updated = 1)
else assertNumStateRows(total = 7, updated = 1)
},
AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
AssertOnQuery { q =>
  // lastProgress is updated asynchronously; retry the check until it reflects
  // the latest batch or the streaming timeout is hit.
  eventually(timeout(streamingTimeout)) {
    assert(q.lastProgress.sink.numOutputRows == 0L)
  }
  true
}
)
}

@@ -389,22 +389,22 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Structured Streaming in Spark 2.0.0. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
// query-event-logs-version-2.0.1.txt has all types of events generated by
// Structured Streaming in Spark 2.0.1. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
// query-event-logs-version-2.0.2.txt has all types of events generated by
// Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
// in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt", 5)
}

test("listener propagates observable metrics") {
@@ -433,9 +433,13 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

try {
val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key
spark.streams.addListener(listener)
testStream(df, OutputMode.Append)(
StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
StartStream(
Trigger.ProcessingTime(100),
triggerClock = clock,
Map(noDataProgressIntervalKey -> "100")),
@uncleGen (Contributor Author), Jun 3, 2020:

Set `noDataProgressIntervalKey` to 100. After the manual clock is advanced at batch 3 (which has no data), an empty progress event will still be reported.

// Batch 1
AddData(inputData, 1, 2),
AdvanceManualClock(100),
@@ -464,7 +468,49 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

private def testReplayListenerBusWithBorkenEventJsons(
test("SPARK-31593: remove unnecessary streaming query progress update") {
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100") {
@volatile var numProgressEvent = 0
val listener = new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
numProgressEvent += 1
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
spark.streams.addListener(listener)

def checkProgressEvent(count: Int): StreamAction = {
AssertOnQuery { _ =>
Contributor Author:

Add an inner function to simplify the test.

eventually(Timeout(streamingTimeout)) {
assert(numProgressEvent == count)
}
true
}
}

try {
val input = new MemoryStream[Int](0, sqlContext)
val clock = new StreamManualClock()
val result = input.toDF().select("value")
testStream(result)(
StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
AddData(input, 10),
checkProgressEvent(1),
AdvanceManualClock(10),
checkProgressEvent(2),
AdvanceManualClock(90),  // clock at 100 ms: no data, the 100 ms no-data interval has not yet elapsed
checkProgressEvent(2),   // so no additional progress event is expected
AdvanceManualClock(10),  // clock at 110 ms: the no-data interval has elapsed
checkProgressEvent(3)    // one more (empty) progress event is reported
)
} finally {
spark.streams.removeListener(listener)
}
}
}

private def testReplayListenerBusWithBrokenEventJsons(
fileName: String,
expectedEventSize: Int): Unit = {
val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")