[SPARK-20883][SPARK-20376][SS] Refactored StateStore APIs and added conf to choose implementation#18107
[SPARK-20883][SPARK-20376][SS] Refactored StateStore APIs and added conf to choose implementation#18107tdas wants to merge 6 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
update this description
|
Test build #77362 has finished for PR 18107 at commit
|
| partitionId: Int, | ||
| name: String = "") | ||
|
|
||
| case class StateStoreStats() |
| */ | ||
| def remove(key: UnsafeRow): Unit | ||
|
|
||
| def getRange(start: Option[UnsafeRow], end: Option[UnsafeRow]): Iterator[UnsafeRowTuple] |
| def key: UnsafeRow | ||
| def value: UnsafeRow | ||
| object StateStoreProvider { | ||
| def instantiate( |
| val key = getKey(row) | ||
| store.put(key.copy(), row.copy()) | ||
| numUpdatedStateRows += 1 | ||
| allUpdatesTimeMs += timeTakenMs { |
There was a problem hiding this comment.
this update is to accommodate for removal of StateStore.updates()
|
Test build #3755 has finished for PR 18107 at commit
|
|
Test build #77363 has finished for PR 18107 at commit
|
| expectedState = Some(5), // state should change | ||
| expectedTimeoutTimestamp = 5000) // timestamp should change | ||
|
|
||
| test("StateStoreUpdater - rows are cloned before writing to StateStore") { |
There was a problem hiding this comment.
This is not needed any more as the operator is not responsible for cloning the rows when writing to the store.
|
Test build #77386 has finished for PR 18107 at commit
|
|
Test build #77402 has finished for PR 18107 at commit
|
zsxwing
left a comment
There was a problem hiding this comment.
Looks pretty good. My major comment is Prefer nanoTime over currentTimeMillis
|
|
||
| def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD) | ||
|
|
||
| def stateStoreProviderClass: Option[String] = getConf(STATE_STORE_PROVIDER_CLASS) |
There was a problem hiding this comment.
Also add this to StateStoreConf for consistency?
|
|
||
| /** Records the duration of running `body` for the next query progress update. */ | ||
| protected def timeTakenMs(body: => Unit): Long = { | ||
| val startTime = System.currentTimeMillis |
| .internal() | ||
| .doc( | ||
| "The class used to manage state data in stateful streaming queries. This class must" + | ||
| "be a subclass of StateStoreProvider, and must have a zero-arg constructor.") |
| storeConfs: StateStoreConf, | ||
| hadoopConf: Configuration): Unit = { | ||
| throw new Exception("Successfully instantiated") | ||
|
|
| indexOrdinal: Option[Int], // for sorting the data | ||
| storeConf: StateStoreConf, | ||
| hadoopConf: Configuration): StateStoreProvider = { | ||
| val provider = Utils.getContextOrSparkClassLoader |
There was a problem hiding this comment.
nit: Use Utils.classForName(providerClass).
| // Update and output modified rows from the StateStore. | ||
| case Some(Update) => | ||
|
|
||
| val updatesStartTimeMs = System.currentTimeMillis |
| override def output: Seq[Attribute] = child.output | ||
|
|
||
| override def outputPartitioning: Partitioning = child.outputPartitioning | ||
|
|
| case None => iter | ||
| } | ||
|
|
||
| val updatesStartTimeMs = System.currentTimeMillis |
| CompletionIterator[InternalRow, Iterator[InternalRow]](result, { | ||
| watermarkPredicateForKeys.foreach(f => store.remove(f.eval _)) | ||
| store.commit() | ||
| allUpdatesTimeMs += System.currentTimeMillis - updatesStartTimeMs |
| override protected def getNext(): InternalRow = { | ||
| var removedValueRow: InternalRow = null | ||
| while(rangeIter.hasNext && removedValueRow == null) { | ||
| val UnsafeRowPair(keyRow, valueRow) = rangeIter.next() |
There was a problem hiding this comment.
Case class's unapply will create a Tuple. You should not use this Scala syntactic sugar :)
There was a problem hiding this comment.
That is true! I had assumed unapply will get desugared into something simple, but its probably best to not to rely on the Scala compiler so much.
|
LGTM pending tests. |
|
Test build #77546 has finished for PR 18107 at commit
|
|
Test build #77547 has finished for PR 18107 at commit
|
|
Test build #77549 has finished for PR 18107 at commit
|
|
Thanks! Merging to master. |
What changes were proposed in this pull request?
A bunch of changes to the StateStore APIs and implementation.
Current state store API has a bunch of problems that causes too many transient objects causing memory pressure.
StateStore.get(): Optionforces creation of Some/None objects for every get. Changed this to return the row or null.StateStore.iterator(): (UnsafeRow, UnsafeRow)forces creation of new tuple for each record returned. Changed this to return a UnsafeRowTuple which can be reused across records.StateStore.updates()requires the implementation to keep track of updates, while this is used minimally (only by Append mode in streaming aggregations). Removed updates() and updated StateStoreSaveExec accordingly.StateStore.filter(condition)andStateStore.remove(condition)has been merge into a single APIgetRange(start, end)which allows a state store to do optimized range queries (i.e. avoid full scans). Stateful operators have been updated accordingly.Additionally,
How was this patch tested?
Old and new unit tests