Commit 8c82688

[SPARK-13809][SQL] State store for streaming aggregations
## What changes were proposed in this pull request?

In this PR, I am implementing a new abstraction for management of streaming state data - State Store. It is a key-value store for persisting running aggregates for aggregate operations in streaming dataframes. The motivation and design are discussed here:
https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#

## How was this patch tested?

- [x] Unit tests
- [x] Cluster tests

**Coverage from unit tests**

<img width="952" alt="screen shot 2016-03-21 at 3 09 40 pm" src="https://cloud.githubusercontent.com/assets/663212/13935872/fdc8ba86-ef76-11e5-93e8-9fa310472c7b.png">

## TODO

- [x] Fix updates() iterator to avoid duplicate updates for same key
- [x] Use Coordinator in ContinuousQueryManager
- [x] Plugging in hadoop conf and other confs
- [x] Unit tests
  - [x] StateStore object lifecycle and methods
  - [x] StateStoreCoordinator communication and logic
  - [x] StateStoreRDD fault-tolerance
  - [x] StateStoreRDD preferred location using StateStoreCoordinator
- [ ] Cluster tests
  - [ ] Whether preferred locations are set correctly
  - [ ] Whether recovery works correctly with distributed storage
- [x] Basic performance tests
- [x] Docs

Author: Tathagata Das <[email protected]>

Closes #11645 from tdas/state-store.
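
To make the idea of "a key-value store for persisting running aggregates" concrete, here is a minimal sketch in Scala. It is not the `StateStore` API added by this commit: the class `ToyStateStore`, its methods, and the per-batch version numbers are all hypothetical and chosen for illustration only. It also assumes that state is snapshotted once per committed batch, which the commit message itself does not state.

```scala
import scala.collection.mutable

// Hypothetical toy store, NOT Spark's StateStore API: it keeps one immutable
// snapshot of the running aggregates per committed version (assumed: one
// version per processed batch).
final class ToyStateStore[K, V] {
  // Version 0 is the empty initial state.
  private val snapshots = mutable.Map[Long, Map[K, V]](0L -> Map.empty[K, V])
  private var latest = 0L

  /** Highest version committed so far. */
  def latestVersion: Long = latest

  /** Read the aggregate for `key` as of a committed `version`. */
  def get(version: Long, key: K): Option[V] =
    snapshots.get(version).flatMap(_.get(key))

  /**
   * Fold one batch of updates into the snapshot at `baseVersion` and store
   * the result as version `baseVersion + 1`. Committing the same batch on
   * the same base again yields the same snapshot, so a retried batch does
   * not double-count.
   */
  def commit(baseVersion: Long, updates: Iterable[(K, V)])(merge: (V, V) => V): Long = {
    require(snapshots.contains(baseVersion), s"unknown version $baseVersion")
    val base = snapshots(baseVersion)
    val next = updates.foldLeft(base) { case (state, (k, v)) =>
      state.updated(k, state.get(k).map(merge(_, v)).getOrElse(v))
    }
    val newVersion = baseVersion + 1
    snapshots(newVersion) = next
    latest = math.max(latest, newVersion)
    newVersion
  }
}

object ToyStateStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new ToyStateStore[String, Long]
    // Batch 1: running counts per key.
    val v1 = store.commit(0L, Seq("a" -> 1L, "b" -> 1L, "a" -> 1L))(_ + _)
    // Batch 2 builds on the state left by batch 1.
    val v2 = store.commit(v1, Seq("a" -> 1L))(_ + _)
    println(store.get(v1, "a")) // Some(2)
    println(store.get(v2, "a")) // Some(3)
  }
}
```

Keeping each committed snapshot immutable is one simple way to let a failed batch be recomputed from the previous version; the actual design is described in the linked document.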
1 parent 0a64294 commit 8c82688

11 files changed, +2052 -0 lines changed
sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala

Lines changed: 3 additions & 0 deletions
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.execution.streaming.{ContinuousQueryListenerBus, Sink, StreamExecution}
+import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.util.ContinuousQueryListener
 
 /**
@@ -33,6 +34,8 @@ import org.apache.spark.sql.util.ContinuousQueryListener
 @Experimental
 class ContinuousQueryManager(sqlContext: SQLContext) {
 
+  private[sql] val stateStoreCoordinator =
+    StateStoreCoordinatorRef.forDriver(sqlContext.sparkContext.env)
   private val listenerBus = new ContinuousQueryListenerBus(sqlContext.sparkContext.listenerBus)
   private val activeQueries = new mutable.HashMap[String, ContinuousQuery]
   private val activeQueriesLock = new Object
