Skip to content

Commit edf8a56

Browse files
kayousterhoutpwendell
authored andcommitted
Remote BlockFetchTracker trait
This trait seems to have been created a while ago when there were multiple implementations; now that there's just one, I think it makes sense to merge it into the BlockFetcherIterator trait. Author: Kay Ousterhout <[email protected]> Closes apache#39 from kayousterhout/remove_tracker and squashes the following commits: 8173939 [Kay Ousterhout] Remote BlockFetchTracker.
1 parent 40e080a commit edf8a56

File tree

2 files changed

+17
-38
lines changed

2 files changed

+17
-38
lines changed

core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala

Lines changed: 0 additions & 27 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,14 @@ import org.apache.spark.util.Utils
4444
*/
4545

4646
private[storage]
47-
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])]
48-
with Logging with BlockFetchTracker {
47+
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
4948
def initialize()
49+
def totalBlocks: Int
50+
def numLocalBlocks: Int
51+
def numRemoteBlocks: Int
52+
def remoteFetchTime: Long
53+
def fetchWaitTime: Long
54+
def remoteBytesRead: Long
5055
}
5156

5257

@@ -233,7 +238,16 @@ object BlockFetcherIterator {
233238
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
234239
}
235240

236-
//an iterator that will read fetched blocks off the queue as they arrive.
241+
override def totalBlocks: Int = numLocal + numRemote
242+
override def numLocalBlocks: Int = numLocal
243+
override def numRemoteBlocks: Int = numRemote
244+
override def remoteFetchTime: Long = _remoteFetchTime
245+
override def fetchWaitTime: Long = _fetchWaitTime
246+
override def remoteBytesRead: Long = _remoteBytesRead
247+
248+
249+
// Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
250+
// as they arrive.
237251
@volatile protected var resultsGotten = 0
238252

239253
override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
@@ -251,14 +265,6 @@ object BlockFetcherIterator {
251265
}
252266
(result.blockId, if (result.failed) None else Some(result.deserialize()))
253267
}
254-
255-
// Implementing BlockFetchTracker trait.
256-
override def totalBlocks: Int = numLocal + numRemote
257-
override def numLocalBlocks: Int = numLocal
258-
override def numRemoteBlocks: Int = numRemote
259-
override def remoteFetchTime: Long = _remoteFetchTime
260-
override def fetchWaitTime: Long = _fetchWaitTime
261-
override def remoteBytesRead: Long = _remoteBytesRead
262268
}
263269
// End of BasicBlockFetcherIterator
264270

0 commit comments

Comments
 (0)