@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger
import org.apache.spark.sql.streaming.Trigger

// Run tests in KafkaSourceSuiteBase in continuous execution mode.
project/MimaExcludes.scala (6 changes: 5 additions & 1 deletion)
@@ -372,7 +372,11 @@ object MimaExcludes {

// [SPARK-26616][MLlib] Expose document frequency in IDFModel
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
-ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf"),
+
+// [SPARK-28199][SS] Remove deprecated ProcessingTime
+ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
+ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$")
)

// Exclude rules for 2.4.x
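For context on the two new MissingClassProblem rules: they cover the removal of the deprecated org.apache.spark.sql.streaming.ProcessingTime class and its companion object. A minimal sketch of the user-facing migration, assuming a streaming DataFrame `df` with a console sink:

```scala
import org.apache.spark.sql.streaming.Trigger

// Before this PR (deprecated since the Trigger factories arrived in 2.2.0, now removed):
//   import org.apache.spark.sql.streaming.ProcessingTime
//   df.writeStream.format("console").trigger(ProcessingTime("10 seconds")).start()

// After: use the Trigger factory methods instead.
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```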
@@ -20,9 +20,10 @@
import java.util.concurrent.TimeUnit;

import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import scala.concurrent.duration.Duration;

-import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;

/**
@@ -40,7 +41,7 @@ public class Trigger {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long intervalMs) {
-return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
+return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
}

/**
@@ -56,7 +57,7 @@ public static Trigger ProcessingTime(long intervalMs) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
-return ProcessingTime.create(interval, timeUnit);
+return ProcessingTimeTrigger.create(interval, timeUnit);
}

/**
@@ -71,7 +72,7 @@ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(Duration interval) {
-return ProcessingTime.apply(interval);
+return ProcessingTimeTrigger.apply(interval);
}

/**
@@ -84,7 +85,7 @@ public static Trigger ProcessingTime(Duration interval) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(String interval) {
-return ProcessingTime.apply(interval);
+return ProcessingTimeTrigger.apply(interval);
}

/**
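All four ProcessingTime overloads above now delegate to the internal ProcessingTimeTrigger rather than the removed public class; user-facing behavior is unchanged. A sketch of the equivalent spellings (the delegation targets appear in the Triggers.scala changes below):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// Four ways to request the same 10-second processing-time trigger:
val t1 = Trigger.ProcessingTime(10000L)                // interval in milliseconds
val t2 = Trigger.ProcessingTime(10, TimeUnit.SECONDS)  // interval with explicit unit
val t3 = Trigger.ProcessingTime(10.seconds)            // scala.concurrent.duration.Duration
val t4 = Trigger.ProcessingTime("10 seconds")          // CalendarInterval-style string
```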
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchSt
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock

class MicroBatchExecution(
@@ -51,7 +51,7 @@
@volatile protected var sources: Seq[SparkDataStream] = Seq.empty

private val triggerExecutor = trigger match {
-case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
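Note that this match only renames the internal target; the public entry points still line up the same way. As a sketch, assuming a streaming Dataset `df`:

```scala
import org.apache.spark.sql.streaming.Trigger

// Trigger.ProcessingTime(...) -> ProcessingTimeTrigger -> ProcessingTimeExecutor
df.writeStream.format("console").trigger(Trigger.ProcessingTime("1 minute")).start()

// Trigger.Once() -> OneTimeTrigger -> OneTimeExecutor
df.writeStream.format("console").trigger(Trigger.Once()).start()
```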
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
-import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{Clock, SystemClock}

trait TriggerExecutor {
@@ -43,10 +42,12 @@ case class OneTimeExecutor() extends TriggerExecutor {
/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
-case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
+case class ProcessingTimeExecutor(
+    processingTimeTrigger: ProcessingTimeTrigger,
+    clock: Clock = new SystemClock())
extends TriggerExecutor with Logging {

-private val intervalMs = processingTime.intervalMs
+private val intervalMs = processingTimeTrigger.intervalMs
require(intervalMs >= 0)

override def execute(triggerHandler: () => Boolean): Unit = {
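The execute loop is elided above. A hypothetical smoke test for the renamed executor, written as it might appear in Spark's own test sources (it assumes, per the existing executor contract, that the loop stops once triggerHandler returns false):

```scala
import org.apache.spark.sql.execution.streaming.{ProcessingTimeExecutor, ProcessingTimeTrigger}

// Run three "batches" roughly 100 ms apart, then stop.
val executor = ProcessingTimeExecutor(ProcessingTimeTrigger("100 milliseconds"))
var batches = 0
executor.execute { () =>
  batches += 1
  batches < 3  // returning false terminates the loop
}
assert(batches == 3)
```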
@@ -17,13 +17,94 @@

package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.spark.annotation.{Evolving, Experimental}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval

private object Triggers {
def validate(intervalMs: Long): Unit = {
require(intervalMs >= 0, "the interval of trigger should not be negative")
}

def convert(interval: String): Long = {
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
}

def convert(interval: Duration): Long = interval.toMillis

def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
}
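A few illustrative results for these helpers, derived directly from the code above (Triggers is file-private, so these lines are for illustration rather than reuse):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

Triggers.convert("1 minute")                      // 60000L
Triggers.convert("1 Minute")                      // 60000L: parsing is case-insensitive
Triggers.convert(Duration(90, TimeUnit.SECONDS))  // 90000L
Triggers.convert(5, TimeUnit.SECONDS)             // 5000L
Triggers.convert("1 month")                       // throws IllegalArgumentException
Triggers.validate(-1)                             // fails the require(...) check
```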

/**
* A [[Trigger]] that processes only one batch of data in a streaming query then terminates
* the query.
*/
@Experimental
@Evolving
Member:
Shall we remove the annotations? It's private, but the annotations say it's an API.

Contributor Author:
Oh right. These classes are no longer intended to be exposed, so the annotations should be removed. Thanks for finding it!

Contributor Author (@HeartSaVioR, Jun 10, 2020):
Well... in reality that was done in #25200. Let's make sure we check the latest code (not the code diff) when doing a post-hoc review after a long delay.

Member:
Ah, sure. Thanks :D.
-case object OneTimeTrigger extends Trigger
+private[sql] case object OneTimeTrigger extends Trigger
Member:
Also, let's not add private[sql], since the execution package is already private per SPARK-16964.

Contributor Author:
OK, will fix. That convention seems really easy to miss, IMHO.


/**
* A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
* the query will run as fast as possible.
*/
@Evolving
private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
Member (@dongjoon-hyun, Jul 5, 2019):
Could you put this into another file, like ContinuousTrigger.scala and the previous ProcessingTime.scala? If possible, please do git mv to rename ProcessingTime to ProcessingTimeTrigger, then update the new file.

Contributor Author:
I moved this per @srowen's suggestion (#24996 (comment)), since OneTimeTrigger is already there without its own file.

I'm still not sure, but if the intention of the deprecation is to hide implementations from end users, I'd actually also like to move ContinuousTrigger to Triggers.scala, so they can be controlled together.

If we change our minds and want one file per implementation, Triggers.scala would be better renamed to OneTimeTrigger.scala too.

Member:
Ya. Moving ContinuousTrigger to Triggers.scala is also a possible way to be consistent.

Contributor Author:
We might need to keep the original class, as we haven't deprecated it yet, and to let end users create Trigger implementations only via Trigger.xxx, we may also want to deprecate some more classes.

The change might look like this commit:
HeartSaVioR@f8488cf

IMHO this could be considered a separate issue, as more deprecations are involved. WDYT?

Member (@dongjoon-hyun, Jul 6, 2019):
Ur, moving looks okay, but the new deprecation of OneTimeTrigger there is unexpected to me.

    @deprecated("use Trigger.Once()", "3.0.0")
    // NOTE: In later release, we can change this to `private[sql]` and remove deprecated.
    case object OneTimeTrigger extends Trigger

Please make another PR for the deprecation of OneTimeTrigger if we really need it. If a PR unexpectedly has multiple themes, we cannot merge it.

Member:
My suggestion is that we follow the comment above by moving OneTimeTrigger too, and leave it there. That's more consistent, and consistent with the intent. It leaves the class potentially 'public' in the bytecode, but it was before, and that's true of lots of private[spark] classes anyway. While I wouldn't argue against further moving things to sql/execution, putting the implementations in their natural home right now sounds coherent and an improvement, and doesn't expand the change much.

Contributor Author (@HeartSaVioR, Jul 9, 2019):
Sorry, I guess I mixed things up. My bad: not moving OneTimeTrigger, but moving ContinuousTrigger. Let me enumerate the necessary changes as I understand them:

  • OneTimeTrigger is already in Triggers.scala, but it's open to the public, so we need to add a deprecation and plan to hide it.
    • I guess adding private[sql] would work, since Triggers.scala is already in the sql.execution.streaming package. I'm not sure whether the sql.execution package itself will be hidden.
    • Otherwise we can add a replacement, the same as we do for ProcessingTime, but that requires a bigger changeset.
  • Move ContinuousTrigger to Triggers.scala with private[sql] scope, and deprecate the original.

I guess both moving and deprecating make the changeset look verbose, but even in a major release we may not want to remove public classes that haven't been deprecated.

My commit (HeartSaVioR@f8488cf) mentioned above already covers this, so please take a look. If we're OK to go, or you'd like to continue reviewing against that commit, I'll add it to the PR.

Member:
Oops, I misspoke. I meant "move the implementations to Triggers.scala". I'm personally OK with not even deprecating, just moving, as it's a major release and a small detail, but OK with deprecation too.

Contributor Author (@HeartSaVioR, Jul 9, 2019):
Yeah, either is fine for me too. If we want the simpler option, skipping deprecation works; if we want the safer option (for a possibly user-facing API), deprecation works. I'd like to leave the decision to committers/PMC members, as it seems related to project policy.

Member:
To move this forward, I suggest we just move the class and skip deprecation. A note about streaming in the Spark 3.0 migration guide would be good, as we're removing a deprecated class anyway.

Triggers.validate(intervalMs)
}

private[sql] object ProcessingTimeTrigger {
import Triggers._

def apply(interval: String): ProcessingTimeTrigger = {
ProcessingTimeTrigger(convert(interval))
}

def apply(interval: Duration): ProcessingTimeTrigger = {
ProcessingTimeTrigger(convert(interval))
}

def create(interval: String): ProcessingTimeTrigger = {
apply(interval)
}

def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
ProcessingTimeTrigger(convert(interval, unit))
}
}

/**
* A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*/
@Evolving
private[sql] case class ContinuousTrigger(intervalMs: Long) extends Trigger {
Triggers.validate(intervalMs)
}

private[sql] object ContinuousTrigger {
import Triggers._

def apply(interval: String): ContinuousTrigger = {
ContinuousTrigger(convert(interval))
}

def apply(interval: Duration): ContinuousTrigger = {
ContinuousTrigger(convert(interval))
}

def create(interval: String): ContinuousTrigger = {
apply(interval)
}

def create(interval: Long, unit: TimeUnit): ContinuousTrigger = {
ContinuousTrigger(convert(interval, unit))
}
}
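Taken together, both trigger types now normalize every input form to a millisecond interval on a private[sql] case class. A sketch of the resulting equivalences (only compilable inside the sql package, given the private[sql] scope):

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, ProcessingTimeTrigger}

assert(ProcessingTimeTrigger("1 second") == ProcessingTimeTrigger(1000L))
assert(ProcessingTimeTrigger.create(1, TimeUnit.SECONDS) == ProcessingTimeTrigger(1000L))
assert(ContinuousTrigger("1 second") == ContinuousTrigger(1000L))
assert(ContinuousTrigger.create(1, TimeUnit.SECONDS) == ContinuousTrigger(1000L))
```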
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock

class ContinuousExecution(
@@ -93,7 +93,7 @@
}

private val triggerExecutor = trigger match {
-case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTimeTrigger(t), triggerClock)
case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
}

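Worth noting from the match above: a continuous query schedules its checkpoints with the same ProcessingTimeExecutor used for micro-batch intervals. On the user-facing side, Trigger.Continuous remains the way to request this mode; a sketch assuming a SparkSession `spark`:

```scala
import org.apache.spark.sql.streaming.Trigger

// Continuous processing with 1-second asynchronous checkpoints; internally this
// becomes ProcessingTimeExecutor(ProcessingTimeTrigger(1000), triggerClock).
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```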

This file was deleted. (Presumably the source of the deprecated org.apache.spark.sql.streaming.ProcessingTime class, given the MiMa excludes above.)

@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources._
import org.apache.spark.sql.sources.v2.{SupportsWrite, TableProvider}
import org.apache.spark.sql.sources.v2.TableCapability._