Skip to content

Commit 91bfa72

Browse files
committed
Fixed bugs.
1 parent 8533094 commit 91bfa72

File tree

5 files changed

+9
-5
lines changed

5 files changed

+9
-5
lines changed

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,11 @@ private[spark] object UIUtils extends Logging {
121121
(records, "")
122122
}
123123
}
124-
"%.1f%s".formatLocal(Locale.US, value, unit)
124+
if (unit.isEmpty) {
125+
"%d".formatLocal(Locale.US, value)
126+
} else {
127+
"%.1f%s".formatLocal(Locale.US, value, unit)
128+
}
125129
}
126130

127131
// Yarn has to go through a proxy so the base uri is provided and has to be on all links

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ class KafkaStreamSuite extends TestSuiteBase {
3737
val test3: NetworkInputDStream[(String, String)] =
3838
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
3939
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
40-
assert(test1.isInstanceOf)
4140

4241
// TODO: Actually test receiving data
4342
ssc.stop()

streaming/src/main/scala/org/apache/spark/streaming/receiver/NetworkReceiverExecutor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private[streaming] abstract class NetworkReceiverExecutor(
174174
}
175175

176176
/** Check if receiver has been marked for stopping */
177-
def isReceiverStarted() = {
177+
def isReceiverStarted() = synchronized {
178178
logDebug("state = " + receiverState)
179179
receiverState == Started
180180
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
9090
val receiverInfo = listener.receiverInfo(receiverId)
9191
val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
9292
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
93-
val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
93+
val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
9494
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
95-
d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
95+
d.getQuantiles().map(r => formatNumber(r.toLong))
9696
}.getOrElse {
9797
Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
9898
}

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ class TestReceiver extends NetworkReceiver[Int](StorageLevel.MEMORY_ONLY) with L
283283
def onStart() {
284284
val thread = new Thread() {
285285
override def run() {
286+
logInfo("Receiving started")
286287
while (!isStopped) {
287288
store(TestReceiver.counter.getAndIncrement)
288289
}

0 commit comments

Comments
 (0)