Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,14 @@ class MicroBatchExecution(
nextSourceId += 1
StreamingExecutionRelation(reader, output)(sparkSession)
})
case s @ StreamingRelationV2(_, _, _, output, v1Relation) =>
case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
assert(v1Relation.isDefined, "v2 execution didn't match but v1 was unavailable")
if (v1Relation.isEmpty) {
throw new UnsupportedOperationException(
s"Data source $sourceName does not support microbatch processing.")
}
val source = v1Relation.get.dataSource.createSource(metadataPath)
nextSourceId += 1
StreamingExecutionRelation(source, output)(sparkSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,36 @@

package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this file into the sources subdirectory to make it consistent with other v2 sources?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact this file can be merged into the ConsoleWriter.scala. The combined file will be named console.scala

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do this in a followup PR. It's not as simple as just moving it; we have to add an alias so that .format("org.apache.spark.sql.execution.streaming.ConsoleSinkProvider") continues to work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

argh. okay. later then.

import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
// Number of rows to display, by default 20 rows
private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20)

// Truncate the displayed data if it is too long, by default it is true
private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true)
import java.util.Optional

// Track the batch id
private var lastBatchId = -1L

override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
val batchIdStr = if (batchId <= lastBatchId) {
s"Rerun batch: $batchId"
} else {
lastBatchId = batchId
s"Batch: $batchId"
}

// scalastyle:off println
println("-------------------------------------------")
println(batchIdStr)
println("-------------------------------------------")
// scalastyle:off println
data.sparkSession.createDataFrame(
data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
.show(numRowsToShow, isTruncated)
}
import scala.collection.JavaConverters._

override def toString(): String = s"ConsoleSink[numRows=$numRowsToShow, truncate=$isTruncated]"
}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

/**
 * A [[BaseRelation]] wrapping an existing `DataFrame`. The relation reports the
 * wrapped `DataFrame`'s schema as its own.
 */
case class ConsoleRelation(
    override val sqlContext: SQLContext,
    data: DataFrame) extends BaseRelation {

  override def schema: StructType = {
    data.schema
  }
}

class ConsoleSinkProvider extends StreamSinkProvider
class ConsoleSinkProvider extends DataSourceV2
with MicroBatchWriteSupport
with DataSourceRegister
with CreatableRelationProvider {
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new ConsoleSink(parameters)

override def createMicroBatchWriter(
queryId: String,
epochId: Long,
schema: StructType,
mode: OutputMode,
options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
Optional.of(new ConsoleWriter(epochId, schema, options))
}

def createRelation(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is createRelation used for? For batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume so. I'm not familiar with it, but it's not on the streaming source codepath.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ContinuousExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

@volatile protected var continuousSources: Seq[ContinuousReader] = _
@volatile protected var continuousSources: Seq[ContinuousReader] = Seq()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Is it related to this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. As mentioned in an earlier comment, initializing to null means the StreamingQueryException won't construct if it happens before sources are set.

override protected def sources: Seq[BaseStreamingSource] = continuousSources

override lazy val logicalPlan: LogicalPlan = {
Expand All @@ -69,7 +69,7 @@ class ContinuousExecution(
ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
})
case StreamingRelationV2(_, sourceName, _, _, _) =>
throw new AnalysisException(
throw new UnsupportedOperationException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? An incorrect data source is not an operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's an argument that it is - you're asking the data source (which is correct in the sense that it's a real, existing source) to do a type of read/write it doesn't support.

The primary motivation is that the existing code has already made the choice to throw an UnsupportedOperationException when you try to stream from a source that only outputs in batch mode.

s"Data source $sourceName does not support continuous processing.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docs and link it to the ConsoleSinkProvider since it's in a different file.

extends DataSourceV2Writer with Logging {
// Number of rows to display, by default 20 rows
private val numRowsToShow = options.getInt("numRows", 20)

// Truncate the displayed data if it is too long, by default it is true
private val isTruncated = options.getBoolean("truncate", true)

assert(SparkSession.getActiveSession.isDefined)
private val spark = SparkSession.getActiveSession.get

override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory

override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
// Pull the row array out of every packed commit message and concatenate them
// into a single Array[Row]. `flatten` expresses this directly, instead of
// folding `++` over an untyped empty `Array()` seed.
val batch = messages.collect {
  case PackedRowCommitMessage(rows) => rows
}.flatten
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this complicated fold? Just array.collect { ... } returns an Array, doesn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns an array of arrays of rows, which isn't what we need.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use flatten instead of fold. Much cleaner.


// Printing to stdout is the whole point of this writer, so the println style
// check is switched off for the banner and switched back on right after it.
// scalastyle:off println
println("-------------------------------------------")
println(s"Batch: $batchId")
println("-------------------------------------------")
// scalastyle:on println
// Turn the collected rows back into a DataFrame so that `show` handles the
// row limit and truncation.
spark.createDataFrame(
spark.sparkContext.parallelize(batch), schema)
.show(numRowsToShow, isTruncated)
}

override def abort(messages: Array[WriterCommitMessage]): Unit = {}

override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources

import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

/**
 * Singleton [[DataWriterFactory]] that returns a new [[PackedRowDataWriter]] on every call;
 * the partition id and attempt number are not used. The writers it creates pack the rows they
 * are given into their commit message, for delivery to a
 * [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver.
 */
case object PackedRowWriterFactory extends DataWriterFactory[Row] {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
    new PackedRowDataWriter()
}

/**
 * A [[WriterCommitMessage]] whose payload is the written rows themselves.
 * [[PackedRowDataWriter]] builds one from its buffered rows when it commits.
 *
 * @param rows the rows carried by this message
 */
case class PackedRowCommitMessage(rows: Array[Row]) extends WriterCommitMessage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docs.


class PackedRowDataWriter() extends DataWriter[Row] with Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docs.

private val data = mutable.Buffer[Row]()

override def write(row: Row): Unit = data.append(row)

override def commit(): PackedRowCommitMessage = {
// `toArray` already copies the buffered rows into a new array, so the message
// is unaffected by the `clear()` that follows; a prior `clone()` would only
// make a second, throwaway copy.
val msg = PackedRowCommitMessage(data.toArray)
Copy link
Contributor

@tdas tdas Jan 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you cloning and then calling toArray? Just data.toArray will create an immutable copy.

data.clear()
msg
}

override def abort(): Unit = data.clear()
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2}
import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport
import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}

/**
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
Expand Down Expand Up @@ -280,14 +280,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
useTempCheckpointLocation = true,
trigger = trigger)
} else {
val sink = trigger match {
case _: ContinuousTrigger =>
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
ds.newInstance() match {
case w: ContinuousWriteSupport => w
case _ => throw new AnalysisException(
s"Data source $source does not support continuous writing")
}
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are checking for the same conditions here as well as in the StreamingQueryManager.createQuery. I think we need to refactor this, probably sometime in the future once we get rid of v1 completely.

Either way, we should immediately add a general test suite (say StreamingDataSourceV2Suite) that tests these cases with various fake data sources.

val sink = (ds.newInstance(), trigger) match {
case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnalysisException.
Incorrect trigger or incompatible data source is not an operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

s"Data source $source does not support continuous writing")
case (w: MicroBatchWriteSupport, _) => w
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a case where it does not have MicroBatchWriteSupport, but the trigger is ProcessingTime/OneTime? That should have a different error message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, we have to just fall back to the V1 path, because V1 sinks don't have MicroBatchWriteSupport.

case _ =>
val ds = DataSource(
df.sparkSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ org.apache.spark.sql.sources.FakeSourceFour
org.apache.fakesource.FakeExternalSourceOne
org.apache.fakesource.FakeExternalSourceTwo
org.apache.fakesource.FakeExternalSourceThree
org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
org.apache.spark.sql.streaming.sources.FakeReadBothModes
org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
org.apache.spark.sql.streaming.sources.FakeWriteMicroBatchOnly
org.apache.spark.sql.streaming.sources.FakeWriteContinuousOnly
org.apache.spark.sql.streaming.sources.FakeWriteBothModes
org.apache.spark.sql.streaming.sources.FakeWriteNeitherMode
Loading