1717
1818package org .apache .spark .streaming .dstream
1919
20- import java .util .concurrent .ArrayBlockingQueue
2120import java .nio .ByteBuffer
21+ import java .util .concurrent .ArrayBlockingQueue
2222
23- import scala .collection .mutable .ArrayBuffer
23+ import scala .collection .mutable .{ ArrayBuffer , HashMap }
2424import scala .concurrent .Await
2525import scala .concurrent .duration ._
2626import scala .reflect .ClassTag
2727
28- import akka .actor .{Props , Actor }
28+ import akka .actor .{Actor , Props }
2929import akka .pattern .ask
3030
31- import org .apache .spark .streaming .util .{RecurringTimer , SystemClock }
32- import org .apache .spark .streaming ._
3331import org .apache .spark .{Logging , SparkEnv }
34- import org .apache .spark .rdd .{RDD , BlockRDD }
32+ import org .apache .spark .rdd .{BlockRDD , RDD }
3533import org .apache .spark .storage .{BlockId , StorageLevel , StreamBlockId }
36- import org .apache .spark .streaming .scheduler .{DeregisterReceiver , AddBlocks , RegisterReceiver }
34+ import org .apache .spark .streaming ._
35+ import org .apache .spark .streaming .scheduler .{ReceivedBlockInfo , AddBlocks , DeregisterReceiver , RegisterReceiver }
36+ import org .apache .spark .streaming .util .{RecurringTimer , SystemClock }
3737
3838/**
3939 * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream ]]
@@ -48,8 +48,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi
4848abstract class NetworkInputDStream [T : ClassTag ](@ transient ssc_ : StreamingContext )
4949 extends InputDStream [T ](ssc_) {
5050
51- // This is an unique identifier that is used to match the network receiver with the
52- // corresponding network input stream.
51+ /** Keeps all received blocks information */
52+ private val receivedBlockInfo = new HashMap [Time , Array [ReceivedBlockInfo ]]
53+
54+ /** This is a unique identifier for the network input stream. */
5355 val id = ssc.getNewNetworkStreamId()
5456
5557 /**
@@ -64,23 +66,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
6466
/** Does nothing in this class. */
def stop(): Unit = {}
6668
/**
 * Builds the RDD for the batch at `validTime` from the blocks that
 * `networkInputTracker.getReceivedBlockInfo(id)` reports for this stream, and records
 * that block information under `validTime` in `receivedBlockInfo`.
 *
 * For any time before `graph.startTime` the tracker is not queried and an RDD backed by
 * no blocks is returned; this may happen when recovering from a master failure.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val batchBlockIds: Array[BlockId] =
    if (validTime >= graph.startTime) {
      val reported = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
      // Remember what was received for this batch so it can be looked up later.
      receivedBlockInfo(validTime) = reported
      reported.map(_.blockId.asInstanceOf[BlockId])
    } else {
      Array[BlockId]()
    }
  Some(new BlockRDD[T](ssc.sc, batchBlockIds))
}
83+
/**
 * Get information on the blocks recorded for the batch at `time`.
 *
 * Returns an empty array when nothing is stored for `time` (for example a time for which
 * `compute` never recorded block information, or one already removed by `clearMetadata`)
 * instead of failing with a `NoSuchElementException`.
 */
private[streaming] def getReceivedBlockInfo(time: Time): Array[ReceivedBlockInfo] = {
  receivedBlockInfo.getOrElse(time, Array.empty[ReceivedBlockInfo])
}
88+
/**
 * Clear metadata that are older than `rememberDuration` of this DStream.
 * This is an internal method that should not be called directly. This
 * implementation overrides the default implementation to also clear the
 * received block information kept in `receivedBlockInfo`.
 *
 * @param time reference time; entries whose key is at or before
 *             `time - rememberDuration` are removed
 */
private[streaming] override def clearMetadata(time: Time) {
  super.clearMetadata(time)
  val threshold = time - rememberDuration
  // Materialize the expired keys before mutating the map they came from.
  val expiredTimes = receivedBlockInfo.keys.filter(_ <= threshold).toList
  receivedBlockInfo --= expiredTimes
  logDebug("Cleared received block info of " + expiredTimes.size +
    " batches that were older than " + threshold + ": " + expiredTimes.mkString(", "))
}
78102}
79103
80104
/** Messages handled by the receiver's helper actor. */
private[streaming] sealed trait NetworkReceiverMessage
// Carries a message string; NOTE(review): how the actor reacts to it is not visible here.
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
// Sent by `pushBlock` after a block is stored; `numRecords` is the buffer size for
// ArrayBuffer blocks and -1 for blocks pushed as bytes.
private[streaming] case class ReportBlock(blockId: StreamBlockId, numRecords: Long, metadata: Any)
  extends NetworkReceiverMessage
// Carries an error description; the helper actor forwards it to the tracker as a
// DeregisterReceiver message.
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
86110
@@ -156,21 +180,20 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
156180 actor ! ReportError (e.toString)
157181 }
158182
159-
/**
 * Stores `arrayBuffer` in the block manager under `blockId` at the given storage level,
 * then reports the block to the helper actor with the buffer's size as its record count.
 */
def pushBlock(
    blockId: StreamBlockId,
    arrayBuffer: ArrayBuffer[T],
    metadata: Any,
    level: StorageLevel): Unit = {
  val untypedRecords = arrayBuffer.asInstanceOf[ArrayBuffer[Any]]
  env.blockManager.put(blockId, untypedRecords, level)
  // The size is read only after the block has been stored, as before.
  val recordCount = arrayBuffer.size
  actor ! ReportBlock(blockId, recordCount, metadata)
}
167190
/**
 * Stores the block `bytes` in the block manager under `blockId` at the given storage
 * level, then reports the block to the helper actor with a record count of -1.
 */
def pushBlock(
    blockId: StreamBlockId,
    bytes: ByteBuffer,
    metadata: Any,
    level: StorageLevel): Unit = {
  env.blockManager.putBytes(blockId, bytes, level)
  // NOTE(review): -1 presumably marks the record count as unknown for byte blocks - confirm.
  val reportedCount = -1L
  actor ! ReportBlock(blockId, reportedCount, metadata)
}
175198
176199 /** A helper actor that communicates with the NetworkInputTracker */
@@ -188,8 +211,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
188211 }
189212
190213 override def receive () = {
191- case ReportBlock (blockId, metadata) =>
192- tracker ! AddBlocks (streamId, Array ( blockId), metadata)
214+ case ReportBlock (blockId, numRecords, metadata) =>
215+ tracker ! AddBlocks (ReceivedBlockInfo ( streamId, blockId, numRecords, metadata) )
193216 case ReportError (msg) =>
194217 tracker ! DeregisterReceiver (streamId, msg)
195218 case StopReceiver (msg) =>
@@ -211,7 +234,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
211234 class BlockGenerator (storageLevel : StorageLevel )
212235 extends Serializable with Logging {
213236
// Groups a stream block id with its buffer of records and optional metadata
// (`metadata` is null when not supplied).
case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)
215238
216239 val clock = new SystemClock ()
217240 val blockInterval = env.conf.getLong(" spark.streaming.blockInterval" , 200 )
0 commit comments