@@ -164,7 +164,7 @@ class StreamExecution(
nextSourceId += 1
// We still need to use the previous `output` instead of `source.schema` as attributes in
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)
StreamingExecutionRelation(source, output)(sparkSession)
})
}
sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
@@ -18,9 +18,11 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource

@@ -48,9 +50,21 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
* Used to link a streaming [[Source]] of data into a
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
*/
case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
case class StreamingExecutionRelation(
source: Source,
output: Seq[Attribute])(session: SparkSession)
extends LeafNode {

override def isStreaming: Boolean = true
override def toString: String = source.toString

// There's no sensible value here. On the execution path, this relation will be
// swapped out with microbatches. But some dataframe operations (in particular explain) do lead
// to this node surviving analysis. So we satisfy the LeafNode contract with the session default
// value.
override def computeStats(): Statistics = Statistics(
sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
)
}

/**
@@ -65,7 +79,7 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext
}

object StreamingExecutionRelation {
def apply(source: Source): StreamingExecutionRelation = {
StreamingExecutionRelation(source, source.schema.toAttributes)
def apply(source: Source, session: SparkSession): StreamingExecutionRelation = {
StreamingExecutionRelation(source, source.schema.toAttributes)(session)
}
}
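
Note: the new `session` argument sits in a second parameter list, so it is not part of the case class's generated `equals`/`hashCode` and does not change how the relation is compared or pattern-matched during analysis. A minimal, self-contained sketch of that Scala behaviour (the names below are illustrative, not from this PR):

```scala
// Only the first parameter list of a case class participates in the
// generated equals/hashCode; the second-list argument is ignored when
// two instances are compared.
case class Rel(name: String)(session: String)

object CurriedCaseClassDemo extends App {
  val a = Rel("events")("session-A")
  val b = Rel("events")("session-B")
  assert(a == b) // equal: only `name` is compared
  println(s"a == b: ${a == b}")
}
```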
@@ -53,7 +53,7 @@ object MemoryStream {
case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
extends Source with Logging {
protected val encoder = encoderFor[A]
protected val logicalPlan = StreamingExecutionRelation(this)
protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession)
protected val output = logicalPlan.output

/**
@@ -74,6 +74,22 @@ class StreamSuite extends StreamTest {
CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
}


test("explain join") {
// Make a table and ensure it will be broadcast.
val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")

// Join the input stream with a table.
val inputData = MemoryStream[Int]
val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")

val outputStream = new java.io.ByteArrayOutputStream()
Console.withOut(outputStream) {
joined.explain()
}
assert(outputStream.toString.contains("StreamingRelation"))
}

test("SPARK-20432: union one stream with itself") {
val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
val unioned = df.union(df)
@@ -335,7 +351,9 @@ class StreamSuite extends StreamTest {

override def stop(): Unit = {}
}
val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
val df = Dataset[Int](
sqlContext.sparkSession,
StreamingExecutionRelation(source, sqlContext.sparkSession))
testStream(df)(
// `ExpectFailure(isFatalError = true)` verifies two things:
// - Fatal errors can be propagated to `StreamingQuery.exception` and
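
Note: the new "explain join" test captures stdout because `Dataset.explain()` prints the plan rather than returning it. A minimal, Spark-free sketch of the same `Console.withOut` capture pattern (class and message names are illustrative):

```scala
import java.io.ByteArrayOutputStream

object CaptureStdoutDemo extends App {
  val buffer = new ByteArrayOutputStream()
  // Everything printed inside this block is redirected into `buffer`.
  Console.withOut(buffer) {
    println("== Physical Plan ==")
  }
  assert(buffer.toString.contains("Physical Plan"))
  println(s"captured: ${buffer.toString.trim}")
}
```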
@@ -653,7 +653,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
override def stop(): Unit = {}
}
StreamingExecutionRelation(source)
StreamingExecutionRelation(source, spark)
}

/** Returns the query progress at the end of the first trigger of streaming DF */