[SPARK-13809][SQL] State store for streaming aggregations #11645
Conversation
Test build #52897 has finished for PR 11645 at commit
Test build #52898 has finished for PR 11645 at commit
    }
  }

  private def remove(storeId: StateStoreId): Unit = {
Maybe we also need synchronized for remove()?
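A minimal sketch of what the reviewer is suggesting (the names below are illustrative stand-ins, not the PR's actual fields): guard `remove()` with the same lock used by the other accessors of the shared provider map.

```scala
import scala.collection.mutable

// Illustrative sketch only: loadedProviders and StateStoreId stand in for the PR's
// actual fields; the point is that remove() takes the same lock as the other
// methods that touch the shared map.
object StateStoreSketch {
  case class StateStoreId(operatorId: Long, partitionId: Int)
  private val loadedProviders = new mutable.HashMap[StateStoreId, AnyRef]()

  private def remove(storeId: StateStoreId): Unit = loadedProviders.synchronized {
    loadedProviders.remove(storeId)
  }
}
```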
Test build #52988 has finished for PR 11645 at commit
Test build #53064 has finished for PR 11645 at commit
Test build #53070 has finished for PR 11645 at commit
Test build #53085 has finished for PR 11645 at commit
Test build #53146 has finished for PR 11645 at commit
Test build #53147 has finished for PR 11645 at commit
  override protected def getPartitions: Array[Partition] = dataRDD.partitions

  override def getPreferredLocations(partition: Partition): Seq[String] = {
    Seq.empty
Shouldn't you be using the preferred location here?
Yes. It's still WIP. I need to enable the StateStoreCoordinator for this, which is what I am working on now.
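A hypothetical sketch of what that could look like once the coordinator is wired in (the trait, method, and type names are illustrative, not the PR's final API): the RDD asks the coordinator which executor last loaded the store for this partition and reports that as the preferred location instead of `Seq.empty`.

```scala
object PreferredLocationSketch {
  // Illustrative stand-ins for the store id and the coordinator reference.
  case class StateStoreId(operatorId: Long, partitionId: Int)

  trait StateStoreCoordinatorSketch {
    /** Returns the executor location that last loaded this store, if known. */
    def getLocation(storeId: StateStoreId): Option[String]
  }

  // What getPreferredLocations could compute instead of Seq.empty.
  def preferredLocations(
      coordinator: Option[StateStoreCoordinatorSketch],
      storeId: StateStoreId): Seq[String] = {
    coordinator.flatMap(_.getLocation(storeId)).toSeq
  }
}
```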
Test build #53178 has finished for PR 11645 at commit
 * to ensure re-executed RDD operations re-apply updates on the correct past version of the
 * store.
 */
class HDFSBackedStateStoreProvider(
note to self: private[state]
Test build #53805 has finished for PR 11645 at commit
Test build #53794 has finished for PR 11645 at commit
Test build #53793 has finished for PR 11645 at commit
      maintenanceTask.cancel(false)
      maintenanceTask = null
    }
    logInfo("StateStore stopped")
You need to call `maintenanceTaskExecutor.shutdown` before this line.

To allow this companion object to be reused, it should not be shut down.
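A minimal sketch of the trade-off being discussed (not the PR's actual code): `stop()` cancels the scheduled maintenance task but deliberately leaves the shared executor running so the companion object can be started again in the same JVM.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture}

object MaintenanceLifecycleSketch {
  private val maintenanceTaskExecutor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()
  @volatile private var maintenanceTask: ScheduledFuture[_] = null

  def stop(): Unit = synchronized {
    if (maintenanceTask != null) {
      maintenanceTask.cancel(false) // stop future runs; let an in-flight run finish
      maintenanceTask = null
    }
    // Intentionally NOT calling maintenanceTaskExecutor.shutdown() here, so the
    // executor can be reused if the state store is started again later.
  }
}
```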
Test build #53804 has finished for PR 11645 at commit
/**
 * Create a reference to a [[StateStoreCoordinator]], This can be called from driver as well as
 * executors.
This cannot be called from executors, because creating a StateStoreCoordinator there will succeed even if there is already a StateStoreCoordinator in the driver.
Test build #53791 has finished for PR 11645 at commit
  }
}

private[state] object HDFSBackedStateStoreProvider {
nit: Remove this
Test build #53810 has finished for PR 11645 at commit
  test("distributed test") {
    quietly {
      withSpark(new SparkContext(sparkConf.setMaster("local-cluster[2, 1, 1024]"))) { sc =>
nit: should clone sparkConf
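What the nit suggests, as a sketch of the quoted test line (it reuses the test's own `sparkConf` and `withSpark` helpers): clone the shared conf before setting the master so other tests are not affected.

```scala
// Clone the shared conf instead of mutating it in place.
val conf = sparkConf.clone().setMaster("local-cluster[2, 1, 1024]")
withSpark(new SparkContext(conf)) { sc =>
  // ... test body ...
}
```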
        if (keySize == -1) {
          eof = true
        } else if (keySize < 0) {
          throw new Exception(
nit: IOException
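What the nit asks for, as a sketch of the quoted branch (the message text is illustrative, and `eof`/`keySize` come from the snippet above): throw the more specific `IOException` for a corrupt delta file rather than a bare `Exception`.

```scala
import java.io.IOException

if (keySize == -1) {
  eof = true
} else if (keySize < 0) {
  throw new IOException(s"Error reading delta file: negative key size of $keySize")
}
```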
Looks good overall. Just some bits.
Test build #53845 has finished for PR 11645 at commit
Test build #53851 has finished for PR 11645 at commit
Test build #53850 has finished for PR 11645 at commit
Test build #53857 has finished for PR 11645 at commit
This PR adds the ability to perform aggregations inside of a `ContinuousQuery`. In order to implement this feature, the planning of aggregation has been augmented with a new `StatefulAggregationStrategy`. Unlike batch aggregation, stateful aggregation uses the `StateStore` (introduced in #11645) to persist the results of partial aggregation across different invocations. The resulting physical plan performs the aggregation using the following progression:

- Partial Aggregation
- Shuffle
- Partial Merge (now there is at most 1 tuple per group)
- StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous)
- Partial Merge (now there is at most 1 tuple per group)
- StateStoreSave (saves the tuple for the next batch)
- Complete (output the current result of the aggregation)

The following refactoring was also performed to allow us to plug into existing code:

- The get/put implementation is taken from #12013
- The logic for breaking down and de-duping the physical execution of aggregation has been moved into a new pattern, `PhysicalAggregation`
- The `AttributeReference` used to identify the result of an `AggregateFunction` has been moved into the `AggregateExpression` container. This change moves the reference into the same object as the other intermediate references used in aggregation and eliminates the need to pass around a `Map[(AggregateFunction, Boolean), Attribute]`. Further clean-up (using a different aggregation container for logical/physical plans) is deferred to a followup.
- Some planning logic is moved from the `SessionState` into the `QueryExecution` to make it easier to override in the streaming case.
- The ability to write a `StreamTest` that checks only the output of the last batch has been added to simulate the future addition of output modes.

Author: Michael Armbrust <[email protected]>

Closes #12048 from marmbrus/statefulAgg.
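For illustration, a hypothetical sketch (not from the PR) of the kind of query that exercises this plan; `streamingDf` stands in for a streaming DataFrame and the source/sink setup is omitted.

```scala
import org.apache.spark.sql.DataFrame

object StatefulAggregationExample {
  // groupBy/count over a streaming DataFrame is what StatefulAggregationStrategy
  // plans using the progression listed above.
  def runningWordCounts(streamingDf: DataFrame): DataFrame =
    streamingDf.groupBy("word").count()
}
```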
## What changes were proposed in this pull request?
In this PR, I am implementing a new abstraction for the management of streaming state data: the State Store. It is a key-value store for persisting running aggregates for aggregate operations in streaming DataFrames. The motivation and design are discussed here:
https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#
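For orientation, a rough, illustrative sketch of what a versioned key-value state store API could look like (method names and types below are assumptions for the sake of the example, not the exact trait added by this PR):

```scala
object StateStoreApiSketch {
  case class StateStoreId(operatorId: Long, partitionId: Int)

  trait StateStore {
    def id: StateStoreId                      // which operator/partition this store belongs to
    def get(key: String): Option[Long]        // read the current running aggregate for a key
    def put(key: String, value: Long): Unit   // stage an updated aggregate for this batch
    def commit(): Long                        // atomically commit staged updates; returns the new version
    def iterator(): Iterator[(String, Long)]  // scan all committed key-value pairs
  }
}
```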
## How was this patch tested?
Coverage from unit tests

## TODO

- [x] Fix updates() iterator to avoid duplicate updates for same key
- [x] Use Coordinator in ContinuousQueryManager
- [x] Plugging in hadoop conf and other confs
- [x] Unit tests
  - [x] StateStore object lifecycle and methods
  - [x] StateStoreCoordinator communication and logic
  - [x] StateStoreRDD fault-tolerance
  - [x] StateStoreRDD preferred location using StateStoreCoordinator
- [ ] Cluster tests
  - [ ] Whether preferred locations are set correctly
  - [ ] Whether recovery works correctly with distributed storage
- [x] Basic performance tests
- [x] Docs