@@ -310,7 +310,7 @@ private[kafka010] class KafkaSource(
currentPartitionOffsets = Some(untilPartitionOffsets)
}

sqlContext.internalCreateDataFrame(rdd, schema)
markAsStreaming(sqlContext.internalCreateDataFrame(rdd, schema))
}

/** Stop this source and free any resources it has allocated. */
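Note: `markAsStreaming` is called above but its body is not part of this excerpt. The following is a hypothetical sketch of what such a helper could do, assuming it only flips the new `dataFromStreaming` flag (introduced later in this PR) on the leaf relations of the per-batch DataFrame; the helper name comes from the diff, everything else is an assumption.

```scala
// Hypothetical sketch only -- markAsStreaming's real body is not shown in this diff.
// It presumably marks the batch DataFrame's leaf relations as streaming-originated
// so that the resulting logical plan reports isStreaming = true for every micro-batch.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.LogicalRelation

object StreamingFlagSketch {
  def markAsStreaming(df: DataFrame): DataFrame = {
    df.queryExecution.logical.foreach {
      case r: LogicalRDD      => r.dataFromStreaming = true  // var added by this PR
      case r: LogicalRelation => r.dataFromStreaming = true
      case r: LocalRelation   => r.dataFromStreaming = true
      case _                  => // leave non-leaf nodes untouched
    }
    df
  }
}
```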
@@ -606,6 +606,33 @@ class KafkaSourceSuite extends KafkaSourceTest {
assert(query.exception.isEmpty)
}

test("getBatch should return a streaming DataFrame") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)
testUtils.sendMessages(topic, Array("-1"))
require(testUtils.getLatestOffsets(Set(topic)).size === 5)

val kafka = spark
.readStream
.format("kafka")
.option("subscribe", topic)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
StartStream(trigger = ProcessingTime(1)),
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
AssertOnQuery { query =>
query.lastExecution.logical.isStreaming
}
)
}

private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

private def assignString(topic: String, partitions: Iterable[Int]): String = {
@@ -1119,11 +1119,12 @@ case class DecimalAggregates(conf: CatalystConf) extends Rule[LogicalPlan] {
*/
object ConvertToLocalRelation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Project(projectList, LocalRelation(output, data))
case Project(projectList, lr @ LocalRelation(output, data, dataFromStreaming))
if !projectList.exists(hasUnevaluableExpr) =>
val projection = new InterpretedProjection(projectList, output)
projection.initialize(0)
LocalRelation(projectList.map(_.toAttribute), data.map(projection))
LocalRelation(projectList.map(_.toAttribute), data.map(projection),
dataFromStreaming = dataFromStreaming)
}
Review comment (PR author): In a streaming query, the stream source is transformed into a batch LocalRelation whose isStreaming is true, so this rule should keep isStreaming set to true on the new LocalRelation it produces.
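A test-style sketch of what this comment describes (hedged: it uses the Catalyst DSL in spark-shell/worksheet style, and the setup is illustrative rather than taken from the PR's test suite): projecting over a streaming-marked LocalRelation and running ConvertToLocalRelation should yield a relation that still reports isStreaming.

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}

// A LocalRelation standing in for data produced by a streaming source
// (empty data is enough for this check).
val colA = 'a.int
val streamingRel = new LocalRelation(Seq(colA, 'b.int), Nil, dataFromStreaming = true)

// ConvertToLocalRelation folds the Project into the relation; with this change the
// rewritten LocalRelation keeps dataFromStreaming, so isStreaming stays true.
val optimized = ConvertToLocalRelation(Project(Seq(colA), streamingRel))
assert(optimized.isStreaming)
```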


private def hasUnevaluableExpr(expr: Expression): Boolean = {
@@ -43,7 +43,15 @@ object LocalRelation {
}
}

case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
/**
* @param dataFromStreaming indicates whether this relation comes from a streaming source.
*                          In a streaming query, the stream relation is cut into a
*                          series of batch relations.
*/
case class LocalRelation(
output: Seq[Attribute],
data: Seq[InternalRow] = Nil,
var dataFromStreaming: Boolean = false)
extends LeafNode with analysis.MultiInstanceRelation {

// A local relation must have resolved output.
@@ -68,12 +76,14 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)

override def sameResult(plan: LogicalPlan): Boolean = {
plan.canonicalized match {
case LocalRelation(otherOutput, otherData) =>
case LocalRelation(otherOutput, otherData, _) =>
otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
case _ => false
}
}

override def isStreaming: Boolean = dataFromStreaming

override def computeStats(conf: CatalystConf): Statistics =
Statistics(sizeInBytes =
output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
@@ -348,15 +348,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case (true, SaveMode.Overwrite) =>
// Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
case LogicalRelation(src: BaseRelation, _, _, _) => src
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
relation.tableMeta.identifier
}

val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
EliminateSubqueryAliases(tableRelation) match {
// check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
// check hive table relation when overwrite mode
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2728,7 +2728,7 @@ class Dataset[T] private[sql](
*/
def inputFiles: Array[String] = {
val files: Seq[String] = queryExecution.optimizedPlan.collect {
case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
@@ -127,12 +127,18 @@ case class ExternalRDDScanExec[T](
}
}

/** Logical plan node for scanning data from an RDD of InternalRow. */
/** Logical plan node for scanning data from an RDD of InternalRow.
*
* @param dataFromStreaming indicates whether this relation comes from a streaming source.
*                          In a streaming query, the stream relation is cut into a
*                          series of batch relations.
*/
case class LogicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
outputPartitioning: Partitioning = UnknownPartitioning(0),
outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession)
outputOrdering: Seq[SortOrder] = Nil,
var dataFromStreaming: Boolean = false)(session: SparkSession)
extends LeafNode with MultiInstanceRelation {

override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
@@ -163,11 +169,13 @@ case class LogicalRDD(

override def sameResult(plan: LogicalPlan): Boolean = {
plan.canonicalized match {
case LogicalRDD(_, otherRDD, _, _) => rdd.id == otherRDD.id
case LogicalRDD(_, otherRDD, _, _, _) => rdd.id == otherRDD.id
case _ => false
}
}

override def isStreaming: Boolean = dataFromStreaming

override protected def stringArgs: Iterator[Any] = Iterator(output)

@transient override def computeStats(conf: CatalystConf): Statistics = Statistics(
@@ -96,7 +96,7 @@ case class OptimizeMetadataOnlyQuery(
child transform {
case plan if plan eq relation =>
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
val partitionData = fsRelation.location.listFiles(filters = Nil)
LocalRelation(partAttrs, partitionData.map(_.values))
@@ -132,7 +132,7 @@ case class OptimizeMetadataOnlyQuery(
object PartitionedRelation {

def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
if fsRelation.partitionSchema.nonEmpty =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some(AttributeSet(partAttrs), l)
@@ -231,8 +231,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case EventTimeWatermark(columnName, delay, child) =>
EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil

case PhysicalAggregation(
namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>
case agg @ PhysicalAggregation(
namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child)
if agg.isStreaming =>

Review comment (@uncleGen, Feb 28, 2017):
Apply this strategy only if the logical plan is streaming.
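For context on why guarding on `agg.isStreaming` is sufficient (hedged: the snippet below paraphrases the existing upstream default in LogicalPlan and is not part of this diff): a plan node is considered streaming whenever any child is, so flagging only the new batch leaf relations makes the whole aggregation subtree report streaming.

```scala
// Existing LogicalPlan behavior, quoted from memory -- verify against the source.
// Because the default propagates up from the children, marking the leaf relations
// (LocalRelation / LogicalRDD / LogicalRelation) is enough for this guard to match.
def isStreaming: Boolean = children.exists(_.isStreaming)
```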

aggregate.AggUtils.planStreamingAggregation(
namedGroupingExpressions,
@@ -408,7 +409,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
case logical.LocalRelation(output, data, _) =>
LocalTableScanExec(output, data) :: Nil
case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
@@ -421,7 +421,7 @@ case class DataSource(
}
val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
sparkSession.table(tableIdent).queryExecution.analyzed.collect {
case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
}.head
}
// For partitioned relation r, r.schema's column ordering can be different from the column
@@ -140,12 +140,12 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)

case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
parts, query, overwrite, false) if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)

case InsertIntoTable(
l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, false) =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
// the user has specified static partitions, we add a Project operator on top of the query
// to include those constant column values in the query result.
@@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {

val outputPath = t.location.rootPaths.head
val inputPaths = actualQuery.collect {
case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
}.flatten

val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -264,29 +264,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
*/
object DataSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
pruneFilterProjectRaw(
l,
projects,
filters,
(requestedColumns, allPredicates, _) =>
toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
case PhysicalOperation(
projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
pruneFilterProject(
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil

case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) =>
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
pruneFilterProject(
l,
projects,
filters,
(a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil

case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
RowDataSourceScanExec(
l.output,
toCatalystRDD(l, baseRelation.buildScan()),
@@ -52,7 +52,7 @@ import org.apache.spark.sql.execution.SparkPlan
object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) =>
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -30,11 +30,16 @@ import org.apache.spark.util.Utils
* Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
* changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
* this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
*
* @param dataFromStreaming indicates whether this relation comes from a streaming source.
*                          In a streaming query, the stream relation is cut into a
*                          series of batch relations.
*/
case class LogicalRelation(
relation: BaseRelation,
expectedOutputAttributes: Option[Seq[Attribute]] = None,
catalogTable: Option[CatalogTable] = None)
catalogTable: Option[CatalogTable] = None,
var dataFromStreaming: Boolean = false)
extends LeafNode with MultiInstanceRelation {

override val output: Seq[AttributeReference] = {
@@ -53,7 +58,8 @@ case class LogicalRelation(

// Logical Relations are distinct if they have different output for the sake of transformations.
override def equals(other: Any): Boolean = other match {
case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
case l @ LogicalRelation(otherRelation, _, _, _) =>
relation == otherRelation && output == l.output
case _ => false
}

@@ -63,11 +69,13 @@ case class LogicalRelation(

override def sameResult(otherPlan: LogicalPlan): Boolean = {
otherPlan.canonicalized match {
case LogicalRelation(otherRelation, _, _) => relation == otherRelation
case LogicalRelation(otherRelation, _, _, _) => relation == otherRelation
case _ => false
}
}

override def isStreaming: Boolean = dataFromStreaming

// When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need
// LogicalRelation.cleanArgs to return Seq(relation), since expectedOutputAttribute's
// expId can be different but the relation is still the same.
@@ -35,6 +35,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
_,
_),
_,
_,
_))
if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
// The attribute name of predicate could be different than the one in schema in case of
@@ -381,10 +381,10 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case relation: CatalogRelation =>
val metadata = relation.tableMeta
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, h.partitionSchema.map(_.name))
case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, Nil)
case _ => i
@@ -414,10 +414,10 @@ object PreWriteCheck extends (LogicalPlan => Unit) {

def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) =>
case InsertIntoTable(l @ LogicalRelation(relation, _, _, _), partition, query, _, _) =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
case LogicalRelation(src, _, _) => src
case LogicalRelation(src, _, _, _) => src
}
if (srcRelations.contains(relation)) {
failAnalysis("Cannot insert into table that is also being read from.")
@@ -155,8 +155,10 @@ class FileStreamSource(
partitionColumns = partitionColumns,
className = fileFormatClassName,
options = optionsWithPartitionBasePath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkFilesExist = false)))
markAsStreaming(
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkFilesExist = false), dataFromStreaming = true))
)
}
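A small usage-level illustration of the net effect of this hunk (hedged: the helper below is illustrative and not part of the PR; `batchDf` stands for whatever a `Source.getBatch` call returns): the per-batch DataFrame a streaming source hands back now identifies itself as streaming.

```scala
import org.apache.spark.sql.DataFrame

// Illustrative check only; batchDf would be the DataFrame returned by Source.getBatch.
def assertStreamingBatch(batchDf: DataFrame): Unit = {
  // With dataFromStreaming set on the leaf relation, both the logical plan and the
  // Dataset now report that they originate from a streaming source.
  assert(batchDf.queryExecution.logical.isStreaming)
  assert(batchDf.isStreaming)
}
```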

/**