DStream.scala
@@ -24,14 +24,13 @@ import scala.reflect.ClassTag
 
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.streaming.Duration
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -144,7 +143,7 @@ abstract class DStream[T: ClassTag] (
    */
   private[streaming] def initialize(time: Time) {
     if (zeroTime != null && zeroTime != time) {
-      throw new Exception("ZeroTime is already initialized to " + zeroTime
+      throw new SparkException("ZeroTime is already initialized to " + zeroTime
         + ", cannot initialize it again to " + time)
     }
     zeroTime = time
@@ -235,7 +234,7 @@ abstract class DStream[T: ClassTag] (
 
   private[streaming] def setContext(s: StreamingContext) {
     if (ssc != null && ssc != s) {
-      throw new Exception("Context is already set in " + this + ", cannot set it again")
+      throw new SparkException("Context is already set in " + this + ", cannot set it again")
     }
     ssc = s
     logInfo("Set context for " + this)
@@ -244,7 +243,7 @@ abstract class DStream[T: ClassTag] (
 
   private[streaming] def setGraph(g: DStreamGraph) {
     if (graph != null && graph != g) {
-      throw new Exception("Graph is already set in " + this + ", cannot set it again")
+      throw new SparkException("Graph is already set in " + this + ", cannot set it again")
     }
     graph = g
     dependencies.foreach(_.setGraph(graph))
@@ -261,7 +260,7 @@ abstract class DStream[T: ClassTag] (
   /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
   private[streaming] def isTimeValid(time: Time): Boolean = {
     if (!isInitialized) {
-      throw new Exception(this + " has not been initialized")
+      throw new SparkException(this + " has not been initialized")
     } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
       logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
         " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
@@ -716,6 +715,9 @@ abstract class DStream[T: ClassTag] (
    * Return all the RDDs between 'fromTime' to 'toTime' (both included)
    */
   def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+    if (!isInitialized) {
+      throw new SparkException(this + " has not been initialized")
+    }
     if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
       logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
         + slideDuration + ")")
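Side note (not part of the patch): a minimal sketch of the behavior the new slice() guard changes, assuming a local[2] master and a queueStream-backed DStream; the object and app names are made up for illustration. Until the context is started, initialize() never runs and zeroTime stays null, so before this change the (fromTime - zeroTime) arithmetic in slice() would surface as a NullPointerException; with the guard, the caller gets a SparkException with an explicit message.

import scala.collection.mutable.Queue

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SliceBeforeStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("slice-before-start")
    val ssc = new StreamingContext(conf, Seconds(1))
    // queueStream yields a DStream without starting the context, so the
    // stream has not been initialized and zeroTime is still null.
    val stream = ssc.queueStream(new Queue[RDD[Int]]())
    try {
      // Pre-patch: NullPointerException from (fromTime - zeroTime).
      // Post-patch: SparkException("... has not been initialized").
      stream.slice(new Time(0), new Time(1000))
    } catch {
      case e: SparkException => println(e.getMessage)
    } finally {
      ssc.stop()
    }
  }
}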
BasicOperationsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 
 import util.ManualClock
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkException, SparkConf}
 import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
@@ -396,6 +396,16 @@ class BasicOperationsSuite extends TestSuiteBase {
     Thread.sleep(1000)
   }
 
+  test("slice - has not been initialized") {
+    val ssc = new StreamingContext(conf, Seconds(1))
+    val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
+    val stream = new TestInputStream[Int](ssc, input, 2)
+    val thrown = intercept[SparkException] {
+      stream.slice(new Time(0), new Time(1000))
+    }
+    assert(thrown.getMessage.contains("has not been initialized"))
+  }
+
   val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
 
   test("rdd cleanup - map and window") {
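Design note on the change itself: replacing the bare Exception at these throw sites with SparkException makes framework-raised failures catchable as a Spark-specific type (which the new intercept[SparkException] test relies on), and the added guard lets slice() fail fast with a descriptive message instead of a NullPointerException when the DStream was never initialized.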