Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
*
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
class TestOutputStream[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
class TestOutputStream[T: ClassTag](
parent: DStream[T],
val output: SynchronizedBuffer[Seq[T]] =
new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}) {
Expand All @@ -95,8 +97,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequence of items.
*/
class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
class TestOutputStreamWithPartitions[T: ClassTag](
parent: DStream[T],
val output: SynchronizedBuffer[Seq[Seq[T]]] =
new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
Expand All @@ -108,10 +112,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
ois.defaultReadObject()
output.clear()
}

def toTestOutputStream: TestOutputStream[T] = {
new TestOutputStream[T](this.parent, this.output.map(_.flatten))
}
}

/**
Expand Down Expand Up @@ -425,12 +425,21 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
logInfo("--------------------------------")

// Match the output with the expected output
assert(output.size === expectedOutput.size, "Number of outputs do not match")
for (i <- 0 until output.size) {
if (useSet) {
assert(output(i).toSet === expectedOutput(i).toSet)
assert(
output(i).toSet === expectedOutput(i).toSet,
s"Set comparison failed\n" +
s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
s"Generated output (${output.size} items): ${output.mkString("\n")}"
)
} else {
assert(output(i).toList === expectedOutput(i).toList)
assert(
output(i).toList === expectedOutput(i).toList,
s"Ordered list comparison failed\n" +
s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
s"Generated output (${output.size} items): ${output.mkString("\n")}"
)
}
}
logInfo("Output verified successfully")
Expand Down