Trigger.java
@@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import scala.concurrent.duration.Duration;

import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
@@ -40,7 +41,7 @@ public class Trigger {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long intervalMs) {
-    return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
+    return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
}

/**
@@ -56,7 +57,7 @@ public static Trigger ProcessingTime(long intervalMs) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
-    return ProcessingTime.create(interval, timeUnit);
+    return ProcessingTimeTrigger.create(interval, timeUnit);
}

/**
@@ -71,7 +72,7 @@ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(Duration interval) {
-    return ProcessingTime.apply(interval);
+    return ProcessingTimeTrigger.apply(interval);
}

/**
@@ -84,7 +85,7 @@ public static Trigger ProcessingTime(Duration interval) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(String interval) {
-    return ProcessingTime.apply(interval);
+    return ProcessingTimeTrigger.apply(interval);
}

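Worth noting for reviewers: this changes only the internal factory target, not the public API. Each `Trigger.ProcessingTime` overload still returns an opaque `Trigger`, so caller code is untouched. A minimal sketch of the call sites that keep working (the `df` stream below is a hypothetical placeholder):

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.streaming.Trigger

// Equivalent 10-second triggers via the unchanged public overloads;
// each is now backed by ProcessingTimeTrigger instead of ProcessingTime.
val byMillis = Trigger.ProcessingTime(10000L)
val byUnit = Trigger.ProcessingTime(10, TimeUnit.SECONDS)
val byString = Trigger.ProcessingTime("10 seconds")

// Hypothetical usage on a streaming DataFrame `df`:
// df.writeStream.trigger(byString).format("console").start()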
MicroBatchExecution.scala
@@ -51,7 +51,9 @@ class MicroBatchExecution(
@volatile protected var sources: Seq[SparkDataStream] = Seq.empty

private val triggerExecutor = trigger match {
-    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case ProcessingTime(interval) => ProcessingTimeExecutor(
+      ProcessingTimeTrigger(interval), triggerClock)
+    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
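(Presumably the separate `ProcessingTime(interval)` extractor case is kept so that queries still built against the deprecated public `ProcessingTime` class get normalized to the internal `ProcessingTimeTrigger` before execution, while the new case handles triggers created through the updated `Trigger.ProcessingTime` factories.)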
ProcessingTimeTrigger.scala (new file)
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval

/**
* A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
* the query will run as fast as possible.
*/
@Evolving
private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
  require(intervalMs >= 0, "the interval of trigger should not be negative")
}

Member: I see, if this is basically an implementation class, I wonder if it belongs in the (unfortunately named) Triggers.scala file, which now has only OneTimeTrigger, but at least that is just another implementation class too? No big deal.

Contributor (author): Once we decide to move this class to Triggers.scala, I guess ContinuousTrigger has to be moved too. No big deal for me either, so please let me know which feels cleaner to you.

Member: I think I'd move it, to rationalize Triggers.scala and avoid another file. It doesn't matter much. I see there is ContinuousTrigger too, but I guess it belongs in the .continuous subpackage.

Contributor (author): Thanks for weighing in. Agreed on both points: I'll leave ContinuousTrigger as it is and move ProcessingTimeTrigger.
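To make the thread concrete, here is a rough sketch of what the consolidated Triggers.scala could look like after the agreed move; this is purely illustrative and not part of this commit:

// Hypothetical post-move Triggers.scala: both implementation-only
// triggers live together, while ContinuousTrigger stays in .continuous.
private[sql] case object OneTimeTrigger extends Trigger

private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
  require(intervalMs >= 0, "the interval of trigger should not be negative")
}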

private[sql] object ProcessingTimeTrigger {
def apply(interval: String): ProcessingTimeTrigger = {
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
new ProcessingTimeTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
}

def apply(interval: Duration): ProcessingTimeTrigger = {
ProcessingTimeTrigger(interval.toMillis)
}

def create(interval: String): ProcessingTimeTrigger = {
apply(interval)
}

def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
ProcessingTimeTrigger(unit.toMillis(interval))
}
}
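A quick usage sketch of the four factory methods above; all of these should produce an equivalent 10-second trigger:

import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

ProcessingTimeTrigger("10 seconds")                // parses a CalendarInterval string
ProcessingTimeTrigger(10.seconds)                  // from a scala.concurrent.duration.Duration
ProcessingTimeTrigger.create("10 seconds")         // alias for apply(String)
ProcessingTimeTrigger.create(10, TimeUnit.SECONDS) // from a count plus a TimeUnit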
TriggerExecutor.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
-import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}

Member: Can you remove the ProcessingTime import?

import org.apache.spark.util.{Clock, SystemClock}

trait TriggerExecutor {
@@ -43,7 +43,9 @@ case class OneTimeExecutor() extends TriggerExecutor {
/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
-case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
+case class ProcessingTimeExecutor(
+    processingTime: ProcessingTimeTrigger,
+    clock: Clock = new SystemClock())
   extends TriggerExecutor with Logging {

   private val intervalMs = processingTime.intervalMs

Member: Please rename the variable together:
  • processingTime -> processingTimeTrigger
  • private val intervalMs = processingTime.intervalMs -> private val intervalMs = processingTimeTrigger.intervalMs
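For clarity, a sketch of what the executor would look like with the reviewer's suggested rename applied (illustrative only; the body is elided except for the touched field):

// Suggested rename applied; everything else in the class is unchanged.
case class ProcessingTimeExecutor(
    processingTimeTrigger: ProcessingTimeTrigger,
    clock: Clock = new SystemClock())
  extends TriggerExecutor with Logging {

  private val intervalMs = processingTimeTrigger.intervalMs
  // ... rest of the executor body as before ...
}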
ContinuousExecution.scala
@@ -93,7 +93,7 @@ class ContinuousExecution(
}

private val triggerExecutor = trigger match {
-    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTimeTrigger(t), triggerClock)
case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
}

ProcessingTimeSuite.scala
@@ -22,12 +22,15 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
+import org.apache.spark.sql.streaming.Trigger

class ProcessingTimeSuite extends SparkFunSuite {

test("create") {
-    def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs
+    def getIntervalMs(trigger: Trigger): Long = {
+      trigger.asInstanceOf[ProcessingTimeTrigger].intervalMs
+    }

assert(getIntervalMs(Trigger.ProcessingTime(10.seconds)) === 10 * 1000)
assert(getIntervalMs(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) === 10 * 1000)
ProcessingTimeExecutorSuite.scala
@@ -35,7 +35,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
val timeout = 10.seconds

test("nextBatchTime") {
-    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(100))
assert(processingTimeExecutor.nextBatchTime(0) === 100)
assert(processingTimeExecutor.nextBatchTime(1) === 100)
assert(processingTimeExecutor.nextBatchTime(99) === 100)
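A self-contained re-derivation of the arithmetic these assertions pin down (our own sketch, not the production implementation): the next batch time is the smallest multiple of the interval strictly greater than `now`.

// Sketch of the asserted behavior: round `now` down to a multiple of
// intervalMs, then step forward one interval.
def nextBatchTime(now: Long, intervalMs: Long): Long =
  now / intervalMs * intervalMs + intervalMs

assert(nextBatchTime(0, 100) == 100)
assert(nextBatchTime(1, 100) == 100)
assert(nextBatchTime(99, 100) == 100)
assert(nextBatchTime(100, 100) == 200)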
@@ -49,7 +49,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
val clock = new StreamManualClock()
@volatile var continueExecuting = true
@volatile var clockIncrementInTrigger = 0L
-    val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), clock)
+    val executor = ProcessingTimeExecutor(ProcessingTimeTrigger("1000 milliseconds"), clock)
val executorThread = new Thread() {
override def run(): Unit = {
executor.execute(() => {
@@ -97,7 +97,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {

test("calling nextBatchTime with the result of a previous call should return the next interval") {
val intervalMS = 100
-    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMS))

val ITERATION = 10
var nextBatchTime: Long = 0
@@ -111,7 +111,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {

private def testBatchTermination(intervalMs: Long): Unit = {
var batchCounts = 0
-    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMs))
processingTimeExecutor.execute(() => {
batchCounts += 1
// If the batch termination works correctly, batchCounts should be 3 after `execute`
@@ -130,7 +130,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
@volatile var batchFallingBehindCalled = false
val t = new Thread() {
override def run(): Unit = {
-        val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) {
+        val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTimeTrigger(100), clock) {
override def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = {
batchFallingBehindCalled = true
}