Closed

Changes from 1 commit

Commits (51)
c019474 First draft of StateStore (tdas, Mar 10, 2016)
4f8dade Updated tests (tdas, Mar 11, 2016)
f417bde Added basic unit test for StateStoreRDD (tdas, Mar 11, 2016)
d8cee54 Style fix (tdas, Mar 11, 2016)
7d74c67 Fixed test (tdas, Mar 12, 2016)
c5dd061 Fixed versioning in StateStoreRDD, and made store updates thread-safe (tdas, Mar 14, 2016)
7adca70 Style fixes (tdas, Mar 14, 2016)
a0ba498 Added docs (tdas, Mar 14, 2016)
48afbe6 Merge remote-tracking branch 'apache-github/master' into state-store (tdas, Mar 14, 2016)
bee673c Refactored for new design (tdas, Mar 14, 2016)
d963efa Merge remote-tracking branch 'apache-github/master' into HEAD (tdas, Mar 14, 2016)
22d7e66 Fixed a lot of things (tdas, Mar 15, 2016)
34ae7ff Fixed style (tdas, Mar 15, 2016)
13c29a2 Fixed updates iterator (tdas, Mar 15, 2016)
d5e2b10 Added unit tests for Coordinator (tdas, Mar 15, 2016)
f5660d2 Added unit tests and fixed scala style (tdas, Mar 15, 2016)
d313683 Added unit test for preferred location (tdas, Mar 15, 2016)
7ea847c Fixed style (tdas, Mar 15, 2016)
b8b4632 Added unit test for StateStore background management (tdas, Mar 16, 2016)
e89f4d0 Updated unit test to test instance unloading in state store (tdas, Mar 16, 2016)
b5e2421 Updated unit test (tdas, Mar 16, 2016)
76dd988 Fixed StateStoreRDD unit test (tdas, Mar 16, 2016)
8123818 Added docs (tdas, Mar 16, 2016)
2fb5b85 Minor fixes (tdas, Mar 16, 2016)
dee7a0e Updated store to UnsafeRow instead of InternalRow (tdas, Mar 16, 2016)
15e1780 Added custom serialization for delta and added schema to state stores (tdas, Mar 17, 2016)
b0bd043 state-store-squashed (tdas, Mar 17, 2016)
e6c7016 Updates based on PR comments (tdas, Mar 18, 2016)
3fe34e9 Style fixes (tdas, Mar 18, 2016)
8cb0da8 Some more cleanup (tdas, Mar 19, 2016)
9fb9c43 Reverting unnecessary changes (tdas, Mar 19, 2016)
e6b1fb3 Fixed style (tdas, Mar 19, 2016)
e6f5ab8 Merge remote-tracking branch 'apache-github/master' into state-store (tdas, Mar 19, 2016)
2bd6cbd Fixed logging (tdas, Mar 19, 2016)
25afe31 Updated serialization of snapshot files (tdas, Mar 21, 2016)
29c2af0 Updated StateStoreCoordinator lifecycle (tdas, Mar 21, 2016)
32c0139 Added StateStoreCoordinator to ContinuousQueryManager (tdas, Mar 21, 2016)
3824053 Updated logging (tdas, Mar 21, 2016)
534ad48 Minor updates (tdas, Mar 21, 2016)
19a60a6 Addressed comments (tdas, Mar 21, 2016)
5b7cf53 Added StateStoreConf to address comments (tdas, Mar 22, 2016)
f4f3838 Updated StateStoreConf (tdas, Mar 22, 2016)
502e5a5 Addressed comments (tdas, Mar 22, 2016)
2f29d9a Merge remote-tracking branch 'apache-github/master' into state-store (tdas, Mar 22, 2016)
24fb325 Minor style fix (tdas, Mar 22, 2016)
4752d73 Renamed for consistency (tdas, Mar 22, 2016)
756762a Updated comment (tdas, Mar 22, 2016)
63fad92 Replaced Timer with Scheduled Executor (tdas, Mar 22, 2016)
b147f59 Style fix (tdas, Mar 22, 2016)
819ca17 Fixed coordinator bug and added distributed test (tdas, Mar 23, 2016)
70cc7b1 Addressed more comments (tdas, Mar 23, 2016)
ContinuousQueryManager.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.util.ContinuousQueryListener
 @Experimental
 class ContinuousQueryManager(sqlContext: SQLContext) {
 
-  private[sql] val stateStoreCoordinator = StateStoreCoordinatorRef(sqlContext.sparkContext.env)
+  private[sql] val stateStoreCoordinator =
+    StateStoreCoordinatorRef.forDriver(sqlContext.sparkContext.env)
   private val listenerBus = new ContinuousQueryListenerBus(sqlContext.sparkContext.listenerBus)
   private val activeQueries = new mutable.HashMap[String, ContinuousQuery]
   private val activeQueriesLock = new Object
HDFSBackedStateStoreProvider.scala
@@ -583,6 +583,3 @@ private[state] class HDFSBackedStateStoreProvider(
   }
 }
 
-private[state] object HDFSBackedStateStoreProvider {
-
-}
StateStore.scala
@@ -35,6 +35,7 @@ import org.apache.spark.util.ThreadUtils
 /** Unique identifier for a [[StateStore]] */
 case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
 
+
 /**
  * Base trait for a versioned key-value store used for streaming aggregations
  */
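An aside, not part of the diff: a minimal sketch of the update/commit cycle this trait supports. The update signature used here, taking an Option[UnsafeRow] => UnsafeRow merge function, is inferred from the "distributed test" at the bottom of this diff, not stated in the excerpt above.

// Sketch: upsert one key against the current version of a store.
def upsert(store: StateStore, key: UnsafeRow, fresh: UnsafeRow): Unit = {
  store.update(key, {
    case Some(existing) => existing // merge with the prior value as needed
    case None => fresh              // first value seen for this key
  })
  store.commit() // turns these updates into the next version of the store
}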
@@ -84,6 +85,7 @@ trait StateStore {
   def hasCommitted: Boolean
 }
 
+
 /** Trait representing a provider of a specific version of a [[StateStore]]. */
 trait StateStoreProvider {
 
@@ -94,6 +96,7 @@ trait StateStoreProvider {
   def doMaintenance(): Unit = { }
 }
 
+
 /** Trait representing updates made to a [[StateStore]]. */
 sealed trait StoreUpdate
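An aside: the concrete subclasses of StoreUpdate are not visible in this excerpt. A plausible shape, given the UnsafeRow-based API elsewhere in the file (the names below are assumptions, not taken from this diff):

// Hypothetical concrete updates (assumed; not shown in this diff):
case class ValueAdded(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
case class ValueUpdated(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
case class KeyRemoved(key: UnsafeRow) extends StoreUpdate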

@@ -118,7 +121,6 @@ private[state] object StateStore extends Logging {
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
Comment (Contributor): These should be defined in SQLConf.

Comment (Contributor Author): This cannot be defined in SQLConf, because it is an executor-wide configuration. I am renaming it to spark.streaming.stateStore.maintenanceInterval.
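To illustrate the distinction: an executor-wide setting like this would be read from SparkConf rather than SQLConf. A minimal sketch under that assumption — only the key name comes from the comment above; the helper and its wiring are invented for illustration:

// Sketch: resolve the maintenance interval on the executor side.
val MAINTENANCE_INTERVAL_CONFIG = "spark.streaming.stateStore.maintenanceInterval"

private def maintenanceIntervalSecs(conf: SparkConf): Long = {
  // getTimeAsSeconds accepts values like "60s"; fall back to the default above
  conf.getTimeAsSeconds(MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s")
}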


   private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
-  private val maintenanceTimer = new Timer("StateStore Timer", true)
+  private val maintenanceTaskExecutor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
 
@@ -232,7 +234,7 @@
     val env = SparkEnv.get
     if (env != null) {
       if (_coordRef == null) {
-        _coordRef = StateStoreCoordinatorRef(env)
+        _coordRef = StateStoreCoordinatorRef.forExecutor(env)
       }
       logDebug(s"Retrieved reference to StateStoreCoordinator: ${_coordRef}")
       Some(_coordRef)
StateStoreCoordinator.scala
@@ -53,19 +53,25 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
   * Create a reference to a [[StateStoreCoordinator]]. This can be called from the driver as well
   * as executors.
Comment (Member): This cannot be called from executors, because creating a StateStoreCoordinator will succeed even if one already exists on the driver.
   */
-  def apply(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
+  def forDriver(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
     try {
       val coordinator = new StateStoreCoordinator(env.rpcEnv)
       val coordinatorRef = env.rpcEnv.setupEndpoint(endpointName, coordinator)
       logInfo("Registered StateStoreCoordinator endpoint")
       new StateStoreCoordinatorRef(coordinatorRef)
     } catch {
       case e: IllegalArgumentException =>
         logDebug("Retrieving existing StateStoreCoordinator endpoint")
         val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName, env.conf, env.rpcEnv)
         logDebug("Retrieved existing StateStoreCoordinator endpoint")
         new StateStoreCoordinatorRef(rpcEndpointRef)
     }
   }
+
+  def forExecutor(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
+    val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName, env.conf, env.rpcEnv)
+    logDebug("Retrieved existing StateStoreCoordinator endpoint")
+    new StateStoreCoordinatorRef(rpcEndpointRef)
+  }
 }

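Taken together, the two entry points make the lifecycle asymmetric; a usage sketch (assuming SparkEnv.get is available on both sides, and using only the methods shown above):

// On the driver: create the endpoint, or re-acquire it if already registered.
val driverRef = StateStoreCoordinatorRef.forDriver(SparkEnv.get)

// On an executor: only look up the driver-side endpoint, never create one.
val executorRef = StateStoreCoordinatorRef.forExecutor(SparkEnv.get)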
StateStoreCoordinatorSuite.scala
@@ -94,7 +94,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
 
   test("multiple references have same underlying coordinator") {
     withCoordinatorRef(sc) { coordRef1 =>
-      val coordRef2 = StateStoreCoordinatorRef(sc.env)
+      val coordRef2 = StateStoreCoordinatorRef.forDriver(sc.env)
 
       val id = StateStoreId("x", 0, 0)
 
@@ -114,7 +114,7 @@ object StateStoreCoordinatorSuite {
   def withCoordinatorRef(sc: SparkContext)(body: StateStoreCoordinatorRef => Unit): Unit = {
     var coordinatorRef: StateStoreCoordinatorRef = null
     try {
-      coordinatorRef = StateStoreCoordinatorRef(sc.env)
+      coordinatorRef = StateStoreCoordinatorRef.forDriver(sc.env)
       body(coordinatorRef)
     } finally {
       if (coordinatorRef != null) coordinatorRef.stop()
StateStoreRDDSuite.scala
@@ -142,6 +142,38 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
     }
   }
 
+  test("distributed test") {
+    quietly {
+      withSpark(new SparkContext(sparkConf.setMaster("local-cluster[2, 1, 1024]"))) { sc =>
Comment (Member): nit: should clone sparkConf
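The change the nit suggests would look like this (a sketch; SparkConf.clone returns a copy, so the shared suite-wide conf is left unmutated):

// Sketch of the suggested fix: clone before mutating with setMaster.
withSpark(new SparkContext(sparkConf.clone.setMaster("local-cluster[2, 1, 1024]"))) { sc =>
  // ... test body unchanged ...
}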

+        implicit val sqlContext = new SQLContext(sc)
+        val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
+        val increment = (store: StateStore, iter: Iterator[String]) => {
+          iter.foreach { s =>
+            store.update(
+              stringToRow(s), oldRow => {
+                val oldValue = oldRow.map(rowToInt).getOrElse(0)
+                intToRow(oldValue + 1)
+              })
+          }
+          store.commit()
+          store.iterator().map(rowsToStringInt)
+        }
+        val opId = 0
+        val rdd1 = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore(
+          increment, path, opId, storeVersion = 0, keySchema, valueSchema)
+        assert(rdd1.collect().toSet === Set("a" -> 2, "b" -> 1))
+
+        // Generate next version of stores
+        val rdd2 = makeRDD(sc, Seq("a", "c")).mapPartitionWithStateStore(
+          increment, path, opId, storeVersion = 1, keySchema, valueSchema)
+        assert(rdd2.collect().toSet === Set("a" -> 3, "b" -> 1, "c" -> 1))
+
+        // Make sure the previous RDD still has the same data.
+        assert(rdd1.collect().toSet === Set("a" -> 2, "b" -> 1))
+      }
+    }
+  }

   private def makeRDD(sc: SparkContext, seq: Seq[String]): RDD[String] = {
     sc.makeRDD(seq, 2).groupBy(x => x).flatMap(_._2)
   }