Conversation

@marmbrus
Contributor

This PR adds the ability to perform aggregations inside of a ContinuousQuery. To implement this feature, the planning of aggregation has been augmented with a new StatefulAggregationStrategy. Unlike batch aggregation, stateful aggregation uses the StateStore (introduced in #11645) to persist the results of partial aggregation across different invocations. The resulting physical plan performs the aggregation using the following progression:

  • Partial Aggregation
  • Shuffle
  • Partial Merge (now there is at most 1 tuple per group)
  • StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous)
  • Partial Merge (now there is at most 1 tuple per group)
  • StateStoreSave (saves the tuple for the next batch)
  • Complete (output the current result of the aggregation)
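As an illustration only, the progression above can be simulated for a streaming count-per-key query. This is a toy Python sketch; the real operators are Scala physical plan nodes backed by the StateStore, and every name here (`state`, `run_batch`, etc.) is made up for the example:

```python
# Toy simulation of the stateful aggregation progression for a
# count-per-key streaming query. All names are illustrative; the real
# implementation is Scala operators over Spark's StateStore.
from collections import Counter

state = {}  # plays the role of the StateStore: group key -> running count

def run_batch(partitions):
    # Partial Aggregation: per-partition partial counts
    partials = [Counter(p) for p in partitions]
    # Shuffle + Partial Merge: at most one tuple per group
    merged = Counter()
    for c in partials:
        merged.update(c)
    # StateStoreRestore + Partial Merge: combine this batch's tuple with
    # the one saved by the previous batch (if any)
    for key, count in state.items():
        merged[key] += count
    # StateStoreSave: persist the merged tuples for the next batch
    state.clear()
    state.update(merged)
    # Complete: output the current result of the aggregation
    return dict(merged)

batch1 = run_batch([["a", "b"], ["a"]])  # -> {'a': 2, 'b': 1}
batch2 = run_batch([["b", "b"]])         # -> {'a': 2, 'b': 3}
```

Note how the second batch only sees two `"b"` records, yet the output still reflects the totals across both batches because the restore/save steps carry the state forward.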

The following refactoring was also performed to allow us to plug into existing code:

  • The get/put implementation is taken from [SPARK-14214][SQL] Update state to provide get/put interface #12013
  • The logic for breaking down and de-duping the physical execution of aggregation has been moved into a new pattern, PhysicalAggregation.
  • The AttributeReference used to identify the result of an AggregateFunction has been moved into the AggregateExpression container. This change moves the reference into the same object as the other intermediate references used in aggregation and eliminates the need to pass around a Map[(AggregateFunction, Boolean), Attribute]. Further cleanup (using a different aggregation container for logical/physical plans) is deferred to a follow-up.
  • Some planning logic is moved from the SessionState into the QueryExecution to make it easier to override in the streaming case.
  • The ability to write a StreamTest that checks only the output of the last batch has been added to simulate the future addition of output modes.
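For context on the first bullet, a get/put-style state store can be pictured roughly like this. This is a minimal Python sketch; the actual StateStore in #12013 is a Scala interface with versioning and fault-tolerance semantics not modeled here, and the class and method names are illustrative:

```python
# Minimal sketch of a get/put-style state store. The real StateStore is
# versioned and fault tolerant; this toy version only shows the idea of
# staging updates for a batch and then committing them.
class InMemoryStateStore:
    def __init__(self):
        self._data = {}     # committed state
        self._pending = {}  # updates staged by the current batch

    def get(self, key):
        # Reads see staged updates first, then committed state.
        if key in self._pending:
            return self._pending[key]
        return self._data.get(key)

    def put(self, key, value):
        self._pending[key] = value

    def commit(self):
        # Publish the batch's staged updates into committed state.
        self._data.update(self._pending)
        self._pending = {}

store = InMemoryStateStore()
store.put("a", 1)
store.commit()
store.put("a", store.get("a") + 1)  # read-modify-write across batches
store.commit()
```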

@SparkQA

SparkQA commented Mar 29, 2016

Test build #54469 has finished for PR 12048 at commit 7a5e0ae.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 30, 2016

Test build #54470 has finished for PR 12048 at commit 29355db.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 30, 2016

Test build #54485 has finished for PR 12048 at commit 6aeb27a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor Author

Test this please

@marmbrus
Contributor Author

test this please

@SparkQA

SparkQA commented Mar 30, 2016

Test build #54543 has finished for PR 12048 at commit 6aeb27a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 30, 2016

Test build #2710 has finished for PR 12048 at commit 6aeb27a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor Author

@tdas the maintenance test failed twice on this PR, but not the third time.

@yhuai
Contributor

yhuai commented Mar 30, 2016

Aggregation part looks good to me.

    } catch {
      case NonFatal(e) =>
        failTest(message, e)
    if (!condition) {
Contributor


I had written it this way so that if there are any errors in the lazy evaluation of condition, they get caught and the message is printed correctly. That happened to me a few times.
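The pattern being described, where the condition is evaluated inside the try so an exception thrown during its evaluation is reported with the intended message, looks roughly like this. This is a Python analog of the Scala helper; all names are illustrative:

```python
# Python analog of the verify helper under discussion: the condition is
# passed lazily (as a callable) and evaluated inside the try block, so
# an exception raised while computing it is reported with the intended
# failure message instead of escaping as an unrelated error.
class TestFailure(Exception):
    pass

def fail_test(message, cause=None):
    raise TestFailure(message) from cause

def verify(condition, message):
    try:
        if not condition():  # lazy evaluation happens here
            fail_test(message)
    except TestFailure:
        raise
    except Exception as e:   # error raised while evaluating the condition
        fail_test(message, e)
```

With this shape, a call like `verify(lambda: rows[0] == expected, "wrong first row")` reports "wrong first row" even when `rows` is empty, rather than surfacing a bare IndexError.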

Contributor Author


All uses of verify are just doing equality checks with variables (and thus can't throw), except for those that were modified specifically such that they were going to throw exceptions upon failure. So I think the real problem is overloading what was a simple assert to be an error handler.

The issue with this construction is that it now pollutes the output with the obvious:

condition was false
[info]   org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
[info]          org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
[info]          org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
[info]          org.apache.spark.sql.StreamTest$class.verify$1(StreamTest.scala:228)
[info]          org.apache.spark.sql.StreamTest$$anonfun$testStream$1.apply(StreamTest.scala:355)
[info]          org.apache.spark.sql.StreamTest$$anonfun$testStream$1.apply(StreamTest.scala:271)
[info]          scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]          scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]          org.apache.spark.sql.StreamTest$class.testStream(StreamTest.scala:271)
[info]          org.apache.spark.sql.streaming.StreamSuite.testStream(StreamSuite.scala:24)

@asfgit asfgit closed this in 0fc4aaa Apr 1, 2016
@marmbrus
Contributor Author

marmbrus commented Apr 1, 2016

Thanks, I'm going to merge to master and will address further comments in follow-ups.
