Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ abstract class StreamingLinearAlgorithm[
throw new IllegalArgumentException("Model must be initialized before starting training.")
}
data.foreachRDD { (rdd, time) =>
model = Some(algorithm.run(rdd, model.get.weights))
logInfo("Model updated at time %s".format(time.toString))
val display = model.get.weights.size match {
case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
case _ => model.get.weights.toArray.mkString("[", ",", "]")
if (!rdd.isEmpty) {
model = Some(algorithm.run(rdd, model.get.weights))
logInfo(s"Model updated at time ${time.toString}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(no need for toString here but don't worry about it)

val display = model.get.weights.size match {
case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
case _ => model.get.weights.toArray.mkString("[", ",", "]")
}
logInfo(s"Current model: weights, ${display}")
}
logInfo("Current model: weights, %s".format (display))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList
assert(error.head > 0.8 & error.last < 0.2)
}

// Test empty RDDs in a stream
test("handling empty RDDs in a stream") {
val model = new StreamingLogisticRegressionWithSGD()
.setInitialWeights(Vectors.dense(-0.1))
.setStepSize(0.01)
.setNumIterations(10)
val numBatches = 10
val emptyInput = Seq.empty[Seq[LabeledPoint]]
val ssc = setupStreams(emptyInput,
(inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
}
)
val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,22 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList
assert((error.head - error.last) > 2)
}

// Test empty RDDs in a stream
test("handling empty RDDs in a stream") {
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(0.0, 0.0))
.setStepSize(0.2)
.setNumIterations(25)
val numBatches = 10
val nPoints = 100
val emptyInput = Seq.empty[Seq[LabeledPoint]]
val ssc = setupStreams(emptyInput,
(inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
}
)
val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
}
}