@@ -406,17 +406,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
406406 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
407407 * DStream
408408 * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
409- * @param initial state value of each key.
410409 * @tparam S State type
411410 */
412411 def updateStateByKey [S : ClassTag ](
413412 updateFunc : (Iterator [(K , Seq [V ], Option [S ])]) => Iterator [(K , S )],
414413 partitioner : Partitioner ,
415- rememberPartitioner : Boolean ,
416- initial : RDD [(K , S )]
414+ rememberPartitioner : Boolean
417415 ): DStream [(K , S )] = {
418- new StateDStream (self, ssc.sc.clean(updateFunc), partitioner,
419- rememberPartitioner, Some (initial))
416+ new StateDStream (self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None )
420417 }
421418
422419 /**
@@ -431,14 +428,17 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
431428 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
432429 * DStream
433430 * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
431+ * @param initialRDD state value of each key.
434432 * @tparam S State type
435433 */
436434 def updateStateByKey [S : ClassTag ](
437435 updateFunc : (Iterator [(K , Seq [V ], Option [S ])]) => Iterator [(K , S )],
438436 partitioner : Partitioner ,
439- rememberPartitioner : Boolean
437+ rememberPartitioner : Boolean ,
438+ initialRDD : RDD [(K , S )]
440439 ): DStream [(K , S )] = {
441- new StateDStream (self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None )
440+ new StateDStream (self, ssc.sc.clean(updateFunc), partitioner,
441+ rememberPartitioner, Some (initialRDD))
442442 }
443443
444444 /**
0 commit comments