File tree Expand file tree Collapse file tree 1 file changed +28
-0
lines changed
sql/core/src/test/scala/org/apache/spark/sql/streaming Expand file tree Collapse file tree 1 file changed +28
-0
lines changed Original file line number Diff line number Diff line change @@ -500,6 +500,34 @@ class StreamSuite extends StreamTest {
500500 }
501501 }
502502 }
503+
504+ test(" calling stop() on a query cancels related jobs" ) {
505+ val input = MemoryStream [Int ]
506+ val query = input
507+ .toDS()
508+ .map { i =>
509+ while (! org.apache.spark.TaskContext .get().isInterrupted()) {
510+ // keep looping till interrupted by query.stop()
511+ Thread .sleep(100 )
512+ }
513+ i
514+ }
515+ .writeStream
516+ .format(" console" )
517+ .start()
518+
519+ input.addData(1 )
520+ // wait for jobs to start
521+ eventually(timeout(streamingTimeout)) {
522+ assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
523+ }
524+
525+ query.stop()
526+ // make sure jobs are stopped
527+ eventually(timeout(streamingTimeout)) {
528+ assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
529+ }
530+ }
503531}
504532
505533abstract class FakeSource extends StreamSourceProvider {
You can’t perform that action at this time.
0 commit comments