@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Sep 29, 2019

What changes were proposed in this pull request?

Credit to @tdas, who reported SPARK-26425 and described the fix. I just followed the description in the issue.

This patch adds more checks to the commit log as well as the file streaming source, so that multiple concurrent runs of a streaming query don't corrupt the state of the query or its checkpoint. It addresses two different spots, which have slightly different issues:

  1. FileStreamSource.fetchMaxOffset()

Structured Streaming doesn't allow multiple streaming queries to run with the same checkpoint (including concurrent runs of the same query), so a query should fail if it cannot write the metadata for a specific batch ID because another run has already written that batch ID.

  2. commit log

As described in the JIRA issue, an assertion is already applied to the offsetLog for the same reason.

    if (shouldConstructNextBatch) {
      // Commit the next batch offset range to the offset log
      updateStatusMessage("Writing offsets to log")
      reportTimeTaken("walCommit") {
        assert(offsetLog.add(currentBatchId,
          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
          s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
        logInfo(s"Committed offsets for batch $currentBatchId. " +
          s"Metadata ${offsetSeqMetadata.toString}")

This patch applies the same assertion to the commit log.
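To illustrate the guard described above, here is a minimal, self-contained sketch. It does not use Spark's classes: `InMemoryMetadataLog`, the simplified `CommitMetadata`, and `commitBatch` are hypothetical stand-ins that only mimic the contract the assertion relies on, namely that `add` returns `false` when an entry for the batch ID already exists.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a metadata log (NOT Spark's implementation).
// add() returns false if an entry for the batch ID was already written.
final class InMemoryMetadataLog[T] {
  private val entries = mutable.Map.empty[Long, T]

  def add(batchId: Long, metadata: T): Boolean = synchronized {
    if (entries.contains(batchId)) {
      false // someone else already wrote this batch
    } else {
      entries(batchId) = metadata
      true
    }
  }
}

// Simplified local stand-in for the metadata stored per committed batch.
final case class CommitMetadata(nextBatchWatermarkMs: Long = 0L)

object CommitGuardSketch {
  // Hypothetical helper: fail loudly instead of silently ignoring a
  // rejected write, mirroring the assertion already used for the offset log.
  def commitBatch(
      log: InMemoryMetadataLog[CommitMetadata],
      batchId: Long,
      watermarkMs: Long): Unit = {
    assert(log.add(batchId, CommitMetadata(watermarkMs)),
      s"Concurrent update to the commit log. Multiple streaming jobs detected for $batchId")
  }

  def main(args: Array[String]): Unit = {
    val log = new InMemoryMetadataLog[CommitMetadata]

    // The first run commits batch 0 successfully.
    commitBatch(log, batchId = 0L, watermarkMs = 0L)

    // A second, concurrent run trying to commit the same batch is rejected.
    val rejected =
      try {
        commitBatch(log, batchId = 0L, watermarkMs = 0L)
        false
      } catch {
        case _: AssertionError => true
      }
    println(s"second writer rejected: $rejected")
  }
}
```

Without the assertion, the second call would return `false` and the caller would carry on as if the commit had succeeded, which is the inconsistent state this change is meant to surface.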

Why are the changes needed?

This prevents inconsistent behavior in a streaming query and lets the query fail instead.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

N/A, as the change is simple and obvious, and it's really hard to artificially reproduce the issue.

@HeartSaVioR
Contributor Author

I also feel that commitLog could be guarded in the same way, but I want to be sure before updating the patch.

    withProgressLocked {
      sinkCommitProgress = batchSinkProgress
      watermarkTracker.updateWatermark(lastExecution.executedPlan)
      commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
      committedOffsets ++= availableOffsets
    }
    logDebug(s"Completed batch ${currentBatchId}")

@SparkQA

SparkQA commented Sep 29, 2019

Test build #111570 has finished for PR 25965 at commit 726d920.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

@tdas @zsxwing @jose-torres @gaborgsomogyi Kind reminder.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jan 13, 2020

Test build #116587 has finished for PR 25965 at commit 726d920.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 14, 2020

Test build #121235 has finished for PR 25965 at commit 726d920.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 30, 2020

Test build #122134 has finished for PR 25965 at commit 726d920.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 21, 2020

Test build #122934 has finished for PR 25965 at commit 1744179.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR HeartSaVioR changed the title from "[SPARK-26425][SS] Add more constraint checks in file streaming source to avoid checkpoint corruption" to "[SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption" May 26, 2020
@HeartSaVioR
Contributor Author

Rebased. Part of this PR was resolved via SPARK-30915, so I've updated the PR to address the commit log instead.

@SparkQA

SparkQA commented May 27, 2020

Test build #123147 has finished for PR 25965 at commit d15acef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented May 27, 2020

Test build #123157 has finished for PR 25965 at commit d15acef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented May 28, 2020

Test build #123211 has finished for PR 25965 at commit d15acef.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented May 28, 2020

Test build #123221 has finished for PR 25965 at commit d15acef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jun 14, 2020

Test build #123992 has finished for PR 25965 at commit d15acef.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

 pyspark.mllib.tests.test_streaming_algorithms.StreamingLinearRegressionWithTests.test_train_prediction

 Error Details

1.672640157855923 not greater than 2

 Stack Trace

Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 466, in test_train_prediction
    eventually(condition, timeout=180.0)
  File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/testing/utils.py", line 81, in eventually
    lastValue = condition()
  File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 461, in condition
    self.assertGreater(errors[1] - errors[-1], 2)
AssertionError: 1.672640157855923 not greater than 2

Known flaky test.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jun 14, 2020

Test build #124003 has finished for PR 25965 at commit d15acef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 13, 2020

Test build #125731 has finished for PR 25965 at commit d15acef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Aug 18, 2020

Test build #127538 has finished for PR 25965 at commit d15acef.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Aug 18, 2020

Test build #127553 has finished for PR 25965 at commit d15acef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Aug 18, 2020

Test build #127561 has finished for PR 25965 at commit d15acef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 15, 2020

Test build #128679 has finished for PR 25965 at commit d15acef.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 15, 2020

Test build #128704 has finished for PR 25965 at commit d15acef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya viirya left a comment

Looks good. This change is simple and straightforward.

  watermarkTracker.updateWatermark(lastExecution.executedPlan)
- commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+ assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
+   s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
Member

update to the log -> update to the commit log?

Contributor Author

Good point. Addressed.

@SparkQA

SparkQA commented Sep 16, 2020

Test build #128745 has finished for PR 25965 at commit a885212.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 16, 2020

Test build #128753 has finished for PR 25965 at commit a885212.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Sep 16, 2020

Again, this is a simple and straightforward change. Tests have passed. If there are no objections, I think we can move forward and merge this.

@HeartSaVioR
Contributor Author

Thanks for reviewing. This PR didn't receive any valid review comments in a year despite my mentions, so I don't expect more reviews. I'll go ahead and merge.

@HeartSaVioR
Contributor Author

Merged into master branch.

@HeartSaVioR HeartSaVioR deleted the SPARK-26425 branch September 17, 2020 00:02