Skip to content

Conversation

@uncleGen
Copy link
Contributor

@uncleGen uncleGen commented May 6, 2019

What changes were proposed in this pull request?

Enable query progress reporting in continuous mode.

  • add two new trait: MicroBatchProgressReporter and ContinuousProgressReporter
  • two stage duration in continuous query, including queryPlanning and walCommit. Especially, the queryPlanning is a static and wont change in each epoch. It only happens once time at the beginning of query.

How was this patch tested?

update existing uts

@SparkQA
Copy link

SparkQA commented May 6, 2019

Test build #105155 has finished for PR 24537 at commit d11b954.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 7, 2019

Test build #105187 has finished for PR 24537 at commit 8eeb526.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Copy link
Contributor Author

uncleGen commented May 7, 2019

cc @jose-torres @tdas

reportTimeTaken("walCommit", epoch) {
synchronized {
// Record offsets before updating `committedOffsets`
recordTriggerOffsets(from = committedOffsets, to = availableOffsets, epoch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The availableOffsets seems never updated in continuous processing, maybe we should take this into consideration when report metrics.

currentTriggerEndOffsets.remove(earliestEpochId)
}

earliestEpochId += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like, the earliestEpochId will be also updated even no data removed from the metrics maps, this may cause some old data never got a chance to be removed.

val currentTriggerStartTimestamp = currentDurationsMs(epochId)._1
val currentTriggerEndTimestamp = triggerClock.getTimeMillis()

val executionStats = extractExecutionStats(hasNewData, Some(epochStats))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the origin extractExecutionStats method collects information from the SQL metrics (e.g. state operator metrics and watermark) which is reported only when task ended for now, is there a plan to handle this part of data?

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 30, 2019
@github-actions github-actions bot closed this Dec 31, 2019
@bqiang-stackadapt
Copy link

@uncleGen @ivoson Hello folks, I'm wondering what's the final decision on this. Because ContinuousExecution not calling finishTrigger so streaming metrics are not updated in continuous structured streaming. Thank you very much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants