@lw-lin
Contributor

@lw-lin lw-lin commented Apr 27, 2016

Problem

Currently in StreamExecution, we first run the batch, then construct the next:

if (dataAvailable) runBatch()
constructNextBatch()

This is good when we run batches ASAP, where data would get processed in the very next batch:

[Figure 1]

However, when we run batches at a trigger such as ProcessingTime("1 minute"), data (such as y below) may not get processed in the very next batch, i.e. batch 1, but in batch 2:

[Figure 2]

What changes were proposed in this pull request?

This patch reverses the order of constructNextBatch() and runBatch(). After this patch, data would get processed in the very next batch, i.e. batch 1:

[Figure 3]

In addition, this patch changes when we do currentBatchId += 1: it now happens once the processing of the current batch's data has completed, so we no longer need to pass currentBatchId + 1 or currentBatchId - 1 to states or sinks.
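
To illustrate the ordering change, here is a minimal toy simulation. It is only a sketch and not the actual StreamExecution code: `Record`, `simulate`, and the `constructFirst` flag are hypothetical names introduced for illustration, and triggers are modeled as fixed ticks. It records the trigger time at which each record gets processed under either ordering.

```scala
object BatchOrderSketch {
  // Hypothetical toy record: a name plus its arrival time in seconds.
  final case class Record(name: String, arrivedAt: Long)

  /** Returns record name -> time of the trigger tick at which it was processed. */
  def simulate(
      records: Seq[Record],
      intervalSec: Long,
      ticks: Int,
      constructFirst: Boolean): Map[String, Long] = {
    var pending = Vector.empty[Record]        // batch constructed but not yet run
    var consumed = Set.empty[String]          // records already placed in a batch
    var processedAt = Map.empty[String, Long]

    // Pick up everything that has arrived by `now` and is not yet in a batch.
    def constructNextBatch(now: Long): Unit = {
      val fresh = records.filter(r => r.arrivedAt <= now && !consumed(r.name))
      consumed ++= fresh.map(_.name)
      pending ++= fresh
    }

    // Process the constructed batch at time `now`.
    def runBatch(now: Long): Unit = {
      pending.foreach(r => processedAt = processedAt.updated(r.name, now))
      pending = Vector.empty
    }

    for (i <- 0 until ticks) {
      val now = i * intervalSec
      if (constructFirst) {
        // Order after this patch: construct, then run.
        constructNextBatch(now)
        if (pending.nonEmpty) runBatch(now)
      } else {
        // Order before this patch: run, then construct.
        if (pending.nonEmpty) runBatch(now)
        constructNextBatch(now)
      }
    }
    processedAt
  }
}
```

With x arriving at 0s, y arriving at 30s, and a 60s interval, the old order processes x at 60s and y at 120s, while the new order processes x at 0s and y at 60s. In this toy model the old order therefore delays every record by one trigger interval.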

How was this patch tested?

A newly added test case. This should also be covered by existing test suites, e.g. the stress tests and others.

@SparkQA

SparkQA commented Apr 27, 2016

Test build #57084 has finished for PR 12725 at commit 8c8d73a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Contributor Author

lw-lin commented Apr 27, 2016

@marmbrus @tdas @zsxwing would you mind taking a look? Thanks!

@marmbrus
Contributor

This makes sense. Thanks for writing a very clear description! Perhaps a better title would be "Reduce delay between batch construction and execution".

Is there any way we can test this? What if we injected a manual timed executor?

@lw-lin changed the title from "[SPARK-14942][SQL][Streaming] First construct a batch then run the batch for continuous queries" to "[SPARK-14942][SQL][Streaming] Reduce delay between batch construction and execution" on Apr 28, 2016
@lw-lin
Contributor Author

lw-lin commented Apr 28, 2016

Sure, I'll add a manual timed executor and some dedicated tests as well.

@lw-lin
Contributor Author

lw-lin commented Apr 30, 2016

To make things easier to review, I've added the manual timed executor for testing general cases in a separate PR.

populateStartOffsets()
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
}
else {
Member

nit: merge this line and the previous one.

@zsxwing
Member

zsxwing commented May 2, 2016

Looks pretty good.

asfgit pushed a commit that referenced this pull request May 4, 2016
…ainst the `ProcessingTime(intervalMS > 0)` trigger and `ManualClock`

## What changes were proposed in this pull request?

Currently in `StreamTest`, we have a `StartStream` which starts a streaming query against the trigger `ProcessingTime(intervalMS = 0)` and `SystemClock`.

We also need to test cases against `ProcessingTime(intervalMS > 0)`, which often requires `ManualClock`.

This patch:
- fixes an issue of `ProcessingTimeExecutor`, where for a batch it should run `batchRunner` only once but might run multiple times under certain conditions;
- adds support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `AdvanceManualClock`, by specifying them as fields for `StartStream`, and by adding an `AdvanceClock` action;
- adds a test, which takes advantage of the new `StartStream` and `AdvanceManualClock`, to test against PR #12725 ([SPARK-14942] Reduce delay between batch construction and execution).
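
As a rough illustration of why a manually advanced clock makes interval triggers testable, here is a toy sketch. It is not Spark's `ManualClock` or `ProcessingTimeExecutor`: `ToyManualClock`, `ToyTriggerExecutor`, and `poll` are hypothetical names, and the interval-boundary formula is an assumption chosen for this example. The point is that the batch runner fires once per interval boundary, and only when the test advances the clock.

```scala
object ManualClockSketch {
  // Hypothetical stand-in for a manually advanced clock (not Spark's ManualClock).
  final class ToyManualClock(private var nowMs: Long = 0L) {
    def getTimeMillis(): Long = nowMs
    def advance(ms: Long): Unit = { nowMs += ms }
  }

  // Next interval boundary strictly after `now` (assumed formula for this sketch).
  def nextBatchTime(now: Long, intervalMs: Long): Long =
    now / intervalMs * intervalMs + intervalMs

  // Toy trigger executor: fires the runner at most once per interval boundary.
  final class ToyTriggerExecutor(clock: ToyManualClock, intervalMs: Long) {
    private var nextTrigger: Long = nextBatchTime(clock.getTimeMillis(), intervalMs)

    /** Runs `batchRunner` if the trigger time has been reached; returns whether it ran. */
    def poll(batchRunner: () => Unit): Boolean = {
      val now = clock.getTimeMillis()
      if (now >= nextTrigger) {
        batchRunner()
        // Move to the following boundary so the same batch is not run twice.
        nextTrigger = nextBatchTime(now, intervalMs)
        true
      } else {
        false
      }
    }
  }
}
```

A test can then call `clock.advance(...)` and `poll(...)` in a fixed sequence and assert exactly how many times the runner was invoked, with no dependence on wall-clock time.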

## How was this patch tested?

N/A

Author: Liwei Lin <[email protected]>

Closes #12797 from lw-lin/add-trigger-test-support.

(cherry picked from commit e597ec6)
Signed-off-by: Shixiong Zhu <[email protected]>
outputMode: OutputMode,
checkpointLocation: String,
currentBatchId: Long)
val currentBatchId: Long)
Contributor Author

@lw-lin lw-lin May 7, 2016

let's expose this to test suites

@SparkQA

SparkQA commented May 7, 2016

Test build #58051 has finished for PR 12725 at commit d4cd47a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 7, 2016

Test build #58054 has finished for PR 12725 at commit a72423b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Contributor Author

lw-lin commented May 7, 2016

The manual timed executor patch has been merged, and I've addressed the comments and expanded the tests for this patch. @zsxwing would you mind taking another look? Thanks!

@lw-lin
Contributor Author

lw-lin commented May 9, 2016

@zsxwing would you take another look? Thanks!

StopStream,
StartStream(ProcessingTime("10 seconds"), new ManualClock),

/* -- batch 1 rerun ----------------- */
Member

I'm wondering if we can avoid rerunning a batch that had already finished before stopping. How about storing the offsets after finishing a batch instead of storing them before running a batch? @marmbrus what do you think?

Contributor

Failure is the rare case, so I don't think it's that bad to rerun if it reduces the complexity of the implementation.
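
For readers following this thread, the trade-off can be sketched with a toy model. It is not the real implementation: `State`, `runBatch`, and `restart` are hypothetical names, and the sketch assumes offsets are logged before a batch runs and that no completion record is kept. Under those assumptions, a restart replays the last logged batch even if it had already finished.

```scala
object OffsetLogSketch {
  // Hypothetical toy state: batch ids written to the offset log, and the
  // batch ids the sink was asked to process, in order.
  final case class State(offsetLog: Vector[Long], sinkRuns: Vector[Long])

  // Log the batch id first (write-ahead), then run it against the sink.
  def runBatch(s: State, batchId: Long): State =
    State(s.offsetLog :+ batchId, s.sinkRuns :+ batchId)

  // On restart there is no record of completion in this toy model,
  // so the last logged batch is simply run again.
  def restart(s: State): State =
    s.offsetLog.lastOption.fold(s)(id => s.copy(sinkRuns = s.sinkRuns :+ id))
}
```

In this sketch, running batches 0 and 1 and then restarting gives sink runs 0, 1, 1. The rerun is harmless if the sink ignores a batch id it has already handled, which is the assumption that keeps the simpler write-ahead approach acceptable.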

@lw-lin
Contributor Author

lw-lin commented May 16, 2016

@marmbrus @zsxwing maybe this is ready to go? Thanks!

@SparkQA

SparkQA commented May 16, 2016

Test build #2986 has finished for PR 12725 at commit a72423b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented May 16, 2016

LGTM. Merging to master / 2.0. Thanks, @lw-lin

@asfgit asfgit closed this in 95f4fba May 16, 2016
asfgit pushed a commit that referenced this pull request May 16, 2016
… and execution

## Problem

Currently in `StreamExecution`, [we first run the batch, then construct the next](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L165):
```scala
if (dataAvailable) runBatch()
constructNextBatch()
```

This is good when we run batches ASAP, where data would get processed in the **very next batch**:

![1](https://cloud.githubusercontent.com/assets/15843379/14779964/2786e698-0b0d-11e6-9d2c-bb41513488b2.png)

However, when we run batches at a trigger such as `ProcessingTime("1 minute")`, data (such as _y_ below) may not get processed in the very next batch, i.e. _batch 1_, but in _batch 2_:

![2](https://cloud.githubusercontent.com/assets/15843379/14779818/6f3bb064-0b0c-11e6-9f16-c1ce4897186b.png)

## What changes were proposed in this pull request?

This patch reverses the order of `constructNextBatch()` and `runBatch()`. After this patch, data would get processed in the **very next batch**, i.e. _batch 1_:

![3](https://cloud.githubusercontent.com/assets/15843379/14779816/6f36ee62-0b0c-11e6-9e53-bc8397fade18.png)

In addition, this patch changes when we do `currentBatchId += 1`: it now happens once the processing of the current batch's data has completed, so we no longer need to pass `currentBatchId + 1` or `currentBatchId - 1` to states or sinks.

## How was this patch tested?

A newly added test case. This should also be covered by existing test suites, e.g. the stress tests and others.

Author: Liwei Lin <[email protected]>

Closes #12725 from lw-lin/construct-before-run-3.

(cherry picked from commit 95f4fba)
Signed-off-by: Shixiong Zhu <[email protected]>
@lw-lin lw-lin deleted the construct-before-run-3 branch June 11, 2016 03:04