
Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Jul 4, 2019

What changes were proposed in this pull request?

This patch fixes the flaky test "query without test harness" in ContinuousSuite by adding more margin when waiting for the query to commit the epoch that writes the output rows.

The observations for this issue are below (debug logs were injected to collect them):

reader creation time                                   1562225320210
epoch 1 launched                                       1562225320593 (+380ms from reader creation time)
epoch 13 launched                                      1562225321702 (+1.5s from reader creation time)
partition reader creation time                         1562225321715 (+1.5s from reader creation time)

next read time for first next call                     1562225321210 (+1s from reader creation time)
first next called in partition reader                  1562225321746 (immediately after creation of partition reader)
wait finished in next called in partition reader       1562225321746 (no wait)

second next called in partition reader                 1562225321747 (immediately after first next())

epoch 0 commit started                                 1562225321861

writing rows (0, 1) (belong to epoch 13)               1562225321866 (+100ms after first next())

wait start in waitForRateSourceTriggers(2)             1562225322059

next read time for second next call                    1562225322210 (+1s from previous "next read time")
wait finished in next called in partition reader       1562225322211 (+450ms wait)

writing rows (2, 3) (belong to epoch 13)               1562225322211 (immediately after next())

epoch 14 launched                                      1562225322246

desired wait time in waitForRateSourceTriggers(2)      1562225322510 (+2.3s from reader creation time)

epoch 12 committed                                     1562225323034

These rows were written within the desired wait time, but epoch 13 could not be committed within it. Interestingly, epoch 12 happened to be committed in the gap between the end of the wait in waitForRateSourceTriggers and query.stop() - but even if the rows had been written in epoch 12, that would have been pure luck, and the epoch should be committed within the desired wait time.

This patch modifies the Rate continuous stream to track the highest committed value, so that the test can wait until the desired value is reported to the stream as committed.

This patch also modifies the Rate continuous stream to track the timestamp at which the stream receives the first committed offset, and lets waitForRateSourceTriggers use that timestamp. This still relies on waiting for a specific period, but it is a safer approach than the current one, based on the observation above. With this change, the patch also saves a couple of seconds of test time.
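For reference, a rough sketch of that tracking, assembled from the diff fragments quoted in the review threads below (the enclosing RateStreamContinuousStream class and its imports are omitted, and note that this rate-source change is later reworked in the review so that the merged fix avoids touching the rate source):

private[sql] val highestCommittedValue = new AtomicLong(Long.MinValue)
private[sql] val firstCommittedTime = new AtomicLong(Long.MinValue)

override def commit(end: Offset): Unit = {
  end.asInstanceOf[RateStreamOffset].partitionToValueAndRunTimeMs.foreach {
    case (_, ValueRunTimeMsPair(value, _)) =>
      if (highestCommittedValue.get() < value) {
        highestCommittedValue.set(value)
      }
  }
  // Record the wall-clock time of the first committed offset exactly once.
  firstCommittedTime.compareAndSet(Long.MinValue, System.currentTimeMillis())
}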

How was this patch tested?

10 sequential test runs succeeded locally.

@HeartSaVioR
Contributor Author

I was trying to find a way to get the time when the partition reader is created, but it doesn't look easy, since it runs on the executor whereas we're tracking from the driver.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jul 4, 2019

cc. @dongjoon-hyun Here's the patch for the flaky test failure we found in #24996.
cc. @jose-torres as he authored most parts of continuous processing, including this test.
cc. @gaborgsomogyi in case he might be interested.

@HeartSaVioR
Contributor Author

retest this, please

1 similar comment
@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 4, 2019

Test build #107221 has finished for PR 25048 at commit 299f629.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 4, 2019

Test build #107222 has finished for PR 25048 at commit 299f629.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 4, 2019

Test build #107223 has finished for PR 25048 at commit 299f629.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Let me run a couple of builds again, since this fixes a flaky test.

@HeartSaVioR
Contributor Author

Retest this, please

2 similar comments
@HeartSaVioR
Contributor Author

Retest this, please

@HeartSaVioR
Contributor Author

Retest this, please

// Since previous epochs should be committed before the epoch whose output rows are
// written can be committed, slow initialization of the partition reader combined with a
// tiny trigger interval can force the output rows to wait a long time to be committed.
val deltaMs = numTriggers * 1000 + 3000
Member

Thank you for investigating this, @HeartSaVioR.
The original logic does not look safe. Do you think there is any better way?

Contributor

I've put the test in a loop, left it running for a couple of hours, and haven't seen any problem.
On the other hand, I agree with @dongjoon-hyun: the original wait-for-a-while approach is not safe.

Contributor Author

I'm not sure I can perfectly track when these rows are committed (which would be the safest way), as we are checking this from the driver side whereas things are delayed on the executor side.

But given that the main delay occurs before the first epoch is committed, instead of adding more margin, we may be able to add 2s (which would be enough for 4 rows to be emitted by the rate source) after waiting for the first epoch to be committed. I'll make a change.

Contributor Author

Never mind. I found we can track the highest committed value, so we can wait for a specific value to be committed. I've updated the patch.

Member
Great!

@SparkQA

SparkQA commented Jul 4, 2019

Test build #107228 has finished for PR 25048 at commit 299f629.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 4, 2019

Test build #107230 has finished for PR 25048 at commit 299f629.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 4, 2019

Test build #107231 has finished for PR 25048 at commit 299f629.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

query match {
findRateStreamContinuousStream(query).foreach { reader =>
val deltaMs = numTriggers * 1000 + 300
val firstCommittedTime = reader.firstCommittedTime.longValue()
Contributor Author

This change is based on the observation that reader.creationTime doesn't guarantee partition readers are initialized, whereas reader.firstCommittedTime guarantees that partition readers have been initialized beforehand, so we can capture and use that; a rough assembly of the reworked helper is sketched after this hunk.
(This is a best-effort on the driver side, not the fastest approach, of course.)

}
}
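Putting the fragments above together (the enclosing method signature is an assumption, and the loop uses the non-negative-sleep form suggested later in this review, so treat this as a sketch rather than the exact merged code):

protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
  findRateStreamContinuousStream(query).foreach { reader =>
    val deltaMs = numTriggers * 1000 + 300
    val firstCommittedTime = reader.firstCommittedTime.longValue()
    // Wait until deltaMs has elapsed since the first committed offset was observed,
    // recomputing the remaining time so Thread.sleep never receives a negative argument.
    var toWaitMs = firstCommittedTime + deltaMs - System.currentTimeMillis
    while (toWaitMs > 0) {
      Thread.sleep(toWaitMs)
      toWaitMs = firstCommittedTime + deltaMs - System.currentTimeMillis
    }
  }
}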

protected def waitForRateSourceCommittedValue(
Contributor Author

This is the safest approach to ensure some rows have produced output. We still need a maximum wait time, since it may block indefinitely in case of bugs.
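Only the first line of the signature is visible in this hunk, so the following is a hypothetical sketch of how such a helper could look; the parameter names, polling interval, and use of findRateStreamContinuousStream are assumptions:

protected def waitForRateSourceCommittedValue(
    query: StreamExecution,
    desiredValue: Long,
    maxWaitTimeMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + maxWaitTimeMs
  findRateStreamContinuousStream(query).foreach { reader =>
    // Poll the committed value reported by the stream, but never wait past the
    // deadline, so a bug cannot block the test forever.
    while (reader.highestCommittedValue.get() < desiredValue &&
        System.currentTimeMillis() < deadline) {
      Thread.sleep(100)
    }
  }
}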

val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble

val highestCommittedValue = new AtomicLong(Long.MinValue)
val firstCommittedTime = new AtomicLong(Long.MinValue)
Member

So, do we need to add these variables only for testing? Is there any other valuable information in them?

Contributor Author

Actually yes, and creationTime is also only used from tests. The rate stream is designed for testing and evaluation purposes, so it seems OK to modify it to support tests.

Btw, I'll adjust the scope to private[sql], as it doesn't need to be exposed outside of Spark.

StartStream(longContinuousTrigger),
AwaitEpoch(0),
Execute(waitForRateSourceTriggers(_, 10)),
Execute(waitForRateSourceTriggers(_, 5)),
Contributor Author

The change saves a couple of seconds on my machine.

StartStream(Trigger.Continuous(2012)),
AwaitEpoch(0),
Execute(waitForRateSourceTriggers(_, 10)),
Execute(waitForRateSourceTriggers(_, 5)),
Contributor Author
Ditto.

}

override def commit(end: Offset): Unit = {}
override def commit(end: Offset): Unit = {
Contributor Author

This change shouldn't bring a noticeable perf hit, as it is only called once per epoch, and the epoch interval will be at least hundreds of milliseconds.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 5, 2019

Test build #107248 has finished for PR 25048 at commit 2cadef3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@dongjoon-hyun
Member

Thank you for working on this. In addition, you know, our Jenkins environment is sometimes slow and under very heavy load, so this might happen more frequently there than in other environments (like a local Mac).

val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble

private[sql] val highestCommittedValue = new AtomicLong(Long.MinValue)
private[sql] val firstCommittedTime = new AtomicLong(Long.MinValue)
Member

I still have doubts about the value of these. We usually don't add variables like this just for testing.

Contributor Author

As I commented earlier, creationTime in this class above is one of the fields that exist just for testing.

Unlike microbatch mode, it's not easy to track and capture the progress; that might be the reason we were relying on wait time. I totally agree the change is not pretty, but given that the rate source is for testing/benchmarking purposes, and end users don't deal with RateStreamContinuousStream directly, we might be OK with it.

If it's still a concern, please let me know, and we can roll back the change and just add more seconds to the wait time to make the test more likely to pass.

Contributor

@gaborgsomogyi gaborgsomogyi Jul 8, 2019

The first approach is definitely uglier than this. Just a question (without having taken a deep look): is there perhaps a metric which could be used? Or do we just add something like this?

Contributor Author

The streaming query listener is not ready for continuous processing. #24537 is trying to address it, but it doesn't seem to be ready. Also, the query listener mainly reports per batch to show changing numbers; if we applied this to continuous processing, the report could be far behind when there's skew among partitions and the epochs for partitions differ.

Member
Can you subclass this for testing and only add the new fields in the subclass?

Contributor Author

It would also require a new data source provider, as we reference it just by the data source name.

The change would bring a test version of RateStreamProvider, a test version of RateStreamTable (maybe subclassing to deduplicate), and a test version of RateStreamContinuousStream. I'd like to confirm whether it's OK to apply the change, since the changeset is going to get bigger.

Member

I see, hmm, it's not instantiated directly.
I'm also uneasy about adding this just for a test; is there any way to avoid it while fixing the test, even if it means the test can't test as much?
I'm not super against this though. I'd mark it as visible for testing and comment it, of course.

Contributor Author

OK, maybe I can try to find an alternative approach first and stick with this only if I can't find one. I didn't do that initially since the alternatives would be quite a bit less simple, but I feel we would like to avoid a change like this.

Contributor Author

And I should admit that I just realized creationTime is not only used for testing. My bad. Sorry for missing that.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jul 7, 2019

UPDATE: 5000 runs with no failure across 10 machines, so I stopped it.

Regarding inconsistency under heavy load, that's not something the test can deal with, as it would mean continuous processing (or the rate continuous source) is itself inconsistent. Trying to stabilize the test for that case would just lead the test to rerun itself or wait longer, so IMHO we have to live with it.

@gaborgsomogyi
Contributor

gaborgsomogyi commented Jul 7, 2019

Just FYI, I initially reproduced the exception on my local Mac without high load.

I think my approach ended up causing an additional problem because I reused the SparkContext. Namely, my test was to wrap the original test code like this:

while (true) {
  // original test code
}

I've changed the approach and validated it in the following way:

  1. The loop is now in a bash script on my local Mac => 3k executions and no issue
  2. The loop is now in a bash script on a cloud machine => 3k executions and no issue

Reacting to @dongjoon-hyun's comment: both cases put high CPU load on the machines.

All in all, the latest patch died only when the SparkContext was not recreated, which is not realistic on Jenkins.

val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble

private[sql] val highestCommittedValue = new AtomicLong(Long.MinValue)
private[sql] val firstCommittedTime = new AtomicLong(Long.MinValue)
Member
Can you subclass this for testing and only add the new fields in the subclass?

override def commit(end: Offset): Unit = {
end.asInstanceOf[RateStreamOffset].partitionToValueAndRunTimeMs.foreach {
case (_, ValueRunTimeMsPair(value, _)) =>
if (highestCommittedValue.get() < value) {
Member
I'm not sure if the 'atomic' part is essential here, but if it is, I think you have a race condition here. You'd want to use updateAndGet or something to make sure the check and update are atomic.

Contributor Author

@HeartSaVioR HeartSaVioR Jul 8, 2019

Thanks for pointing that out! Only one writer and one reader will run concurrently. I'm revisiting the change, and it looks like just over-engineering; a volatile would work. If the reader reads an old value, it just needs to wait a bit more, so we don't strictly need atomicity on the update. I'll make a change.

highestCommittedValue.set(value)
}
}
firstCommittedTime.compareAndSet(Long.MinValue, System.currentTimeMillis())
Contributor Author

Similar here: one writer and one reader, and we have alternative logic (in waitForRateSourceTriggers) for when the reader reads an old value, so atomicity is not strictly needed.
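A minimal illustration of the single-writer/single-reader point made in the two comments above (the class and member names here are illustrative, not the actual patch):

class CommittedValueTracker {
  // One writer (the commit path) and one reader (the test): @volatile gives the
  // required visibility, and a stale read only makes the reader poll once more.
  @volatile private var highestCommittedValue: Long = Long.MinValue

  def update(value: Long): Unit =
    if (value > highestCommittedValue) highestCommittedValue = value

  def current: Long = highestCommittedValue
}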

@HeartSaVioR
Contributor Author

I finally dealt with this without touching the rate source. Please take another look. I'm not sure how soon I can run the same long-running test again, but the test would run similarly.

@SparkQA

SparkQA commented Jul 9, 2019

Test build #107379 has finished for PR 25048 at commit faa41b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

I've had a look and the change looks good. Starting a long running test...

Contributor

@gaborgsomogyi gaborgsomogyi left a comment
Tests passed. LGTM, only a nit found.

val results = spark.read.table("noharness").collect()
assert(Set(0, 1, 2, 3).map(Row(_)).subsetOf(results.toSet))
assert(expected.map(Row(_)).subsetOf(results.toSet),
s"Result set ${results.toSet} are not a superset of $expected!")
Contributor
Nit: indent

@HeartSaVioR
Contributor Author

@dongjoon-hyun @srowen Could we have another round of review? I think we've resolved the concern about the awkward shape, and now we're good to go.

while (System.currentTimeMillis < reader.creationTime + deltaMs) {
Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
while (System.currentTimeMillis < firstCommittedTime + deltaMs) {
Thread.sleep(firstCommittedTime + deltaMs - System.currentTimeMillis)
Member

I think this could throw if the current time advances between the check and the sleep, since Thread.sleep with a negative argument causes an exception. Maybe:

var toWaitMS = firstCommittedTime + deltaMs - System.currentTimeMillis
while (toWaitMS > 0) {
  Thread.sleep(toWaitMS)
  toWaitMS = firstCommittedTime + deltaMs - System.currentTimeMillis
}

Member
(I guess technically you should use System.nanoTime here but it probably won't matter)

Contributor Author

You're right, we're calling System.currentTimeMillis twice and the values might not be the same, so the argument could end up negative. Nice find.

Contributor Author
Addressed.

}

query match {
case c: ContinuousExecution =>
Member
Just make the method take a ContinuousExecution if that's all it can meaningfully handle. (Cast in the caller if needed)

Contributor Author

It might make callers a bit more verbose than handling it here, but that's not a big deal, and it's better from the perspective of avoiding a silent no-op when misused. I'll make a change.

Contributor Author
Addressed.
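For illustration, the shape discussed above ends up roughly like this (the exact signature in the patch may differ; the point is the caller-side cast):

protected def waitForRateSourceTriggers(query: ContinuousExecution, numTriggers: Int): Unit = {
  // wait logic as sketched earlier, now guaranteed to receive a continuous query
}

// Caller side, where only a generic StreamExecution handle is in scope:
Execute(q => waitForRateSourceTriggers(q.asInstanceOf[ContinuousExecution], numTriggers = 2))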

@SparkQA

SparkQA commented Jul 10, 2019

Test build #107465 has finished for PR 25048 at commit aa932bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 10, 2019

Test build #107479 has finished for PR 25048 at commit 6706a65.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

@dongjoon-hyun Would you mind taking another look, please? Thanks in advance!

@srowen
Member

srowen commented Jul 13, 2019

Merged to master. It doesn't pick cleanly into 2.4; if it's important there, a back-port would be fine.

@srowen srowen closed this in b5a9baa Jul 13, 2019
@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging! I'll see whether it can (and needs to) be ported back, and open another PR for 2.4 if it does.

HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Jul 14, 2019
…inuousSuite


Closes apache#25048 from HeartSaVioR/SPARK-28247.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
@HeartSaVioR HeartSaVioR deleted the SPARK-28247 branch July 15, 2019 01:11
vinodkc pushed a commit to vinodkc/spark that referenced this pull request Jul 18, 2019
…inuousSuite


Closes apache#25048 from HeartSaVioR/SPARK-28247.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>