Contributor

@tdas tdas commented Jan 31, 2017

What changes were proposed in this pull request?

mapGroupsWithState is a new API for arbitrary stateful operations in Structured Streaming, similar to DStream.mapWithState

Requirements

  • Users should be able to specify a function that can do the following
  • Access the input row corresponding to a key
  • Access the previous state corresponding to a key
  • Optionally, update or remove the state
  • Output any number of new rows (or none at all)

Proposed API

// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
  // Scala friendly
  def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
  def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
  // Java friendly
  def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
  def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
}

// ------------------- New Java-friendly function classes ------------------- 
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}

// ---------------------- Wrapper class for state data ---------------------- 
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S                    // throws Exception if state does not exist
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit              // exists() will be false after this
}

Key Semantics of the State class

  • The state can be null.
  • If state.remove() is called, then state.exists() will return false, and getOption will return None.
  • After state.update(newState) is called, state.exists() will return true, and getOption will return Some(...).
  • None of the operations are thread-safe. This is to avoid memory barriers.

Usage

val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}

dataset                                                 // type is Dataset[String]
  .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
  .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]

How was this patch tested?

New unit tests.

@tdas tdas changed the title from "Arbitrary stateful operations with MapGroupsWithState" to "[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations" Jan 31, 2017

SparkQA commented Jan 31, 2017

Test build #72205 has finished for PR 16758 at commit 6fab7a5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

trait StateStoreReader extends StatefulOperator {
Contributor Author

This file should probably be renamed from StatefulAggregation to StatefulOperations.

Contributor

and lowercase if it contains multiple classes


SparkQA commented Jan 31, 2017

Test build #72206 has finished for PR 16758 at commit 8be63de.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@marmbrus marmbrus left a comment

Looking pretty good!

* optionally update or remove the corresponding state. The returned object will form a new
* [[Dataset]].
*
* This function can be applied on both batch and streaming Datasets. With a streaming dataset,
Contributor

will be called

* [[Dataset]].
*
* This function can be applied on both batch and streaming Datasets. With a streaming dataset,
* this function will be once for each in every trigger. For each key, the updated state from the
Contributor

Any updates to the state will be stored and passed to the user given function in subsequent batches when executed as a Streaming Query.


/**
* ::Experimental::
* (Scala-specific)
Contributor

while maintaining some user-defined state for each key.

/**
* ::Experimental::
* (Scala-specific)
* Applies the given function to each group of data, while using an additional keyed state.
Contributor

I would consider breaking this up to make it a little easier to follow:

For each unique group, the given function will be invoked with the following arguments:
 - The key of the group.
 - A user-defined state object set by previous invocations of the given function.  Note that, for batch queries, there is only ever one invocation and thus the state object will always be empty.
 - An iterator containing all the values for this key.

* function call in a trigger will be the state available in the function call in the next
* trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the
* function is called only once per key without any prior state.
*
Contributor

I'd maybe put these into bullets as well.

ClusteredDistribution(groupingAttributes) :: Nil

override def requiredChildOrdering: Seq[Seq[SortOrder]] =
Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed?
Contributor

Yes, the GroupedIterator relies on sorting.

Contributor Author

Thanks for confirming, will remove the comment.
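
The dependence on ordering can be illustrated with a standalone sketch. This is not Spark's GroupedIterator; `groupAdjacent` is a hypothetical helper written under the assumption that grouping is done in a single pass over consecutive rows with equal keys, which is why unsorted input would split one key into several groups.

```scala
object AdjacentGroupingSketch {
  /** Groups consecutive elements that share a key; it never looks back at earlier groups. */
  def groupAdjacent[K, V](rows: Iterator[(K, V)]): Iterator[(K, List[V])] =
    new Iterator[(K, List[V])] {
      private val buffered = rows.buffered

      def hasNext: Boolean = buffered.hasNext

      def next(): (K, List[V]) = {
        val key = buffered.head._1
        val values = List.newBuilder[V]
        // Consume rows only while the key stays the same.
        while (buffered.hasNext && buffered.head._1 == key) {
          values += buffered.next()._2
        }
        (key, values.result())
      }
    }

  def main(args: Array[String]): Unit = {
    val rows = List("a" -> 1, "b" -> 2, "a" -> 3)
    // Unsorted input: key "a" shows up as two separate groups.
    println(groupAdjacent(rows.iterator).toList)
    // Sorted input: each key forms exactly one group.
    println(groupAdjacent(rows.sortBy(_._1).iterator).toList)
  }
}
```

With the required child ordering in place, each key reaches the user function once per partition pass, which is what a per-key state update needs.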

Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed?

override protected def doExecute(): RDD[InternalRow] = {

Contributor

nit: extra newline

child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
try {
Contributor

Existing, but should mapPartitionsWithStateStore be implementing the abort handling? Seems generic.

Contributor Author

I thought of that. If I change mapPartitionsWithStateStore, then StateStoreSave would also need to be changed, which I didn't want to touch in this already big PR.

Contributor Author

Managed to do it. Not a big change. Improves correctness, and reduces code.


// Assumption: Append mode can be done only when watermark has been specified
store.remove(watermarkPredicate.get.eval)
store.remove(watermarkPredicate.get.eval _)
Contributor

Why this change?

Contributor Author

Because of the addition of a new version of stateStore.remove. I think the compiler can disambiguate correctly without this.

}

def deserializeRowToObject(
deserializer: Expression): InternalRow => Any = {
Contributor

indent, also does this not fit?


SparkQA commented Jan 31, 2017

Test build #72215 has finished for PR 16758 at commit 34449e4.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jan 31, 2017

Test build #72216 has finished for PR 16758 at commit 3628af8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 1, 2017

Test build #72217 has finished for PR 16758 at commit 59c229b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 1, 2017

Test build #72223 has finished for PR 16758 at commit 3dc0353.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 1, 2017

Test build #72230 has finished for PR 16758 at commit 8b18fa1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 1, 2017

Test build #72256 has finished for PR 16758 at commit 8b3150a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

removed = true
}

override def toString: String = "KeyedState($value)"
Contributor

@lw-lin lw-lin Feb 2, 2017

nit: s"KeyedState($value)"
:)

Contributor Author

done
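
The nit above comes down to the missing `s` prefix. A minimal standalone sketch of the difference follows; `Plain` is a hypothetical stand-in for the state wrapper, not the class in this PR.

```scala
object InterpolationSketch {
  // Hypothetical stand-in for the state wrapper; only the string rendering matters here.
  final case class Plain(value: Long) {
    // Without the `s` prefix, `$value` is kept as literal text.
    def literal: String = "KeyedState($value)"

    // With the `s` prefix, `$value` is replaced by the field's value.
    def interpolated: String = s"KeyedState($value)"
  }

  def main(args: Array[String]): Unit = {
    val st = Plain(3L)
    println(st.literal)      // KeyedState($value)
    println(st.interpolated) // KeyedState(3)
  }
}
```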

// Function that returns running count only if its even, otherwise does not return
val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
if (state.exists) throw new IllegalArgumentException("state.exists should be false")
if (state.exists) {
Contributor

@lw-lin lw-lin Feb 2, 2017

nit: state.get == ...? also state.get in the error message

Contributor Author

done. not a nit! wrong test!

test("mapGroupsWithState - batch") {
val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
if (state.exists) throw new IllegalArgumentException("state.exists should be false")
if (state.exists) {
Contributor

same here

Contributor Author

done. thanks for catching these.

* - If the `remove()` is called, then `exists()` will return `false`, and
* `getOption()` will return `None`.
* - After that `update(newState)` is called, then `exists()` will return `true`,
* and `getOption()` will return `Some(...)`.
Contributor

nit: getOption ...

Member

@zsxwing zsxwing left a comment

Made one pass. Most of my comments are nits.


// MapGroupsWithState: Not supported after a streaming aggregation
val att = new AttributeReference(name = "a", dataType = LongType)()
assertSupportedInStreamingPlan(
Member

nit: assertSupportedInStreamingPlan -> assertSupportedInBatchPlan

Contributor Author

done

// Returns the data and the count if state is defined, otherwise does not return anything
val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {

var count = Option(state.get).map(_.count).getOrElse(0L) + values.size
Member

nit: var -> val

Contributor Author

done

StartStream(),
CheckLastBatch(("a", 3L)) // task should not fail, and should show correct count
)
}
Member

Could you add a test for aggregation after mapGroupsWithState?

Contributor Author

Added.


/**
* Update the value of the state. Note that null is not a valid value, and `update(null)` is
* same as `remove()`
Member

It's better to disallow this case. Otherwise, the user may happen to send a null by mistake and we just hide the error.

Contributor Author

@marmbrus any thoughts on this?

Contributor

If get for a non-existent key returns null, then I think it's reasonable that remove(key) and put(key, null) are the same.

Member

@marmbrus The user may pass a null by mistake (e.g., by calling some method that may return null without being aware of it). It's pretty hard to debug such a silent mistake.

* - The key of the group.
* - An iterator containing all the values for this key.
* - A user-defined state object set by previous invocations of the given function.
* In case of a batch Dataset, there is only invocation and state object will be empty as
Member

nit: only one invocation

Contributor Author

done

* preparation walks the query plan.
*/
private var operatorId = 0
private val operatorId = new AtomicInteger(0)
Member

there is only one thread here. Why change it? Any concern?

Contributor Author

not really. just a tiny bit cleaner to do getAndIncrement than using x and then calling x += 1
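
For comparison, the two styles mentioned above look roughly like this. It is a standalone sketch; `VarCounter` and `AtomicCounter` are hypothetical names and not the class touched by the PR.

```scala
import java.util.concurrent.atomic.AtomicInteger

object OperatorIdSketch {
  // Style 1: a plain var that is read and then bumped in two separate steps.
  final class VarCounter {
    private var operatorId = 0
    def nextId(): Int = {
      val id = operatorId
      operatorId += 1
      id
    }
  }

  // Style 2: getAndIncrement returns the current value and advances it in one call.
  final class AtomicCounter {
    private val operatorId = new AtomicInteger(0)
    def nextId(): Int = operatorId.getAndIncrement()
  }

  def main(args: Array[String]): Unit = {
    val a = new VarCounter
    val b = new AtomicCounter
    println(Seq(a.nextId(), a.nextId(), a.nextId()))
    println(Seq(b.nextId(), b.nextId(), b.nextId()))
  }
}
```

Both produce the same sequence on a single thread; the second is simply shorter, which matches the "tiny bit cleaner" motivation.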

case Some(ValueRemoved(_, _)) =>
// Remove already in update map, no need to change
}
}
Member

Missing writeToDeltaFile(tempDeltaFileStream, ValueRemoved(key, value)). It's better to extract the duplicated code into a new method.

Contributor Author

yeah. that's true. I should add tests for this StateStore operation.

*/
def remove(condition: UnsafeRow => Boolean): Unit

def remove(key: UnsafeRow): Unit
Member

nit: missing scala doc

Contributor Author

done

val groupedIter = GroupedIterator(iter, groupingAttributes, child.output)

val getKeyObj = ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
val getKey = GenerateUnsafeProjection.generate(groupingAttributes, child.output)
Member

nit: not used

Contributor Author

done.

val wrappedState = KeyedStateImpl[Any](stateObjOption.orNull)
val mappedIterator = func(keyObj, valueObjIter, wrappedState)

if (wrappedState.isRemoved) {
Member

This should not be checked here. mappedIterator may be lazy. You can wrap it with a CompletionIterator and add this code there.

Contributor Author

Good point. This would be confusing for the user if we don't do this.
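
The laziness problem can be reproduced without Spark. The sketch below is an illustration only: `StateBox`, `lazyFunc` and `withCompletion` are hypothetical names, and `withCompletion` merely imitates the idea of a completion-wrapping iterator rather than reproducing Spark's CompletionIterator.

```scala
object LazyCompletionSketch {
  /** Hypothetical mutable holder standing in for the keyed state wrapper. */
  final class StateBox { var removed: Boolean = false }

  /** A user function whose state change happens only when its output is consumed. */
  def lazyFunc(values: Iterator[Int], state: StateBox): Iterator[Int] =
    values.map { v => state.removed = true; v * 2 }

  /** Wraps an iterator so that `onDone` runs once, when the iterator is exhausted. */
  def withCompletion[A](underlying: Iterator[A])(onDone: => Unit): Iterator[A] =
    new Iterator[A] {
      private var done = false

      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !done) { done = true; onDone }
        more
      }

      def next(): A = underlying.next()
    }

  def main(args: Array[String]): Unit = {
    val state = new StateBox
    val out = withCompletion(lazyFunc(Iterator(1, 2), state)) {
      // Reading the state here sees the change made during iteration.
      println(s"at completion, removed = ${state.removed}")
    }
    // Reading the state right after the call would miss the change.
    println(s"right after the call, removed = ${state.removed}")
    println(out.toList)
  }
}
```

Deferring the state check to the completion callback means the store is updated according to what the user function actually did, even when that work happens while the output is being consumed.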


/** Collect all the streaming aggregates in a sub plan */
def collectStreamingAggregates(subplan: LogicalPlan): Seq[Aggregate] = {
subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
Contributor

@brkyvz brkyvz Feb 3, 2017

nit: space before and after @. Actually if you're not going to use any of the Aggregate params, just change this to a: Aggregate

Contributor Author

done.
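
The two pattern styles from the nit above behave the same when none of the constructor fields are used. A standalone sketch follows; `Plan`, `Aggregate` and `Project` here are simplified hypothetical case classes, not Catalyst's logical plan nodes.

```scala
object TypedPatternSketch {
  sealed trait Plan
  final case class Aggregate(grouping: Seq[String], aggregates: Seq[String], streaming: Boolean) extends Plan
  final case class Project(columns: Seq[String]) extends Plan

  // Extractor pattern with a binder: every field is spelled out but ignored.
  def viaExtractor(plans: Seq[Plan]): Seq[Aggregate] =
    plans.collect { case a @ Aggregate(_, _, _) if a.streaming => a }

  // Typed pattern: the same match without listing the unused fields.
  def viaTypedPattern(plans: Seq[Plan]): Seq[Aggregate] =
    plans.collect { case a: Aggregate if a.streaming => a }

  def main(args: Array[String]): Unit = {
    val plans = Seq(
      Aggregate(Seq("k"), Seq("count"), streaming = true),
      Project(Seq("k")),
      Aggregate(Seq("k"), Seq("sum"), streaming = false))
    println(viaExtractor(plans) == viaTypedPattern(plans))
  }
}
```

The typed pattern also keeps compiling if a field is later added to the case class, whereas the extractor form has to be updated.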

stateSerializer: Seq[NamedExpression],
child: LogicalPlan) extends UnaryNode with ObjectProducer


Contributor

nit: extra line

Contributor Author

removed

* Important points to note about using KeyedState.
* - The value of the state cannot be null. So updating state with null is same as removing it.
* - Operations on `KeyedState` are not thread-safe. This is to avoid memory barriers.
* - If the `remove()` is called, then `exists()` will return `false`, and
Contributor

nit: If ``remove`` is called. Remove the

Contributor Author

done.


import org.apache.spark.sql.KeyedState

/** Internal implementation of the [[KeyedState]] interface */
Contributor

Would be nice to note here that this implementation is not thread safe

Contributor Author

i have mentioned that the trait KeyedState is not thread-safe

Contributor Author

tdas commented Feb 7, 2017

I addressed all the comments. However, @zsxwing @marmbrus, our offline discussion of throwing an error on .update(null) ran into a problem. Since it's typed as S, the behavior is odd when S is a primitive type. See the failing test. When the type is Int, get returns 0 when the state does not exist. That's very non-intuitive.
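
The primitive-type surprise can be reproduced in isolation. In this sketch, `getOrNull` is a hypothetical helper that mimics a getter typed as S falling back to null; it is not code from the PR.

```scala
object NullPrimitiveSketch {
  /** Mimics a getter typed as S that falls back to null when nothing is stored. */
  def getOrNull[S](stored: Option[S]): S =
    stored.getOrElse(null.asInstanceOf[S])

  def main(args: Array[String]): Unit = {
    // For a reference type the caller can see that nothing was stored.
    println(getOrNull[String](None) == null)
    // For Int the null is unboxed to 0, which looks like a real stored value.
    println(getOrNull[Int](None) == getOrNull[Int](Some(0)))
  }
}
```

Because a missing Int state and a stored 0 are indistinguishable through such a getter, returning null for "no state" does not carry over to primitive state types.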


SparkQA commented Feb 7, 2017

Test build #72490 has started for PR 16758 at commit b32dcd1.

Contributor Author

tdas commented Feb 7, 2017

@zsxwing had a discussion with @marmbrus. It is indeed weird that after calling state.remove(), state.get will return 0 if the state is of type Int (on the JVM, null cast to int is 0). So we went back to the semantics of get throwing an error when the state does not exist.
This ensures the following semantics that @marmbrus originally desired.

  • update(null) is not allowed: for primitive types, the compiler disallows it; for other types, it throws IllegalArgumentException
  • Hence, get will never return Some(null).
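
The semantics settled on above can be summarized with a small standalone sketch. `SimpleKeyedState` is a hypothetical, simplified stand-in and not the implementation merged in this PR; in particular, the use of NoSuchElementException in `get` is an assumption, since the thread only says that get throws an error.

```scala
object KeyedStateSketch {
  /** Hypothetical, simplified stand-in for the state wrapper discussed in this thread. */
  final class SimpleKeyedState[S](initial: Option[S]) {
    private var value: Option[S] = initial

    def exists: Boolean = value.isDefined

    /** Throws when no state is set, rather than returning null (or 0 for primitives). */
    def get: S = value.getOrElse(throw new NoSuchElementException("State is not set"))

    def getOption: Option[S] = value

    /** Rejects null so that a stray null cannot silently clear the state. */
    def update(newState: S): Unit = {
      if (newState == null) {
        throw new IllegalArgumentException("null is not a valid state value")
      }
      value = Some(newState)
    }

    def remove(): Unit = value = None
  }

  def main(args: Array[String]): Unit = {
    val state = new SimpleKeyedState[String](None)
    println(state.exists)     // false
    state.update("a")
    println(state.getOption)  // Some(a)
    state.remove()
    println(state.getOption)  // None
  }
}
```

Not thread-safe, as in the original description: the sketch uses a plain var with no synchronization.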


SparkQA commented Feb 8, 2017

Test build #72543 has finished for PR 16758 at commit f3d1231.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

zsxwing commented Feb 8, 2017

LGTM. Merging to master and 2.1.

@asfgit asfgit closed this in aeb8034 Feb 8, 2017
Member

zsxwing commented Feb 8, 2017

It conflicts with 2.1. Could you submit a backport PR, please?
