-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval #17525
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,14 +17,21 @@ | |
|
|
||
| package org.apache.spark.sql.execution.streaming | ||
|
|
||
| import java.util.concurrent.{CountDownLatch, TimeUnit} | ||
| import scala.collection.mutable | ||
|
|
||
| import org.scalatest.concurrent.Eventually | ||
| import org.scalatest.concurrent.PatienceConfiguration.Timeout | ||
| import org.scalatest.concurrent.Timeouts._ | ||
| import org.scalatest.time.SpanSugar._ | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.sql.streaming.ProcessingTime | ||
| import org.apache.spark.util.{Clock, ManualClock, SystemClock} | ||
| import org.apache.spark.sql.streaming.util.StreamManualClock | ||
|
|
||
| class ProcessingTimeExecutorSuite extends SparkFunSuite { | ||
|
|
||
| val timeout = 10.seconds | ||
|
|
||
| test("nextBatchTime") { | ||
| val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100)) | ||
| assert(processingTimeExecutor.nextBatchTime(0) === 100) | ||
|
|
@@ -35,6 +42,56 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { | |
| assert(processingTimeExecutor.nextBatchTime(150) === 200) | ||
| } | ||
|
|
||
| test("trigger timing") { | ||
| val executedTimes = new mutable.ArrayBuffer[Long] | ||
| val manualClock = new StreamManualClock() | ||
| @volatile var continueExecuting = true | ||
| @volatile var lastTriggerTime = -1L | ||
| @volatile var clockIncrementInTrigger = 0L | ||
| val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), manualClock) | ||
| val executorThread = new Thread() { | ||
| override def run(): Unit = { | ||
| executor.execute(() => { | ||
| // Record the trigger time, increment clock if needed and | ||
| lastTriggerTime = manualClock.getTimeMillis() | ||
| manualClock.advance(clockIncrementInTrigger) | ||
| clockIncrementInTrigger = 0 // reset this so that there are no runaway triggers | ||
| continueExecuting | ||
| }) | ||
| } | ||
| } | ||
| executorThread.start() | ||
| // First batch should execute immediately, then executor should wait for next one | ||
| eventually { | ||
| assert(lastTriggerTime === 0) | ||
| assert(manualClock.isStreamWaitingAt(0)) | ||
| assert(manualClock.isStreamWaitingFor(1000)) | ||
| } | ||
|
|
||
| // Second batch should execute when clock reaches the next trigger time. | ||
| // If next trigger takes less than the trigger interval, executor should wait for next one | ||
| clockIncrementInTrigger = 500 | ||
| manualClock.setTime(1000) | ||
| eventually { | ||
| assert(lastTriggerTime === 1000) | ||
| assert(manualClock.isStreamWaitingAt(1500)) | ||
| assert(manualClock.isStreamWaitingFor(2000)) | ||
| } | ||
|
|
||
| // If next trigger takes less than the trigger interval, executor should immediately execute | ||
| // another one | ||
| clockIncrementInTrigger = 1500 | ||
| manualClock.setTime(2000) | ||
| eventually { | ||
| assert(lastTriggerTime === 3500) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it was hard to understand that the test is actually testing that this value is |
||
| assert(manualClock.isStreamWaitingAt(3500)) | ||
| assert(manualClock.isStreamWaitingFor(4000)) | ||
| } | ||
| continueExecuting = false | ||
| manualClock.advance(1000) | ||
| waitForThreadJoin(executorThread) | ||
| } | ||
|
|
||
| test("calling nextBatchTime with the result of a previous call should return the next interval") { | ||
| val intervalMS = 100 | ||
| val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS)) | ||
|
|
@@ -54,7 +111,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { | |
| val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs)) | ||
| processingTimeExecutor.execute(() => { | ||
| batchCounts += 1 | ||
| // If the batch termination works well, batchCounts should be 3 after `execute` | ||
| // If the batch termination works correctly, batchCounts should be 3 after `execute` | ||
| batchCounts < 3 | ||
| }) | ||
| assert(batchCounts === 3) | ||
|
|
@@ -66,9 +123,8 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { | |
| } | ||
|
|
||
| test("notifyBatchFallingBehind") { | ||
| val clock = new ManualClock() | ||
| val clock = new StreamManualClock() | ||
| @volatile var batchFallingBehindCalled = false | ||
| val latch = new CountDownLatch(1) | ||
| val t = new Thread() { | ||
| override def run(): Unit = { | ||
| val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) { | ||
|
|
@@ -77,17 +133,24 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { | |
| } | ||
| } | ||
| processingTimeExecutor.execute(() => { | ||
| latch.countDown() | ||
| clock.waitTillTime(200) | ||
| false | ||
| }) | ||
| } | ||
| } | ||
| t.start() | ||
| // Wait until the batch is running so that we don't call `advance` too early | ||
| assert(latch.await(10, TimeUnit.SECONDS), "the batch has not yet started in 10 seconds") | ||
| eventually { assert(clock.isStreamWaitingFor(200)) } | ||
| clock.advance(200) | ||
| t.join() | ||
| waitForThreadJoin(t) | ||
| assert(batchFallingBehindCalled === true) | ||
| } | ||
|
|
||
| private def eventually(body: => Unit): Unit = { | ||
| Eventually.eventually(Timeout(timeout)) { body } | ||
| } | ||
|
|
||
| private def waitForThreadJoin(thread: Thread): Unit = { | ||
| failAfter(timeout) { thread.join() } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.streaming.util | ||
|
|
||
| import org.apache.spark.util.ManualClock | ||
|
|
||
| /** ManualClock used for streaming tests */ | ||
| class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { | ||
| private var waitStartTime: Option[Long] = None | ||
| private var waitTargetTime: Option[Long] = None | ||
|
|
||
| override def waitTillTime(targetTime: Long): Long = synchronized { | ||
| try { | ||
| waitStartTime = Some(getTimeMillis()) | ||
| waitTargetTime = Some(targetTime) | ||
| super.waitTillTime(targetTime) | ||
| } finally { | ||
| waitStartTime = None | ||
| waitTargetTime = None | ||
| } | ||
| } | ||
|
|
||
| def isStreamWaitingAt(time: Long): Boolean = synchronized { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mind adding docs on when these should be used? |
||
| waitStartTime == Some(time) | ||
| } | ||
|
|
||
| def isStreamWaitingFor(target: Long): Boolean = synchronized { | ||
| waitTargetTime == Some(target) | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the doc seems wrong btw, mind fixing it?
nextBatchTime(nextBatchTime(0)) = 100or am I understanding it wrong?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spoken offline, this isnt wrong.