From 469d69aefea17abbb889a8983a59d83988aaff45 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 23 May 2016 16:40:13 -0700 Subject: [PATCH 01/16] First commit to support complete mode --- .../OutputMode.scala => OutputMode.java} | 11 ++-- .../UnsupportedOperationChecker.scala | 3 +- .../analysis/UnsupportedOperationsSuite.scala | 3 +- .../spark/sql/ContinuousQueryManager.scala | 6 +- .../apache/spark/sql/DataFrameWriter.scala | 4 ++ .../spark/sql/execution/SparkStrategies.scala | 19 ------ .../spark/sql/execution/aggregate/utils.scala | 7 ++- .../streaming/IncrementalExecution.scala | 36 +++++++++--- .../streaming/StatefulAggregate.scala | 58 +++++++++++++------ .../execution/streaming/StreamExecution.scala | 1 - .../org/apache/spark/sql/StreamTest.scala | 12 ++-- .../ContinuousQueryManagerSuite.scala | 5 +- .../streaming/StreamingAggregationSuite.scala | 38 +++++++++--- 13 files changed, 128 insertions(+), 75 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/{catalyst/analysis/OutputMode.scala => OutputMode.java} (84%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/OutputMode.java similarity index 84% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/OutputMode.java index a4d387eae3c8..4053a7c0337d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/OutputMode.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql.catalyst.analysis +package org.apache.spark.sql; -sealed trait OutputMode - -case object Append extends OutputMode -case object Update extends OutputMode +public enum OutputMode { + Append, + Update, + Complete +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 0e08bf013c8d..d596c8622cdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.OutputMode import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -56,7 +57,7 @@ object UnsupportedOperationChecker { throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") case Aggregate(_, _, child) if child.isStreaming => - if (outputMode == Append) { + if (outputMode == OutputMode.Append) { throwError( "Aggregations are not supported on streaming DataFrames/Datasets in " + "Append output mode. 
Consider changing output mode to Update.") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 674277bdbe15..68fb1b44458e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, OutputMode} +import org.apache.spark.sql.OutputMode._ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index eab557443d1d..eadfb2ef86ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import scala.collection.mutable import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode, UnsupportedOperationChecker} +import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf @@ -175,9 +175,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) { checkpointLocation: String, df: DataFrame, sink: Sink, + outputMode: OutputMode, trigger: Trigger = ProcessingTime(0), - triggerClock: Clock = new SystemClock(), - outputMode: OutputMode = Append): ContinuousQuery = { + triggerClock: Clock = new SystemClock()): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { throw new IllegalArgumentException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 6f5fb69ea377..bc04fe44c06e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -327,6 +327,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { checkpointLocation, df, sink, + outputMode, trigger) continuousQuery } else { @@ -353,6 +354,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { checkpointLocation, df, dataSource.createSink(), + outputMode, trigger) } } @@ -705,6 +707,8 @@ final class DataFrameWriter private[sql](df: DataFrame) { private var mode: SaveMode = SaveMode.ErrorIfExists + private var outputMode: OutputMode = OutputMode.Append + private var trigger: Trigger = ProcessingTime(0L) private var extraOptions = new scala.collection.mutable.HashMap[String, String] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c46cecc71f37..463c00563899 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -199,25 +199,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - /** - * Used to plan aggregation queries that are computed incrementally as part of a - * [[org.apache.spark.sql.ContinuousQuery]]. Currently this rule is injected into the planner - * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] - */ - object StatefulAggregationStrategy extends Strategy { - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalAggregation( - namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) => - - aggregate.Utils.planStreamingAggregation( - namedGroupingExpressions, - aggregateExpressions, - rewrittenResultExpressions, - planLater(child)) - - case _ => Nil - } - } /** * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala index f93c44600742..a7d739e2f4ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.aggregate +import org.apache.spark.sql.OutputMode import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.execution.SparkPlan @@ -33,7 +34,7 @@ object Utils { resultExpressions: Seq[NamedExpression], child: SparkPlan): Seq[SparkPlan] = { - val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) + val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = aggregate.Complete)) val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute) SortBasedAggregateExec( requiredChildDistributionExpressions = Some(groupingExpressions), @@ -261,6 +262,7 @@ object Utils { groupingExpressions: Seq[NamedExpression], functionsWithoutDistinct: Seq[AggregateExpression], resultExpressions: Seq[NamedExpression], + outputMode: OutputMode, child: SparkPlan): Seq[SparkPlan] = { val groupingAttributes = groupingExpressions.map(_.toAttribute) @@ -311,8 +313,9 @@ object Utils { aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes), child = restored) } + val returnAllStates = if (outputMode == OutputMode.Complete) true else false - val saved = StateStoreSaveExec(groupingAttributes, None, partialMerged2) + val saved = StateStoreSaveExec(groupingAttributes, None, returnAllStates, partialMerged2) val finalAndCompleteAggregate: SparkPlan = { val finalAggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Final)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 8b96f65bc31a..ee05489f0e9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.OutputMode +import org.apache.spark.sql._ +import 
org.apache.spark.sql.catalyst.planning.PhysicalAggregation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} @@ -35,15 +35,36 @@ class IncrementalExecution private[sql]( val currentBatchId: Long) extends QueryExecution(sparkSession, logicalPlan) { - // TODO: make this always part of planning. - val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil - // Modified planner with stateful operations. override def planner: SparkPlanner = new SparkPlanner( sparkSession.sparkContext, sparkSession.sessionState.conf, - stateStrategy) + Nil) { + + override def strategies: Seq[Strategy] = { + StatefulAggregationStrategy +: super.strategies + } + + /** + * Used to plan aggregation queries that are computed incrementally as part of a + * [[org.apache.spark.sql.ContinuousQuery]]. + */ + object StatefulAggregationStrategy extends Strategy { + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalAggregation( + namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) => + execution.aggregate.Utils.planStreamingAggregation( + namedGroupingExpressions, + aggregateExpressions, + rewrittenResultExpressions, + outputMode, + planLater(child)) + + case _ => Nil + } + } + } /** * Records the current id for a given stateful operator in the query plan as the `state` @@ -54,7 +75,7 @@ class IncrementalExecution private[sql]( /** Locates save/restore pairs surrounding aggregation. */ val state = new Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = plan transform { - case StateStoreSaveExec(keys, None, + case StateStoreSaveExec(keys, None, outputMode, UnaryExecNode(agg, StateStoreRestoreExec(keys2, None, child))) => val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId) @@ -63,6 +84,7 @@ class IncrementalExecution private[sql]( StateStoreSaveExec( keys, Some(stateId), + outputMode, agg.withNewChildren( StateStoreRestoreExec( keys, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index d5e4dd8f78ac..62d60ad27c6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -82,10 +82,12 @@ case class StateStoreRestoreExec( case class StateStoreSaveExec( keyExpressions: Seq[Attribute], stateId: Option[OperatorStateId], + returnAllStates: Boolean, child: SparkPlan) extends execution.UnaryExecNode with StatefulOperator { override protected def doExecute(): RDD[InternalRow] = { + val saveAndReturnFunc = if (returnAllStates) saveAndReturnAll _ else saveAndReturnUpdated _ child.execute().mapPartitionsWithStateStore( getStateId.checkpointLocation, operatorId = getStateId.operatorId, @@ -93,29 +95,47 @@ case class StateStoreSaveExec( keyExpressions.toStructType, child.output.toStructType, sqlContext.sessionState, - Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => - new Iterator[InternalRow] { - private[this] val baseIterator = iter - private[this] val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) + Some(sqlContext.streams.stateStoreCoordinator) + )(saveAndReturnFunc) + } + + override def output: Seq[Attribute] = child.output - 
override def hasNext: Boolean = { - if (!baseIterator.hasNext) { - store.commit() - false - } else { - true - } - } + private def saveAndReturnUpdated( + store: StateStore, + iter: Iterator[InternalRow]): Iterator[InternalRow] = { + new Iterator[InternalRow] { + private[this] val baseIterator = iter + private[this] val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) - override def next(): InternalRow = { - val row = baseIterator.next().asInstanceOf[UnsafeRow] - val key = getKey(row) - store.put(key.copy(), row.copy()) - row - } + override def hasNext: Boolean = { + if (!baseIterator.hasNext) { + store.commit() + false + } else { + true } + } + + override def next(): InternalRow = { + val row = baseIterator.next().asInstanceOf[UnsafeRow] + val key = getKey(row) + store.put(key.copy(), row.copy()) + row + } } } - override def output: Seq[Attribute] = child.output + private def saveAndReturnAll( + store: StateStore, + iter: Iterator[InternalRow]): Iterator[InternalRow] = { + val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) + while (iter.hasNext) { + val row = iter.next().asInstanceOf[UnsafeRow] + val key = getKey(row) + store.put(key.copy(), row.copy()) + } + store.commit() + store.iterator().map(_._2.asInstanceOf[InternalRow]) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 7d09bdcebdc3..ab0900d7f6ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.OutputMode import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 1ab562f87334..f5093d295783 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -33,7 +33,6 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ -import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode} import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ @@ -69,8 +68,6 @@ trait StreamTest extends QueryTest with Timeouts { /** How long to wait for an active stream to catch up when checking a result. */ val streamingTimeout = 10.seconds - val outputMode: OutputMode = Append - /** A trait for actions that can be performed while testing a streaming DataFrame. */ trait StreamAction @@ -191,7 +188,10 @@ trait StreamTest extends QueryTest with Timeouts { * Note that if the stream is not explicitly started before an action that requires it to be * running then it will be automatically started before performing any other actions. 
*/ - def testStream(_stream: Dataset[_])(actions: StreamAction*): Unit = { + def testStream( + _stream: Dataset[_], + outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = { + val stream = _stream.toDF() var pos = 0 var currentPlan: LogicalPlan = stream.logicalPlan @@ -297,9 +297,9 @@ trait StreamTest extends QueryTest with Timeouts { metadataRoot, stream, sink, + outputMode, trigger, - triggerClock, - outputMode = outputMode) + triggerClock) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index a743cdde408f..77590d53f79a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} +import org.apache.spark.sql.{ContinuousQuery, Dataset, OutputMode, StreamTest} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -244,7 +244,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with StreamExecution.nextName, metadataRoot, df, - new MemorySink(df.schema)) + new MemorySink(df.schema), + OutputMode.Append) .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 7104d01c4a2a..16d369073124 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.streaming import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException -import org.apache.spark.sql.StreamTest -import org.apache.spark.sql.catalyst.analysis.Update +import org.apache.spark.sql.{OutputMode, StreamTest} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed @@ -41,9 +40,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be import testImplicits._ - override val outputMode = Update - - test("simple count") { + test("simple count, update mode") { val inputData = MemoryStream[Int] val aggregated = @@ -52,7 +49,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .agg(count("*")) .as[(Int, Long)] - testStream(aggregated)( + testStream(aggregated, OutputMode.Update)( AddData(inputData, 3), CheckLastBatch((3, 1)), AddData(inputData, 3, 2), @@ -67,6 +64,29 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be ) } + test("simple count, complete mode") { + val inputData = MemoryStream[Int] + + val aggregated = + inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + testStream(aggregated, OutputMode.Complete)( + AddData(inputData, 3), + CheckLastBatch((3, 1)), + AddData(inputData, 2), + CheckLastBatch((3, 1), 
(2, 1)), + StopStream, + StartStream(), + AddData(inputData, 3, 2, 1), + CheckLastBatch((3, 2), (2, 2), (1, 1)), + AddData(inputData, 4, 4, 4, 4), + CheckLastBatch((4, 4), (3, 2), (2, 2), (1, 1)) + ) + } + test("multiple keys") { val inputData = MemoryStream[Int] @@ -76,7 +96,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .agg(count("*")) .as[(Int, Int, Long)] - testStream(aggregated)( + testStream(aggregated, OutputMode.Update)( AddData(inputData, 1, 2), CheckLastBatch((1, 2, 1), (2, 3, 1)), AddData(inputData, 1, 2), @@ -101,7 +121,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .agg(count("*")) .as[(Int, Long)] - testStream(aggregated)( + testStream(aggregated, OutputMode.Update)( StartStream(), AddData(inputData, 1, 2, 3, 4), ExpectFailure[SparkException](), @@ -114,7 +134,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be val inputData = MemoryStream[(String, Int)] val aggregated = inputData.toDS().groupByKey(_._1).agg(typed.sumLong(_._2)) - testStream(aggregated)( + testStream(aggregated, OutputMode.Update)( AddData(inputData, ("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)), CheckLastBatch(("a", 30), ("b", 3), ("c", 1)) ) From 49746f4b5f8a5167fe033f858711fa5643031097 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 24 May 2016 14:33:48 -0700 Subject: [PATCH 02/16] Add public API for output mode and upgraded memory sink to support complete mode --- .../UnsupportedOperationChecker.scala | 37 ++++++++++-------- .../analysis/UnsupportedOperationsSuite.scala | 36 ++++++++++++++++- .../apache/spark/sql/DataFrameWriter.scala | 34 ++++++++++++++-- .../execution/datasources/DataSource.scala | 9 ++++- .../sql/execution/streaming/console.scala | 5 ++- .../sql/execution/streaming/memory.scala | 39 ++++++++++++------- .../apache/spark/sql/sources/interfaces.scala | 3 +- .../org/apache/spark/sql/StreamTest.scala | 2 +- .../ContinuousQueryManagerSuite.scala | 18 ++++----- .../DataFrameReaderWriterSuite.scala | 5 ++- .../streaming/StreamingAggregationSuite.scala | 16 +++++++- 11 files changed, 152 insertions(+), 52 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index d596c8622cdf..6ebed1adbd41 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -37,7 +37,7 @@ object UnsupportedOperationChecker { } } - def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = { + def checkForStreaming(implicit plan: LogicalPlan, outputMode: OutputMode): Unit = { if (!plan.isStreaming) { throwError( @@ -56,21 +56,6 @@ object UnsupportedOperationChecker { case _: InsertIntoTable => throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") - case Aggregate(_, _, child) if child.isStreaming => - if (outputMode == OutputMode.Append) { - throwError( - "Aggregations are not supported on streaming DataFrames/Datasets in " + - "Append output mode. 
Consider changing output mode to Update.") - } - val moreStreamingAggregates = child.find { - case Aggregate(_, _, grandchild) if grandchild.isStreaming => true - case _ => false - } - if (moreStreamingAggregates.nonEmpty) { - throwError("Multiple streaming aggregations are not supported with " + - "streaming DataFrames/Datasets") - } - case Join(left, right, joinType, _) => joinType match { @@ -139,6 +124,26 @@ object UnsupportedOperationChecker { case _ => } } + + // Checks related to aggregations + val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a } + outputMode match { + case OutputMode.Append if aggregates.nonEmpty => + throwError( + s"$outputMode output mode not supported with streaming aggregates on " + + s"streaming DataFrames/DataSets") + + case OutputMode.Complete | OutputMode.Update if aggregates.isEmpty => + throwError( + s"$outputMode output mode not supported when not streaming aggregates are present on " + + s"streaming DataFrames/Datasets") + + case _ => + } + if (aggregates.size > 1) { + throwError( + "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets") + } } private def throwErrorIf( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 68fb1b44458e..354b4ccf2f61 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -205,7 +205,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite { _.intersect(_), streamStreamSupported = false) - // Unary operations testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _)) testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort") @@ -214,6 +213,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite { testUnaryOperatorInStreamingPlan( "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows") + // Output modes with aggregation and non-aggregation plans + testOutputMode(OutputMode.Append, shouldSupportAggregation = false) + testOutputMode(OutputMode.Update, shouldSupportAggregation = true) + testOutputMode(OutputMode.Complete, shouldSupportAggregation = true) /* ======================================================================================= @@ -312,6 +315,37 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode) } + def testOutputMode( + outputMode: OutputMode, + shouldSupportAggregation: Boolean): Unit = { + + // aggregation + if (shouldSupportAggregation) { + assertNotSupportedInStreamingPlan( + s"$outputMode output mode - no aggregation", + streamRelation.where($"a" > 1), + outputMode = outputMode, + Seq("aggregation", s"$outputMode output mode")) + + assertSupportedInStreamingPlan( + s"$outputMode output mode - aggregation", + streamRelation.groupBy("a")("count(*)"), + outputMode = outputMode) + + } else { + assertSupportedInStreamingPlan( + s"$outputMode output mode - no aggregation", + streamRelation.where($"a" > 1), + outputMode = outputMode) + + assertNotSupportedInStreamingPlan( + s"$outputMode output mode - aggregation", + streamRelation.groupBy("a")("count(*)"), + outputMode = outputMode, + Seq("aggregation", s"$outputMode output mode")) + } + } + /** * Assert that the logical plan is supported as subplan insider a streaming plan. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index bc04fe44c06e..f26ce7dbcff6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -77,7 +77,31 @@ final class DataFrameWriter private[sql](df: DataFrame) { case "ignore" => SaveMode.Ignore case "error" | "default" => SaveMode.ErrorIfExists case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + - "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.") + "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.") + } + this + } + + /** + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to + * the sink + * - `update`: only the changed rows in the streaming DataFrame/Dataset will be written to + * the sink every time there is some updates + * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink + * every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: String): DataFrameWriter = { + assertStreaming("outputMode() can only be called on continuous queries") + this.outputMode = outputMode.toLowerCase match { + case "append" => OutputMode.Append + case "update" => OutputMode.Update + case "complete" => OutputMode.Complete + case _ => throw new IllegalArgumentException(s"Unknown output mode $outputMode. " + + "Accepted output modes are 'append', 'update', 'complete'") } this } @@ -319,7 +343,11 @@ final class DataFrameWriter private[sql](df: DataFrame) { checkpointPath.toUri.toString } - val sink = new MemorySink(df.schema) + if (!Seq(OutputMode.Append, OutputMode.Complete).contains(outputMode)) { + throw new IllegalArgumentException(s"Memory sink does not support output mode $outputMode") + } + + val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) resultDf.createOrReplaceTempView(queryName) val continuousQuery = df.sparkSession.sessionState.continuousQueryManager.startQuery( @@ -353,7 +381,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { queryName, checkpointLocation, df, - dataSource.createSink(), + dataSource.createSink(outputMode), outputMode, trigger) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index e5dd4d81d677..1f5f23140dec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -214,15 +214,20 @@ case class DataSource( } /** Returns a sink that can be used to continually write data. 
*/ - def createSink(): Sink = { + def createSink(outputMode: OutputMode): Sink = { providingClass.newInstance() match { - case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns) + case s: StreamSinkProvider => + s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode) case parquet: parquet.DefaultSource => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val path = caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") }) + if (outputMode != OutputMode.Append) { + throw new IllegalArgumentException( + s"Data source $className does not support $outputMode output mode") + } new FileStreamSink(sparkSession, path, parquet, partitionColumns, options) case _ => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index f11a3fb969db..391f1e54b754 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, OutputMode, SQLContext} import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} class ConsoleSink(options: Map[String, String]) extends Sink with Logging { @@ -52,7 +52,8 @@ class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister { def createSink( sqlContext: SQLContext, parameters: Map[String, String], - partitionColumns: Seq[String]): Sink = { + partitionColumns: Seq[String], + outputMode: OutputMode): Sink = { new ConsoleSink(parameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index bcc33ae8c88b..0338fb83f933 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LeafNode @@ -114,35 +114,48 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit * tests and does not provide durability. */ -class MemorySink(val schema: StructType) extends Sink with Logging { +class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink with Logging { + + private case class AddedData(batchId: Long, data: Array[Row]) + /** An order list of batches that have been written to this [[Sink]]. */ @GuardedBy("this") - private val batches = new ArrayBuffer[Array[Row]]() + private val batches = new ArrayBuffer[AddedData]() /** Returns all rows that are stored in this [[Sink]]. 
*/ def allData: Seq[Row] = synchronized { - batches.flatten + batches.map(_.data).flatten } - def latestBatchId: Option[Int] = synchronized { - if (batches.size == 0) None else Some(batches.size - 1) + def latestBatchId: Option[Long] = synchronized { + batches.lastOption.map(_.batchId) } - def lastBatch: Seq[Row] = synchronized { batches.last } + def lastBatch: Seq[Row] = synchronized { batches.lastOption.toSeq.flatten(_.data) } def toDebugString: String = synchronized { - batches.zipWithIndex.map { case (b, i) => - val dataStr = try b.mkString(" ") catch { + batches.map { case AddedData(batchId, data) => + val dataStr = try data.mkString(" ") catch { case NonFatal(e) => "[Error converting to string]" } - s"$i: $dataStr" + s"$batchId: $dataStr" }.mkString("\n") } override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized { - if (batchId == batches.size) { - logDebug(s"Committing batch $batchId") - batches.append(data.collect()) + if (latestBatchId.isEmpty || batchId > latestBatchId.get) { + logDebug(s"Committing batch $batchId to $this") + outputMode match { + case OutputMode.Append | OutputMode.Update => + batches.append(AddedData(batchId, data.collect())) + + case OutputMode.Complete => + batches.clear() + batches.append(AddedData(batchId, data.collect())) + + case _ => + throw new IllegalArgumentException("Data source ") + } } else { logDebug(s"Skipping already committed batch: $batchId") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 26285bde31ad..3d4edbb93d69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -137,7 +137,8 @@ trait StreamSinkProvider { def createSink( sqlContext: SQLContext, parameters: Map[String, String], - partitionColumns: Seq[String]): Sink + partitionColumns: Seq[String], + outputMode: OutputMode): Sink } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index f5093d295783..c4a0ce1c7bb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -198,7 +198,7 @@ trait StreamTest extends QueryTest with Timeouts { var currentStream: StreamExecution = null var lastStream: StreamExecution = null val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for - val sink = new MemorySink(stream.schema) + val sink = new MemorySink(stream.schema, outputMode) @volatile var streamDeathCause: Throwable = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 77590d53f79a..52991fb8e7e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -237,16 +237,14 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with try { val df = ds.toDF val metadataRoot = - Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath - query = spark - .streams - .startQuery( - StreamExecution.nextName, - metadataRoot, - df, - new MemorySink(df.schema), - OutputMode.Append) - .asInstanceOf[StreamExecution] + 
Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath + query = + df.write + .format("memory") + .option("checkpointLocation", "memory") + .outputMode("append") + .startStream() + .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => if (query != null) query.stop() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index e6c0ce95e7b5..4daadfe8e548 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -90,10 +90,11 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { override def createSink( spark: SQLContext, parameters: Map[String, String], - partitionColumns: Seq[String]): Sink = { + partitionColumns: Seq[String], + outputMode: OutputMode): Sink = { LastOptions.parameters = parameters LastOptions.partitionColumns = partitionColumns - LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns) + LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) new Sink { override def addBatch(batchId: Long, data: DataFrame): Unit = {} } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 16d369073124..a3218d7cbb3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException -import org.apache.spark.sql.{OutputMode, StreamTest} +import org.apache.spark.sql.{AnalysisException, OutputMode, StreamTest} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed @@ -87,6 +87,20 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be ) } + test("simple count, append mode") { + val inputData = MemoryStream[Int] + + val aggregated = + inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + intercept[AnalysisException] { + testStream(aggregated, OutputMode.Append)() + } + } + test("multiple keys") { val inputData = MemoryStream[Int] From 2786090bccd64945c273f9344c0493e4d93eec14 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 24 May 2016 15:11:32 -0700 Subject: [PATCH 03/16] Added unit test for MemorySink --- .../sql/execution/streaming/memory.scala | 2 +- .../org/apache/spark/sql/StreamTest.scala | 2 +- .../spark/sql/streaming/MemorySinkSuite.scala | 84 ++++++++++++++++++- 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 0338fb83f933..44889da03e1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -131,7 +131,7 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi batches.lastOption.map(_.batchId) } - def lastBatch: 
Seq[Row] = synchronized { batches.lastOption.toSeq.flatten(_.data) } + def latestBatchData: Seq[Row] = synchronized { batches.lastOption.toSeq.flatten(_.data) } def toDebugString: String = synchronized { batches.map { case AddedData(batchId, data) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index c4a0ce1c7bb4..b033725f18b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -429,7 +429,7 @@ trait StreamTest extends QueryTest with Timeouts { } } - val sparkAnswer = try if (lastOnly) sink.lastBatch else sink.allData catch { + val sparkAnswer = try if (lastOnly) sink.latestBatchData else sink.allData catch { case e: Exception => failTest("Exception while getting data from sink", e) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala index 09c35bbf2c34..8fcf4a6cb4a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala @@ -17,14 +17,85 @@ package org.apache.spark.sql.streaming -import org.apache.spark.sql.{AnalysisException, Row, StreamTest} +import scala.language.implicitConversions + +import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils class MemorySinkSuite extends StreamTest with SharedSQLContext { import testImplicits._ + test("add data in Append output mode") { + implicit val schema = new StructType().add(new StructField("value", IntegerType)) + val sink = new MemorySink(schema, OutputMode.Append) + + // Before adding data, check output + assert(sink.latestBatchId === None) + checkAnswer(sink.latestBatchData, Seq.empty) + checkAnswer(sink.allData, Seq.empty) + + // Add batch 0 and check outputs + sink.addBatch(0, 1 to 3) + assert(sink.latestBatchId === Some(0)) + checkAnswer(sink.latestBatchData, 1 to 3) + checkAnswer(sink.allData, 1 to 3) + + // Add batch 1 and check outputs + sink.addBatch(1, 4 to 6) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data + + // Re-add batch 1 with different data, should not be added and outputs should not be changed + sink.addBatch(1, 7 to 9) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 1 to 6) + + // Add batch 2 and check outputs + sink.addBatch(2, 7 to 9) + assert(sink.latestBatchId === Some(2)) + checkAnswer(sink.latestBatchData, 7 to 9) + checkAnswer(sink.allData, 1 to 9) + } + + test("add data in Complete output mode") { + implicit val schema = new StructType().add(new StructField("value", IntegerType)) + val sink = new MemorySink(schema, OutputMode.Complete) + + // Before adding data, check output + assert(sink.latestBatchId === None) + checkAnswer(sink.latestBatchData, Seq.empty) + checkAnswer(sink.allData, Seq.empty) + + // Add batch 0 and check outputs + sink.addBatch(0, 1 to 3) + assert(sink.latestBatchId === Some(0)) + checkAnswer(sink.latestBatchData, 1 to 3) + checkAnswer(sink.allData, 1 to 3) + + // Add batch 1 and check outputs + sink.addBatch(1, 4 to 6) + 
assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 4 to 6) // new data should replace old data + + // Re-add batch 1 with different data, should not be added and outputs should not be changed + sink.addBatch(1, 7 to 9) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 4 to 6) + + // Add batch 2 and check outputs + sink.addBatch(2, 7 to 9) + assert(sink.latestBatchId === Some(2)) + checkAnswer(sink.latestBatchData, 7 to 9) + checkAnswer(sink.allData, 7 to 9) + } + test("registering as a table") { testRegisterAsTable() } @@ -88,4 +159,15 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext { .startStream() } } + + private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = { + checkAnswer( + sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema), + intsToDF(expected)(schema)) + } + + implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = { + require(schema.fields.size === 1) + sqlContext.createDataset(seq).toDF(schema.fieldNames.head) + } } From 02b10ac4419f657e3756a95f352a29e20d01ad7d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 24 May 2016 15:25:41 -0700 Subject: [PATCH 04/16] Added unit test to DataFrameReaderWriterSuite --- .../apache/spark/sql/DataFrameWriter.scala | 19 ++++++++++++ .../DataFrameReaderWriterSuite.scala | 29 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index f26ce7dbcff6..3f8b95e7f105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -82,6 +82,25 @@ final class DataFrameWriter private[sql](df: DataFrame) { this } + /** + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `OutputMode.Append`: only the new rows in the streaming DataFrame/Dataset will be + * written to the sink + * - `OutputMode.Update`: only the changed rows in the streaming DataFrame/Dataset will be + * written to the sink every time there is some updates + * - `OutputMode.Complete`: all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: OutputMode): DataFrameWriter = { + assertStreaming("outputMode() can only be called on continuous queries") + this.outputMode = outputMode + this + } + + /** * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. 
* - `append`: only the new rows in the streaming DataFrame/Dataset will be written to diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index 4daadfe8e548..2c734a29bb4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -418,6 +418,35 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B assert(e.getMessage == "mode() can only be called on non-continuous queries;") } + test("check outputMode(OutputMode) can only be called on continuous queries") { + val df = spark.read.text(newTextInput) + val w = df.write.option("checkpointLocation", newMetadataDir) + val e = intercept[AnalysisException](w.outputMode(OutputMode.Append)) + Seq("outputmode", "continuous queries").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + + test("check outputMode(string) can only be called on continuous queries") { + val df = spark.read.text(newTextInput) + val w = df.write.option("checkpointLocation", newMetadataDir) + val e = intercept[AnalysisException](w.outputMode("append")) + Seq("outputmode", "continuous queries").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + + test("check outputMode(string) throws expcetion on incorrect mode") { + val df = spark.read + .format("org.apache.spark.sql.streaming.test") + .stream() + val w = df.write + val e = intercept[IllegalArgumentException](w.outputMode("xyz")) + Seq("output mode", "unknown", "xyz").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + test("check bucketBy() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") From 61af0573a112a54bab05c070de7a36b8c74703dc Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 24 May 2016 15:53:12 -0700 Subject: [PATCH 05/16] Added python API for output mode --- python/pyspark/sql/readwriter.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 855c9d666f0b..75f10422dc09 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -497,6 +497,23 @@ def mode(self, saveMode): self._jwrite = self._jwrite.mode(saveMode) return self + @since(2.0) + def outputMode(self, outputMode): + """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + + Options include: + * `append`: only the new rows in the streaming DataFrame/Dataset will be written to + the sink + * `update`: only the changed rows in the streaming DataFrame/Dataset will be written to + the sink every time there is some updates + * `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink + every time these is some updates + >>> sdf.write.outputMode('append') + """ + if outputMode is not None: + self._jwrite = self._jwrite.outputMode(outputMode) + return self + @since(1.4) def format(self, source): """Specifies the underlying output data source. 
From bb0314d32dfd42ea6081427042e182706a83b6ff Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 24 May 2016 16:44:52 -0700 Subject: [PATCH 06/16] Fixed python style --- python/pyspark/sql/readwriter.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 6aa9396be6ce..4ffe6fe0a967 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -505,12 +505,14 @@ def outputMode(self, outputMode): """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. Options include: - * `append`: only the new rows in the streaming DataFrame/Dataset will be written to - the sink - * `update`: only the changed rows in the streaming DataFrame/Dataset will be written to - the sink every time there is some updates - * `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink - every time these is some updates + + * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to + the sink + * `update`:Only the changed rows in the streaming DataFrame/Dataset will be written to + the sink every time there is some updates + * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink + every time these is some updates + >>> sdf.write.outputMode('append') """ if outputMode is not None: From 3a79d41b14535ffe4f65595126614832a094bf9b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 24 May 2016 16:53:34 -0700 Subject: [PATCH 07/16] Refactored injection of output mode in StateStoreSaveExec --- .../spark/sql/execution/SparkStrategies.scala | 19 ++++++++++ .../spark/sql/execution/aggregate/utils.scala | 6 +-- .../streaming/IncrementalExecution.scala | 38 +++++-------------- .../streaming/StatefulAggregate.scala | 6 ++- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 463c00563899..c46cecc71f37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -199,6 +199,25 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + /** + * Used to plan aggregation queries that are computed incrementally as part of a + * [[org.apache.spark.sql.ContinuousQuery]]. Currently this rule is injected into the planner + * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] + */ + object StatefulAggregationStrategy extends Strategy { + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalAggregation( + namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) => + + aggregate.Utils.planStreamingAggregation( + namedGroupingExpressions, + aggregateExpressions, + rewrittenResultExpressions, + planLater(child)) + + case _ => Nil + } + } /** * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala index a7d739e2f4ee..ecb37d132308 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala @@ -262,7 +262,6 @@ object Utils { groupingExpressions: Seq[NamedExpression], functionsWithoutDistinct: Seq[AggregateExpression], resultExpressions: Seq[NamedExpression], - outputMode: OutputMode, child: SparkPlan): Seq[SparkPlan] = { val groupingAttributes = groupingExpressions.map(_.toAttribute) @@ -313,9 +312,8 @@ object Utils { aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes), child = restored) } - val returnAllStates = if (outputMode == OutputMode.Complete) true else false - - val saved = StateStoreSaveExec(groupingAttributes, None, returnAllStates, partialMerged2) + val saved = StateStoreSaveExec( + groupingAttributes, stateId = None, returnAllStates = None, partialMerged2) val finalAndCompleteAggregate: SparkPlan = { val finalAggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Final)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index ee05489f0e9b..1a11c0a86248 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.planning.PhysicalAggregation +import org.apache.spark.sql.{OutputMode, SparkSession} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} @@ -35,36 +34,15 @@ class IncrementalExecution private[sql]( val currentBatchId: Long) extends QueryExecution(sparkSession, logicalPlan) { + // TODO: make this always part of planning. + val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil + // Modified planner with stateful operations. override def planner: SparkPlanner = new SparkPlanner( sparkSession.sparkContext, sparkSession.sessionState.conf, - Nil) { - - override def strategies: Seq[Strategy] = { - StatefulAggregationStrategy +: super.strategies - } - - /** - * Used to plan aggregation queries that are computed incrementally as part of a - * [[org.apache.spark.sql.ContinuousQuery]]. - */ - object StatefulAggregationStrategy extends Strategy { - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalAggregation( - namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) => - execution.aggregate.Utils.planStreamingAggregation( - namedGroupingExpressions, - aggregateExpressions, - rewrittenResultExpressions, - outputMode, - planLater(child)) - - case _ => Nil - } - } - } + stateStrategy) /** * Records the current id for a given stateful operator in the query plan as the `state` @@ -74,17 +52,19 @@ class IncrementalExecution private[sql]( /** Locates save/restore pairs surrounding aggregation. 
*/ val state = new Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan transform { - case StateStoreSaveExec(keys, None, outputMode, + case StateStoreSaveExec(keys, None, None, UnaryExecNode(agg, StateStoreRestoreExec(keys2, None, child))) => val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId) + val returnAllStates = if (outputMode == OutputMode.Complete) true else false operatorId += 1 StateStoreSaveExec( keys, Some(stateId), - outputMode, + Some(returnAllStates), agg.withNewChildren( StateStoreRestoreExec( keys, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index 62d60ad27c6a..aa19afe3d906 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -82,12 +82,14 @@ case class StateStoreRestoreExec( case class StateStoreSaveExec( keyExpressions: Seq[Attribute], stateId: Option[OperatorStateId], - returnAllStates: Boolean, + returnAllStates: Option[Boolean], child: SparkPlan) extends execution.UnaryExecNode with StatefulOperator { override protected def doExecute(): RDD[InternalRow] = { - val saveAndReturnFunc = if (returnAllStates) saveAndReturnAll _ else saveAndReturnUpdated _ + assert(returnAllStates.nonEmpty, + "Incorrect planning in IncrementalExecution, returnAllStates have not been set") + val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ else saveAndReturnUpdated _ child.execute().mapPartitionsWithStateStore( getStateId.checkpointLocation, operatorId = getStateId.operatorId, From 074299ca9bf04a3b14d9c54ba7fc2cd2b4bce94b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 24 May 2016 19:24:26 -0700 Subject: [PATCH 08/16] Fixed test bug --- .../spark/sql/streaming/ContinuousQueryManagerSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 52991fb8e7e5..513ccdaef63b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -241,7 +241,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with query = df.write .format("memory") - .option("checkpointLocation", "memory") + .queryName(s"query${Random.nextInt(100000)}") + .option("checkpointLocation", metadataRoot) .outputMode("append") .startStream() .asInstanceOf[StreamExecution] From 58f88b8f829b7a1408b2ce471b4d5d0a23031ee7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 25 May 2016 01:06:01 -0700 Subject: [PATCH 09/16] Fixed more unit tests --- .../UnsupportedOperationChecker.scala | 4 ++-- .../analysis/UnsupportedOperationsSuite.scala | 24 +------------------ 2 files changed, 3 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 6ebed1adbd41..470d5fbf1ed0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -130,12 +130,12 @@ object UnsupportedOperationChecker { outputMode match { case OutputMode.Append if aggregates.nonEmpty => throwError( - s"$outputMode output mode not supported with streaming aggregates on " + + s"$outputMode output mode not supported when there are streaming aggregations on " + s"streaming DataFrames/DataSets") case OutputMode.Complete | OutputMode.Update if aggregates.isEmpty => throwError( - s"$outputMode output mode not supported when not streaming aggregates are present on " + + s"$outputMode output mode not supported when there are no streaming aggregations on " + s"streaming DataFrames/Datasets") case _ => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 354b4ccf2f61..08a309f5698d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -75,35 +75,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Append, expectedMsgs = "commands" :: Nil) - // Aggregates: Not supported on streams in Append mode - assertSupportedInStreamingPlan( - "aggregate - batch with update output mode", - batchRelation.groupBy("a")("count(*)"), - outputMode = Update) - - assertSupportedInStreamingPlan( - "aggregate - batch with append output mode", - batchRelation.groupBy("a")("count(*)"), - outputMode = Append) - - assertSupportedInStreamingPlan( - "aggregate - stream with update output mode", - streamRelation.groupBy("a")("count(*)"), - outputMode = Update) - - assertNotSupportedInStreamingPlan( - "aggregate - stream with append output mode", - streamRelation.groupBy("a")("count(*)"), - outputMode = Append, - Seq("aggregation", "append output mode")) - // Multiple streaming aggregations not supported def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name)) assertSupportedInStreamingPlan( "aggregate - multiple batch aggregations", Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), batchRelation)), - Update) + Append) assertSupportedInStreamingPlan( "aggregate - multiple aggregations but only one streaming aggregation", From bbd6022cf3238a78604c8ad43df4c917d792f37f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 26 May 2016 15:03:22 -0700 Subject: [PATCH 10/16] Refactored OutputMode --- python/pyspark/sql/readwriter.py | 2 ++ .../org/apache/spark/sql/OutputMode.java | 13 ++++++--- .../spark/sql/InternalOutputModes.scala | 24 +++++++++++++++++ .../UnsupportedOperationChecker.scala | 27 +++++++++---------- .../analysis/UnsupportedOperationsSuite.scala | 8 +++--- .../apache/spark/sql/DataFrameWriter.scala | 21 +++++++-------- .../streaming/IncrementalExecution.scala | 4 +-- .../sql/execution/streaming/memory.scala | 4 +-- .../spark/sql/streaming/MemorySinkSuite.scala | 6 ++--- .../streaming/StreamingAggregationSuite.scala | 15 ++++++----- .../test/DataFrameReaderWriterSuite.scala | 20 ++++++++------ 11 files changed, 88 insertions(+), 56 deletions(-) rename sql/catalyst/src/main/{scala => java}/org/apache/spark/sql/OutputMode.java (79%) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala diff --git a/python/pyspark/sql/readwriter.py 
b/python/pyspark/sql/readwriter.py index 1a7aabfac123..11a1b21ba2c5 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -513,6 +513,8 @@ def outputMode(self, outputMode): * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink every time these is some updates + .. note:: Experimental. + >>> sdf.write.outputMode('append') """ if outputMode is not None: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java similarity index 79% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/OutputMode.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java index 4053a7c0337d..9795c84916cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java @@ -17,8 +17,13 @@ package org.apache.spark.sql; -public enum OutputMode { - Append, - Update, - Complete +public class OutputMode { + + public static OutputMode Append() { + return InternalOutputModes.Append$.MODULE$; + } + + public static OutputMode Complete() { + return InternalOutputModes.Complete$.MODULE$; + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala new file mode 100644 index 000000000000..e40abb604368 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +private[sql] object InternalOutputModes { + case object Append extends OutputMode + case object Complete extends OutputMode + case object Update extends OutputMode +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 470d5fbf1ed0..f4c03476096d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.OutputMode +import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -30,24 +29,23 @@ object UnsupportedOperationChecker { def checkForBatch(plan: LogicalPlan): Unit = { plan.foreachUp { case p if p.isStreaming => - throwError( - "Queries with streaming sources must be executed with write.startStream()")(p) + throwError("Queries with streaming sources must be executed with write.startStream()")(p) case _ => } } - def checkForStreaming(implicit plan: LogicalPlan, outputMode: OutputMode): Unit = { + def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = { if (!plan.isStreaming) { throwError( "Queries without streaming sources cannot be executed with write.startStream()")(plan) } - plan.foreachUp { implicit plan => + plan.foreachUp { implicit subPlan => // Operations that cannot exists anywhere in a streaming plan - plan match { + subPlan match { case _: Command => throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + @@ -105,10 +103,10 @@ object UnsupportedOperationChecker { case GroupingSets(_, _, child, _) if child.isStreaming => throwError("GroupingSets is not supported on streaming DataFrames/Datasets") - case GlobalLimit(_, _) | LocalLimit(_, _) if plan.children.forall(_.isStreaming) => + case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) => throwError("Limits are not supported on streaming DataFrames/Datasets") - case Sort(_, _, _) | SortPartitions(_, _) if plan.children.forall(_.isStreaming) => + case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) => throwError("Sorting is not supported on streaming DataFrames/Datasets") case Sample(_, _, _, _, child) if child.isStreaming => @@ -128,21 +126,22 @@ object UnsupportedOperationChecker { // Checks related to aggregations val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a } outputMode match { - case OutputMode.Append if aggregates.nonEmpty => + case InternalOutputModes.Append if aggregates.nonEmpty => throwError( s"$outputMode output mode not supported when there are streaming aggregations on " + - s"streaming DataFrames/DataSets") + s"streaming DataFrames/DataSets")(plan) - case OutputMode.Complete | OutputMode.Update if aggregates.isEmpty => + case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty => throwError( s"$outputMode output mode not supported when there are no streaming aggregations on " + - s"streaming DataFrames/Datasets") + s"streaming DataFrames/Datasets")(plan) case _ => } if (aggregates.size > 1) { throwError( - "Multiple 
streaming aggregations are not supported with streaming DataFrames/Datasets") + "Multiple streaming aggregations are not supported with " + + "streaming DataFrames/Datasets")(plan) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index c87023caf26f..c2e3d474506d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{AnalysisException, OutputMode} -import org.apache.spark.sql.OutputMode._ +import org.apache.spark.sql.InternalOutputModes._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -197,9 +197,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows") // Output modes with aggregation and non-aggregation plans - testOutputMode(OutputMode.Append, shouldSupportAggregation = false) - testOutputMode(OutputMode.Update, shouldSupportAggregation = true) - testOutputMode(OutputMode.Complete, shouldSupportAggregation = true) + testOutputMode(Append, shouldSupportAggregation = false) + testOutputMode(Update, shouldSupportAggregation = true) + testOutputMode(Complete, shouldSupportAggregation = true) /* ======================================================================================= diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index b4491a3cd896..071e7997d7ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -84,11 +84,9 @@ final class DataFrameWriter private[sql](df: DataFrame) { /** * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. - * - `OutputMode.Append`: only the new rows in the streaming DataFrame/Dataset will be + * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be * written to the sink - * - `OutputMode.Update`: only the changed rows in the streaming DataFrame/Dataset will be - * written to the sink every time there is some updates - * - `OutputMode.Complete`: all the rows in the streaming DataFrame/Dataset will be written + * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written * to the sink every time these is some updates * * @since 2.0.0 @@ -100,13 +98,10 @@ final class DataFrameWriter private[sql](df: DataFrame) { this } - /** * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. 
* - `append`: only the new rows in the streaming DataFrame/Dataset will be written to * the sink - * - `update`: only the changed rows in the streaming DataFrame/Dataset will be written to - * the sink every time there is some updates * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink * every time these is some updates * @@ -116,11 +111,13 @@ final class DataFrameWriter private[sql](df: DataFrame) { def outputMode(outputMode: String): DataFrameWriter = { assertStreaming("outputMode() can only be called on continuous queries") this.outputMode = outputMode.toLowerCase match { - case "append" => OutputMode.Append - case "update" => OutputMode.Update - case "complete" => OutputMode.Complete - case _ => throw new IllegalArgumentException(s"Unknown output mode $outputMode. " + - "Accepted output modes are 'append', 'update', 'complete'") + case "append" => + OutputMode.Append + case "complete" => + OutputMode.Complete + case _ => + throw new IllegalArgumentException(s"Unknown output mode $outputMode. " + + "Accepted output modes are 'append' and 'complete'") } this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index dc574b8730d7..5c8604985116 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.{OutputMode, SparkSession} +import org.apache.spark.sql.{InternalOutputModes, OutputMode, SparkSession} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} @@ -59,7 +59,7 @@ class IncrementalExecution private[sql]( UnaryExecNode(agg, StateStoreRestoreExec(keys2, None, child))) => val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId) - val returnAllStates = if (outputMode == OutputMode.Complete) true else false + val returnAllStates = if (outputMode == InternalOutputModes.Complete) true else false operatorId += 1 StateStoreSaveExec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 44889da03e1d..130e9ebac5f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -146,10 +146,10 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi if (latestBatchId.isEmpty || batchId > latestBatchId.get) { logDebug(s"Committing batch $batchId to $this") outputMode match { - case OutputMode.Append | OutputMode.Update => + case InternalOutputModes.Append | InternalOutputModes.Update => batches.append(AddedData(batchId, data.collect())) - case OutputMode.Complete => + case InternalOutputModes.Complete => batches.clear() batches.append(AddedData(batchId, data.collect())) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala index 1678734787db..a1e70f8e4ac9 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala @@ -37,7 +37,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAft test("directly add data in Append output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, OutputMode.Append) + val sink = new MemorySink(schema, InternalOutputModes.Append) // Before adding data, check output assert(sink.latestBatchId === None) @@ -71,7 +71,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAft test("directly add data in Update output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, OutputMode.Append) + val sink = new MemorySink(schema, InternalOutputModes.Update) // Before adding data, check output assert(sink.latestBatchId === None) @@ -105,7 +105,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAft test("directly add data in Complete output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, OutputMode.Complete) + val sink = new MemorySink(schema, InternalOutputModes.Complete) // Before adding data, check output assert(sink.latestBatchId === None) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index a3218d7cbb3a..dd269b0e67d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.streaming import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, OutputMode, StreamTest} +import org.apache.spark.sql.{AnalysisException, StreamTest} +import org.apache.spark.sql.InternalOutputModes._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed @@ -49,7 +50,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .agg(count("*")) .as[(Int, Long)] - testStream(aggregated, OutputMode.Update)( + testStream(aggregated, Update)( AddData(inputData, 3), CheckLastBatch((3, 1)), AddData(inputData, 3, 2), @@ -73,7 +74,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .agg(count("*")) .as[(Int, Long)] - testStream(aggregated, OutputMode.Complete)( + testStream(aggregated, Complete)( AddData(inputData, 3), CheckLastBatch((3, 1)), AddData(inputData, 2), @@ -97,7 +98,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .as[(Int, Long)] intercept[AnalysisException] { - testStream(aggregated, OutputMode.Append)() + testStream(aggregated, Append)() } } @@ -110,7 +111,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .agg(count("*")) .as[(Int, Int, Long)] - testStream(aggregated, OutputMode.Update)( + testStream(aggregated, Update)( AddData(inputData, 1, 2), CheckLastBatch((1, 2, 1), (2, 3, 1)), AddData(inputData, 1, 2), @@ -135,7 +136,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be 
.agg(count("*")) .as[(Int, Long)] - testStream(aggregated, OutputMode.Update)( + testStream(aggregated, Update)( StartStream(), AddData(inputData, 1, 2, 3, 4), ExpectFailure[SparkException](), @@ -148,7 +149,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be val inputData = MemoryStream[(String, Int)] val aggregated = inputData.toDS().groupByKey(_._1).agg(typed.sumLong(_._2)) - testStream(aggregated, OutputMode.Update)( + testStream(aggregated, Update)( AddData(inputData, ("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)), CheckLastBatch(("a", 30), ("b", 3), ("c", 1)) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala index fcc3d388da77..38a0534ab6be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala @@ -435,15 +435,19 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B } } - test("check outputMode(string) throws expcetion on incorrect mode") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[IllegalArgumentException](w.outputMode("xyz")) - Seq("output mode", "unknown", "xyz").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + test("check outputMode(string) throws exception on unsupported modes") { + def testError(outputMode: String): Unit = { + val df = spark.read + .format("org.apache.spark.sql.streaming.test") + .stream() + val w = df.write + val e = intercept[IllegalArgumentException](w.outputMode(outputMode)) + Seq("output mode", "unknown", outputMode).foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } } + testError("Update") + testError("Xyz") } test("check bucketBy() can only be called on non-continuous queries") { From 4973621c23aac441c26c172a6e6322454b9797f7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 26 May 2016 16:01:50 -0700 Subject: [PATCH 11/16] Fix python test --- python/pyspark/sql/readwriter.py | 7 ++++--- python/pyspark/sql/tests.py | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 11a1b21ba2c5..2a0367ea9e68 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -515,10 +515,11 @@ def outputMode(self, outputMode): .. note:: Experimental. - >>> sdf.write.outputMode('append') + >>> writer = sdf.write.outputMode('append') """ - if outputMode is not None: - self._jwrite = self._jwrite.outputMode(outputMode) + if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0: + raise ValueError('The output mode must be a non-empty string. 
Got: %s' % outputMode) + self._jwrite = self._jwrite.outputMode(outputMode) return self @since(1.4) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1790432edd5d..3c4873956ee7 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -926,7 +926,7 @@ def test_stream_save_options(self): out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') cq = df.write.option('checkpointLocation', chk).queryName('this_query') \ - .format('parquet').option('path', out).startStream() + .format('parquet').outputMode('append').option('path', out).startStream() try: self.assertEqual(cq.name, 'this_query') self.assertTrue(cq.isActive) @@ -952,7 +952,8 @@ def test_stream_save_options_overwrite(self): fake1 = os.path.join(tmpPath, 'fake1') fake2 = os.path.join(tmpPath, 'fake2') cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \ - .queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query', + .queryName('fake_query').outputMode('append') \ + .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: self.assertEqual(cq.name, 'this_query') From 369e9d5d58a762d9af794ed791410249d7032fa5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 26 May 2016 16:18:53 -0700 Subject: [PATCH 12/16] Improved tests --- python/pyspark/sql/tests.py | 4 +-- .../apache/spark/sql/DataFrameWriter.scala | 4 --- .../spark/sql/streaming/MemorySinkSuite.scala | 29 ++++++++++++++++++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3c4873956ee7..0d9dd5ea2a36 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -953,8 +953,8 @@ def test_stream_save_options_overwrite(self): fake2 = os.path.join(tmpPath, 'fake2') cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \ .queryName('fake_query').outputMode('append') \ - .startStream(path=out, format='parquet', queryName='this_query', - checkpointLocation=chk) + .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) + try: self.assertEqual(cq.name, 'this_query') self.assertTrue(cq.isActive) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 071e7997d7ab..674895582a64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -359,10 +359,6 @@ final class DataFrameWriter private[sql](df: DataFrame) { checkpointPath.toUri.toString } - if (!Seq(OutputMode.Append, OutputMode.Complete).contains(outputMode)) { - throw new IllegalArgumentException(s"Memory sink does not support output mode $outputMode") - } - val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) resultDf.createOrReplaceTempView(queryName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala index a1e70f8e4ac9..41247d66fd2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala @@ -138,10 +138,11 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAft } - test("registering as a table: 
append mode") { + test("registering as a table in Append output mode") { val input = MemoryStream[Int] val query = input.toDF().write .format("memory") + .outputMode("append") .queryName("memStream") .startStream() input.addData(1, 2, 3) @@ -160,6 +161,32 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAft query.stop() } + test("registering as a table in Complete output mode") { + val input = MemoryStream[Int] + val query = input.toDF() + .groupBy("value") + .count() + .write + .format("memory") + .outputMode("complete") + .queryName("memStream") + .startStream() + input.addData(1, 2, 3) + query.processAllAvailable() + + checkDataset( + spark.table("memStream").as[(Int, Long)], + (1, 1L), (2, 1L), (3, 1L)) + + input.addData(4, 5, 6) + query.processAllAvailable() + checkDataset( + spark.table("memStream").as[(Int, Long)], + (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L)) + + query.stop() + } + ignore("stress test") { // Ignore the stress test as it takes several minutes to run (0 until 1000).foreach { _ => From ab32567926c3efc727fee0948757d7db2f58d3f2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 26 May 2016 17:26:15 -0700 Subject: [PATCH 13/16] Added docs --- .../java/org/apache/spark/sql/OutputMode.java | 25 +++++++++++++++++++ .../spark/sql/InternalOutputModes.scala | 21 ++++++++++++++++ .../apache/spark/sql/DataFrameWriter.scala | 4 +-- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java index 9795c84916cf..1936d53e5e83 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java @@ -17,12 +17,37 @@ package org.apache.spark.sql; +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * + * OutputMode is used to what data will be written to a streaming sink when there is + * new data available in a streaming DataFrame/Dataset. + * + * @since 2.0.0 + */ +@Experimental public class OutputMode { + /** + * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be + * written to the sink. This output mode can be only be used in queries that do not + * contain any aggregation. + * + * @since 2.0.0 + */ public static OutputMode Append() { return InternalOutputModes.Append$.MODULE$; } + /** + * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates. This output mode can only be used in queries + * that contain aggregations. + * + * @since 2.0.0 + */ public static OutputMode Complete() { return InternalOutputModes.Complete$.MODULE$; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala index e40abb604368..8ef5d9a65327 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala @@ -17,8 +17,29 @@ package org.apache.spark.sql +/** + * Internal helper class to generate objects representing various [[OutputMode]]s, + */ private[sql] object InternalOutputModes { + + /** + * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be + * written to the sink. This output mode can be only be used in queries that do not + * contain any aggregation. 
+ */ case object Append extends OutputMode + + /** + * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates. This output mode can only be used in queries + * that contain aggregations. + */ case object Complete extends OutputMode + + /** + * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be + * written to the sink every time these is some updates. This output mode can only be used in + * queries that contain aggregations. + */ case object Update extends OutputMode } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 674895582a64..05a7172691c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -84,10 +84,10 @@ final class DataFrameWriter private[sql](df: DataFrame) { /** * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. - * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be + * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be * written to the sink * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written - * to the sink every time these is some updates + * to the sink every time these is some updates * * @since 2.0.0 */ From 85ce2638cf9c9150e2258749bb894a39779d24cc Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 26 May 2016 18:20:20 -0700 Subject: [PATCH 14/16] Fix python docs --- python/pyspark/sql/readwriter.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 2a0367ea9e68..36464b576405 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -508,8 +508,6 @@ def outputMode(self, outputMode): * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the sink - * `update`:Only the changed rows in the streaming DataFrame/Dataset will be written to - the sink every time there is some updates * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink every time these is some updates From 4784e18efcc552fc99c2fd9d5ccfe1c78e479c79 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 27 May 2016 17:42:36 -0700 Subject: [PATCH 15/16] Addressed comments --- .../apache/spark/sql/JavaOutputModeSuite.java | 18 ++++++++++++++++++ .../streaming/StatefulAggregate.scala | 10 ++++++++++ .../spark/sql/execution/streaming/memory.scala | 3 ++- .../ContinuousQueryManagerSuite.scala | 4 ++-- .../spark/sql/streaming/MemorySinkSuite.scala | 2 +- .../spark/sql/streaming/StreamSuite.scala | 7 +++++++ .../streaming/StreamingAggregationSuite.scala | 5 ++++- 7 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java new file mode 100644 index 000000000000..fa22c77cde9d --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java @@ -0,0 +1,18 @@ +package org.apache.spark.sql; + + +import org.apache.spark.sql.OutputMode; +import org.junit.After; +import org.junit.Test; + + +public class JavaOutputModeSuite { + + 
@Test + public void testOutputModes() { + OutputMode o1 = OutputMode.Append(); + assert(o1.toString().toLowerCase().contains("append")); + OutputMode o2 = OutputMode.Complete(); + assert (o2.toString().toLowerCase().contains("complete")); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index aa19afe3d906..4d0283fbef1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -103,6 +103,11 @@ case class StateStoreSaveExec( override def output: Seq[Attribute] = child.output + /** + * Save all the rows to the state store, and return all the rows in the state store. + * Note that this returns an iterator that pipelines the saving to store with downstream + * processing. + */ private def saveAndReturnUpdated( store: StateStore, iter: Iterator[InternalRow]): Iterator[InternalRow] = { @@ -128,6 +133,11 @@ case class StateStoreSaveExec( } } + /** + * Save all the rows to the state store, and return all the rows in the state store. + * Note that the saving to store is blocking; only after all the rows have been saved + * is the iterator on the update store data is generated. + */ private def saveAndReturnAll( store: StateStore, iter: Iterator[InternalRow]): Iterator[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 130e9ebac5f0..e4a95e733530 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -154,7 +154,8 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi batches.append(AddedData(batchId, data.collect())) case _ => - throw new IllegalArgumentException("Data source ") + throw new IllegalArgumentException( + s"Output mode $outputMode is not supported by MemorySink") } } else { logDebug(s"Skipping already committed batch: $batchId") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 513ccdaef63b..b75c3ea106e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -232,7 +232,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] => Unit): Unit = { failAfter(streamingTimeout) { val queries = withClue("Error starting queries") { - datasets.map { ds => + datasets.zipWithIndex.map { case (ds, i) => @volatile var query: StreamExecution = null try { val df = ds.toDF @@ -241,7 +241,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with query = df.write .format("memory") - .queryName(s"query${Random.nextInt(100000)}") + .queryName(s"query$i") .option("checkpointLocation", metadataRoot) .outputMode("append") .startStream() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala index 
41247d66fd2a..e5bd0b47443e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala @@ -251,7 +251,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAft intsToDF(expected)(schema)) } - implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = { + private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = { require(schema.fields.size === 1) sqlContext.createDataset(seq).toDF(schema.fieldNames.head) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index ae89a6887a6d..c17cb1de6ce9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -235,6 +235,13 @@ class StreamSuite extends StreamTest with SharedSQLContext { spark.experimental.extraStrategies = Nil } } + + test("output mode API in Scala") { + val o1 = OutputMode.Append + assert(o1 === InternalOutputModes.Append) + val o2 = OutputMode.Complete + assert(o2 === InternalOutputModes.Complete) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index dd269b0e67d3..322bbb9ea0a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -97,9 +97,12 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be .agg(count("*")) .as[(Int, Long)] - intercept[AnalysisException] { + val e = intercept[AnalysisException] { testStream(aggregated, Append)() } + Seq("append", "not supported").foreach { m => + assert(e.getMessage.toLowerCase.contains(m.toLowerCase)) + } } test("multiple keys") { From e951798bf4511cabc08d242e7f1a3d7d1e653263 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 27 May 2016 17:49:31 -0700 Subject: [PATCH 16/16] Fixed RAT --- .../apache/spark/sql/JavaOutputModeSuite.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java index fa22c77cde9d..1764f3348d8f 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java @@ -1,11 +1,24 @@ -package org.apache.spark.sql; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; -import org.apache.spark.sql.OutputMode; -import org.junit.After; import org.junit.Test; - public class JavaOutputModeSuite { @Test
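
Taken together, the series leaves two public ways to pick an output mode. The sketch below illustrates both: the OutputMode.Append()/Complete() factories added in OutputMode.java and the lower-case string form parsed by DataFrameWriter.outputMode(). The session, source format and query name are assumptions for illustration; only the behaviour shown in the diffs above (append and complete accepted, update rejected) is relied on.

    import org.apache.spark.sql.{OutputMode, SparkSession}

    // Typed factories from OutputMode.java; their toString contains the mode name,
    // which is what JavaOutputModeSuite checks.
    val append: OutputMode = OutputMode.Append()
    val complete: OutputMode = OutputMode.Complete()

    // String form on DataFrameWriter: "append" and "complete" are accepted,
    // anything else (including "update") throws IllegalArgumentException.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("output-mode-sketch")
      .getOrCreate()
    val df = spark.read.format("org.apache.spark.sql.streaming.test").stream()
    val query = df.write
      .format("memory")
      .queryName("output_mode_sketch")
      .outputMode("append") // non-aggregated plan, so Append is the supported choice
      .startStream()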