Skip to content

Conversation

@lw-lin
Copy link
Contributor

@lw-lin lw-lin commented Apr 30, 2016

What changes were proposed in this pull request?

Currently in StreamTest, we have a StartStream which will start a streaming query against trigger ProcessTime(intervalMS = 0) and SystemClock.

We also need to test cases against ProcessTime(intervalMS > 0), which often requires ManualClock.

This patch:

  • fixes an issue of ProcessingTimeExecutor, where for a batch it should run batchRunner only once but might run multiple times under certain conditions;
  • adds support for testing against the ProcessingTime(intervalMS > 0) trigger and ManualClock, by specifying them as fields for StartStream, and by adding an AdvanceManualClock action;
  • adds a test, which takes advantage of the new StartStream and AdvanceManualClock, to test against PR#[SPARK-14942] Reduce delay between batch construction and execution .

How was this patch tested?

N/A

@lw-lin
Copy link
Contributor Author

lw-lin commented Apr 30, 2016

This paragraph focuses on the ProcessingTimeExecutor might run batchRunner multiple times issue.

Current implementation might run batchRunner multiple times for a batch

Let's have intervalMs = 100, then in the current implementation, we'll get:

nextBatchTime(0) = 0
nextBatchTime(1) = 100
...
nextBatchTime(99) = 100
nextBatchTime(100) = 100
nextBatchTime(101) = 200
...
nextBatchTime(199) = 200
nextBatchTime(200) = 200

Because nextBatchTime(nextBatchTime(0)) is still 0, we might get the unexpected execution:

we run batchRunner()
now = 0, so nextBatchTime(with now = 0) is 0
so we run batchRunner()
now = 0, so nextBatchTime(with now = 0) is 0
so we run batchRunner()
...
until now advance to 1, then nextBatchTime(with now = 1) is 100, then we jump to the next batch

This patch's fix

After this patch, we'll get:

nextBatchTime(0) = 100     // this is the change
nextBatchTime(1) = 100
...
nextBatchTime(99) = 100
nextBatchTime(100) = 200   // this is the change
nextBatchTime(101) = 200
...
nextBatchTime(199) = 200
nextBatchTime(200) = 300   // this is the change

Because nextBatchTime(nextBatchTime(0)) is now 200, we'll get the expected execution:

we run batchRunner()
now = 0, so nextBatchTime(with now = 0) is 100
we wait until the next batch
we run batchRunner()
now = 100, so nextBatchTime(with now = 100) is 200
we wait until the next batch
we run batchRunner()
...

Why would this matter?

Generally in normal runtime environment, this issue is triggered only under extreme conditions where nextBatchTime is multiple of intervalMS and batchRunner() runs very fast so now does not change.

However, if we run tests against ManualClock, this issue is almost always triggered. This is because this time how fast or slow batchRunner() runs is irrelevant -- now always does not change unless we advance the clock manually. So although it's expected that batchRunner() runs only once each time we advance the clock, facts are batchRunner() might run many times. So I think we should fix this.

testNextBatchTimeAgainstClock(new SystemClock)
}

test("nextBatchTime against ManualClock") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note the ProcessingTimeExecutor issue would fail this test without this patch, but would pass with this patch.

@lw-lin
Copy link
Contributor Author

lw-lin commented Apr 30, 2016

@marmbrus @tdas @zsxwing would you mind taking a look? Thanks!

@SparkQA
Copy link

SparkQA commented Apr 30, 2016

Test build #57391 has finished for PR 12797 at commit 9d80b15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/** Return the next multiple of intervalMs */
/** Return the next multiple of intervalMs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: comment style is off, we use javadoc style

@marmbrus
Copy link
Contributor

marmbrus commented May 2, 2016

Some minor comments about code understandability, but overall this looks good. Thanks for working on this!


/** Starts the stream, resuming if data has already been processed. It must not be running. */
case object StartStream extends StreamAction
case class StartStream(trigger: Trigger = null, triggerClock: Clock = null) extends StreamAction
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the same default values of StreamExecution. Then you don't need to handle the null case.

Copy link
Contributor Author

@lw-lin lw-lin May 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This layer of nulls was intended to delegate the default values of StreamExecution into these tests, so that we don't have to set the same default values in many places and maintain their consistency. But since it seems very unlikely that we would change the default values, so I've removed the nulls layer and followed your comments.

Thanks!

@lw-lin lw-lin changed the title [SPARK-15022][SPARK-15023] Add support for testing against the ProcessingTime(intervalMS > 0) trigger and ManualClock [SPARK-15022][SPARK-15023][SQL][Streaming] Add support for testing against the ProcessingTime(intervalMS > 0) trigger and ManualClock May 3, 2016
@zsxwing
Copy link
Member

zsxwing commented May 3, 2016

Looks pretty good. @lw-lin could you address the comments and resolve the conflicts?

lw-lin added 2 commits May 4, 2016 07:16
# Conflicts:
#
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamE
xecution.scala
* Returns the start time in milliseconds for the next batch interval, given the current time.
* Note that a batch interval is inclusive with respect to its start time, and thus calling
* `nextBatchTime` with the result of a previous call should return the next interval. (i.e. given
* an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
Copy link
Member

@zsxwing zsxwing May 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: nextBatchTime(nextBatchTime(0)) = 200 -> nextBatchTime(nextBatchTime(0)) = 100

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextBatchTime(0) = 100, so nextBatchTime(nextBatchTime(0)) = 200?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right. Sorry for the mistake.

@zsxwing
Copy link
Member

zsxwing commented May 4, 2016

LGTM pending tests.

@lw-lin
Copy link
Contributor Author

lw-lin commented May 4, 2016

@marmbrus @zsxwing thank you for the review! :-)

@SparkQA
Copy link

SparkQA commented May 4, 2016

Test build #57691 has finished for PR 12797 at commit bc89962.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented May 4, 2016

Merging to master / 2.0. Thanks again @lw-lin

@asfgit asfgit closed this in e597ec6 May 4, 2016
asfgit pushed a commit that referenced this pull request May 4, 2016
…ainst the `ProcessingTime(intervalMS > 0)` trigger and `ManualClock`

## What changes were proposed in this pull request?

Currently in `StreamTest`, we have a `StartStream` which will start a streaming query against trigger `ProcessTime(intervalMS = 0)` and `SystemClock`.

We also need to test cases against `ProcessTime(intervalMS > 0)`, which often requires `ManualClock`.

This patch:
- fixes an issue of `ProcessingTimeExecutor`, where for a batch it should run `batchRunner` only once but might run multiple times under certain conditions;
- adds support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `AdvanceManualClock`, by specifying them as fields for `StartStream`, and by adding an `AdvanceClock` action;
- adds a test, which takes advantage of the new `StartStream` and `AdvanceManualClock`, to test against [PR#[SPARK-14942] Reduce delay between batch construction and execution ](#12725).

## How was this patch tested?

N/A

Author: Liwei Lin <[email protected]>

Closes #12797 from lw-lin/add-trigger-test-support.

(cherry picked from commit e597ec6)
Signed-off-by: Shixiong Zhu <[email protected]>
@lw-lin lw-lin deleted the add-trigger-test-support branch June 11, 2016 03:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants