diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 423f89fefa093..0c11830cf06dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -393,17 +393,6 @@ object UnsupportedOperationChecker extends Logging { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => - case Repartition(1, false, _) => - case node: Aggregate => - val aboveSinglePartitionCoalesce = node.find { - case Repartition(1, false, _) => true - case _ => false - }.isDefined - - if (!aboveSinglePartitionCoalesce) { - throwError(s"In continuous processing mode, coalesce(1) must be called before " + - s"aggregate operation ${node.nodeName}.") - } case node => throwError(s"Continuous processing does not support ${node.nodeName} operations.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index cca80c0cb6d57..f289a867e5ec0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalo import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -218,18 +218,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil - case Repartition(1, false, child) => - val isContinuous = child.find { - case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[ContinuousStream] - case _ => false - }.isDefined - - if (isContinuous) { - ContinuousCoalesceExec(1, planLater(child)) :: Nil - } else { - Nil - } - case desc @ DescribeNamespace(ResolvedNamespace(catalog, ns), extended) => DescribeNamespaceExec(desc.output, catalog.asNamespaceCatalog, ns, extended) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala deleted file mode 100644 index 4c621890c9793..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition} -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} - -/** - * Physical plan for coalescing a continuous processing plan. - * - * Currently, only coalesces to a single partition are supported. `numPartitions` must be 1. - */ -case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { - override def output: Seq[Attribute] = child.output - - override def outputPartitioning: Partitioning = SinglePartition - - override def doExecute(): RDD[InternalRow] = { - assert(numPartitions == 1) - new ContinuousCoalesceRDD( - sparkContext, - numPartitions, - conf.continuousStreamingExecutorQueueSize, - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_INTERVAL_KEY).toLong, - child.execute()) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala deleted file mode 100644 index 14046f6a99c24..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.streaming.continuous - -import java.util.UUID - -import org.apache.spark._ -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.execution.streaming.continuous.shuffle._ -import org.apache.spark.util.ThreadUtils - -case class ContinuousCoalesceRDDPartition( - index: Int, - endpointName: String, - queueSize: Int, - numShuffleWriters: Int, - epochIntervalMs: Long) - extends Partition { - // Initialized only on the executor, and only once even as we call compute() multiple times. - lazy val (reader: ContinuousShuffleReader, endpoint) = { - val env = SparkEnv.get.rpcEnv - val receiver = new RPCContinuousShuffleReader( - queueSize, numShuffleWriters, epochIntervalMs, env) - val endpoint = env.setupEndpoint(endpointName, receiver) - - TaskContext.get().addTaskCompletionListener[Unit] { ctx => - env.stop(endpoint) - } - (receiver, endpoint) - } - // This flag will be flipped on the executors to indicate that the threads processing - // partitions of the write-side RDD have been started. These will run indefinitely - // asynchronously as epochs of the coalesce RDD complete on the read side. - private[continuous] var writersInitialized: Boolean = false -} - -/** - * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local - * continuous shuffle, and then reads them in the task thread using `reader`. - */ -class ContinuousCoalesceRDD( - context: SparkContext, - numPartitions: Int, - readerQueueSize: Int, - epochIntervalMs: Long, - prev: RDD[InternalRow]) - extends RDD[InternalRow](context, Nil) { - - // When we support more than 1 target partition, we'll need to figure out how to pass in the - // required partitioner. - private val outputPartitioner = new HashPartitioner(1) - - private val readerEndpointNames = (0 until numPartitions).map { i => - s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}" - } - - override def getPartitions: Array[Partition] = { - (0 until numPartitions).map { partIndex => - ContinuousCoalesceRDDPartition( - partIndex, - readerEndpointNames(partIndex), - readerQueueSize, - prev.getNumPartitions, - epochIntervalMs) - }.toArray - } - - private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( - prev.getNumPartitions, - this.name) - - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val part = split.asInstanceOf[ContinuousCoalesceRDDPartition] - - if (!part.writersInitialized) { - val rpcEnv = SparkEnv.get.rpcEnv - - // trigger lazy initialization - part.endpoint - val endpointRefs = readerEndpointNames.map { endpointName => - rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) - } - - val runnables = prev.partitions.map { prevSplit => - new Runnable() { - override def run(): Unit = { - TaskContext.setTaskContext(context) - - val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( - prevSplit.index, outputPartitioner, endpointRefs.toArray) - - EpochTracker.initializeCurrentEpoch( - context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) - while (!context.isInterrupted() && !context.isCompleted()) { - writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) - // Note that current epoch is a inheritable thread local but makes another instance, - // so each writer thread can properly increment its own epoch without affecting - // the main task thread. 
- EpochTracker.incrementCurrentEpoch() - } - } - } - } - - context.addTaskCompletionListener[Unit] { ctx => - threadPool.shutdownNow() - } - - part.writersInitialized = true - - runnables.foreach(threadPool.execute) - } - - part.reader.read() - } - - override def clearDependencies(): Unit = { - throw new IllegalStateException("Continuous RDDs cannot be checkpointed") - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index a109c2171f3d2..d225e65aabe11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -206,9 +206,6 @@ class ContinuousExecution( currentEpochCoordinatorId = epochCoordinatorId sparkSessionForQuery.sparkContext.setLocalProperty( ContinuousExecution.EPOCH_COORDINATOR_ID_KEY, epochCoordinatorId) - sparkSessionForQuery.sparkContext.setLocalProperty( - ContinuousExecution.EPOCH_INTERVAL_KEY, - trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString) // Use the parent Spark session for the endpoint since it's where this query ID is registered. val epochEndpoint = EpochCoordinatorRef.create( @@ -436,5 +433,4 @@ class ContinuousExecution( object ContinuousExecution { val START_EPOCH_KEY = "__continuous_start_epoch" val EPOCH_COORDINATOR_ID_KEY = "__epoch_coordinator_id" - val EPOCH_INTERVAL_KEY = "__continuous_epoch_interval" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala deleted file mode 100644 index 9b13f6398d837..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous.shuffle - -import java.util.UUID - -import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.rpc.RpcAddress -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.NextIterator - -case class ContinuousShuffleReadPartition( - index: Int, - endpointName: String, - queueSize: Int, - numShuffleWriters: Int, - epochIntervalMs: Long) - extends Partition { - // Initialized only on the executor, and only once even as we call compute() multiple times. 
- lazy val (reader: ContinuousShuffleReader, endpoint) = { - val env = SparkEnv.get.rpcEnv - val receiver = new RPCContinuousShuffleReader( - queueSize, numShuffleWriters, epochIntervalMs, env) - val endpoint = env.setupEndpoint(endpointName, receiver) - - TaskContext.get().addTaskCompletionListener[Unit] { ctx => - env.stop(endpoint) - } - (receiver, endpoint) - } -} - -/** - * RDD at the map side of each continuous processing shuffle task. Upstream tasks send their - * shuffle output to the wrapped receivers in partitions of this RDD; each of the RDD's tasks - * poll from their receiver until an epoch marker is sent. - * - * @param sc the RDD context - * @param numPartitions the number of read partitions for this RDD - * @param queueSize the size of the row buffers to use - * @param numShuffleWriters the number of continuous shuffle writers feeding into this RDD - * @param epochIntervalMs the checkpoint interval of the streaming query - */ -class ContinuousShuffleReadRDD( - sc: SparkContext, - numPartitions: Int, - queueSize: Int = 1024, - numShuffleWriters: Int = 1, - epochIntervalMs: Long = 1000, - val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")) - extends RDD[UnsafeRow](sc, Nil) { - - override protected def getPartitions: Array[Partition] = { - (0 until numPartitions).map { partIndex => - ContinuousShuffleReadPartition( - partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs) - }.toArray - } - - override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { - split.asInstanceOf[ContinuousShuffleReadPartition].reader.read() - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReader.scala deleted file mode 100644 index 42631c90ebc55..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReader.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous.shuffle - -import org.apache.spark.sql.catalyst.expressions.UnsafeRow - -/** - * Trait for reading from a continuous processing shuffle. - */ -trait ContinuousShuffleReader { - /** - * Returns an iterator over the incoming rows in an epoch. Implementations should block waiting - * for new rows to arrive, and end the iterator once they've received epoch markers from all - * shuffle writers. 
- */ - def read(): Iterator[UnsafeRow] -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleWriter.scala deleted file mode 100644 index 47b1f78b24505..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleWriter.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous.shuffle - -import org.apache.spark.sql.catalyst.expressions.UnsafeRow - -/** - * Trait for writing to a continuous processing shuffle. - */ -trait ContinuousShuffleWriter { - def write(epoch: Iterator[UnsafeRow]): Unit -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala deleted file mode 100644 index 502ae0d4822e8..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous.shuffle - -import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean - -import org.apache.spark.internal.Logging -import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.util.NextIterator - -/** - * Messages for the RPCContinuousShuffleReader endpoint. Either an incoming row or an epoch marker. - * - * Each message comes tagged with writerId, identifying which writer the message is coming - * from. 
The receiver will only begin the next epoch once all writers have sent an epoch - * marker ending the current epoch. - */ -private[shuffle] sealed trait RPCContinuousShuffleMessage extends Serializable { - def writerId: Int -} -private[shuffle] case class ReceiverRow(writerId: Int, row: UnsafeRow) - extends RPCContinuousShuffleMessage -private[shuffle] case class ReceiverEpochMarker(writerId: Int) extends RPCContinuousShuffleMessage - -/** - * RPC endpoint for receiving rows into a continuous processing shuffle task. Continuous shuffle - * writers will send rows here, with continuous shuffle readers polling for new rows as needed. - * - * TODO: Support multiple source tasks. We need to output a single epoch marker once all - * source tasks have sent one. - */ -private[continuous] class RPCContinuousShuffleReader( - queueSize: Int, - numShuffleWriters: Int, - epochIntervalMs: Long, - override val rpcEnv: RpcEnv) - extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging { - // Note that this queue will be drained from the main task thread and populated in the RPC - // response thread. - private val queues = Array.fill(numShuffleWriters) { - new ArrayBlockingQueue[RPCContinuousShuffleMessage](queueSize) - } - - // Exposed for testing to determine if the endpoint gets stopped on task end. - private[shuffle] val stopped = new AtomicBoolean(false) - - override def onStop(): Unit = { - stopped.set(true) - } - - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case r: RPCContinuousShuffleMessage => - // Note that this will block a thread the shared RPC handler pool! - // The TCP based shuffle handler (SPARK-24541) will avoid this problem. - queues(r.writerId).put(r) - context.reply(()) - } - - override def read(): Iterator[UnsafeRow] = { - new NextIterator[UnsafeRow] { - // An array of flags for whether each writer ID has gotten an epoch marker. - private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) - - private val executor = Executors.newFixedThreadPool(numShuffleWriters) - private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor) - - private def completionTask(writerId: Int) = new Callable[RPCContinuousShuffleMessage] { - override def call(): RPCContinuousShuffleMessage = queues(writerId).take() - } - - // Initialize by submitting tasks to read the first row from each writer. - (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) - - /** - * In each call to getNext(), we pull the next row available in the completion queue, and then - * submit another task to read the next row from the writer which returned it. - * - * When a writer sends an epoch marker, we note that it's finished and don't submit another - * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. - */ - override def getNext(): UnsafeRow = { - var nextRow: UnsafeRow = null - while (!finished && nextRow == null) { - completion.poll(epochIntervalMs, TimeUnit.MILLISECONDS) match { - case null => - // Try again if the poll didn't wait long enough to get a real result. - // But we should be getting at least an epoch marker every checkpoint interval. - val writerIdsUncommitted = writerEpochMarkersReceived.zipWithIndex.collect { - case (flag, idx) if !flag => idx - } - logWarning( - s"Completion service failed to make progress after $epochIntervalMs ms. 
Waiting " + - s"for writers ${writerIdsUncommitted.mkString(",")} to send epoch markers.") - - // The completion service guarantees this future will be available immediately. - case future => future.get() match { - case ReceiverRow(writerId, r) => - // Start reading the next element in the queue we just took from. - completion.submit(completionTask(writerId)) - nextRow = r - case ReceiverEpochMarker(writerId) => - // Don't read any more from this queue. If all the writers have sent epoch markers, - // the epoch is over; otherwise we need to loop again to poll from the remaining - // writers. - writerEpochMarkersReceived(writerId) = true - if (writerEpochMarkersReceived.forall(_ == true)) { - finished = true - } - } - } - } - - nextRow - } - - override def close(): Unit = { - executor.shutdownNow() - } - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala deleted file mode 100644 index 1c6f3ddb395e6..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous.shuffle - -import scala.concurrent.Future -import scala.concurrent.duration.Duration - -import org.apache.spark.Partitioner -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.util.ThreadUtils - -/** - * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances. - * - * @param writerId The partition ID of this writer. - * @param outputPartitioner The partitioner on the reader side of the shuffle. - * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by - * partition ID within outputPartitioner. 
- */ -class RPCContinuousShuffleWriter( - writerId: Int, - outputPartitioner: Partitioner, - endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter { - - if (outputPartitioner.numPartitions != 1) { - throw new IllegalArgumentException("multiple readers not yet supported") - } - - if (outputPartitioner.numPartitions != endpoints.length) { - throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " + - s"not match endpoint count ${endpoints.length}") - } - - def write(epoch: Iterator[UnsafeRow]): Unit = { - while (epoch.hasNext) { - val row = epoch.next() - endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row)) - } - - val futures = endpoints.map(_.ask[Unit](ReceiverEpochMarker(writerId))).toSeq - implicit val ec = ThreadUtils.sameThread - ThreadUtils.awaitResult(Future.sequence(futures), Duration.Inf) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala index 0eb3dce1bbd27..90a53727aa317 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala @@ -74,20 +74,8 @@ class StateStoreRDD[T: ClassTag, U: ClassTag]( StateStoreId(checkpointLocation, operatorId, partition.index), queryRunId) - // If we're in continuous processing mode, we should get the store version for the current - // epoch rather than the one at planning time. - val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING)) - .map(_.toBoolean).getOrElse(false) - val currentVersion = if (isContinuous) { - val epoch = EpochTracker.getCurrentEpoch - assert(epoch.isDefined, "Current epoch must be defined for continuous processing streams.") - epoch.get - } else { - storeVersion - } - store = StateStore.get( - storeProviderId, keySchema, valueSchema, indexOrdinal, currentVersion, + storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion, storeConf, hadoopConfBroadcast.value.value) val inputIter = dataRDD.iterator(partition, ctxt) storeUpdateFunction(store, inputIter) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleSuite.scala deleted file mode 100644 index 54ec4a8352c1b..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleSuite.scala +++ /dev/null @@ -1,423 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.streaming.continuous.shuffle - -import java.util.UUID - -import scala.language.implicitConversions - -import org.apache.spark.{HashPartitioner, TaskContext, TaskContextImpl} -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} -import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.types.{DataType, IntegerType, StringType} -import org.apache.spark.unsafe.types.UTF8String - -class ContinuousShuffleSuite extends StreamTest { - // In this unit test, we emulate that we're in the task thread where - // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a task context - // thread local to be set. - var ctx: TaskContextImpl = _ - - override def beforeEach(): Unit = { - super.beforeEach() - ctx = TaskContext.empty() - TaskContext.setTaskContext(ctx) - } - - override def afterEach(): Unit = { - ctx.markTaskCompleted(None) - TaskContext.unset() - ctx = null - super.afterEach() - } - - private implicit def unsafeRow(value: Int) = { - UnsafeProjection.create(Array(IntegerType : DataType))( - new GenericInternalRow(Array(value: Any))) - } - - private def unsafeRow(value: String) = { - UnsafeProjection.create(Array(StringType : DataType))( - new GenericInternalRow(Array(UTF8String.fromString(value): Any))) - } - - private def send(endpoint: RpcEndpointRef, messages: RPCContinuousShuffleMessage*) = { - messages.foreach(endpoint.askSync[Unit](_)) - } - - private def readRDDEndpoint(rdd: ContinuousShuffleReadRDD) = { - rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - } - - private def readEpoch(rdd: ContinuousShuffleReadRDD) = { - rdd.compute(rdd.partitions(0), ctx).toSeq.map(_.getInt(0)) - } - - test("reader - one epoch") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - send( - endpoint, - ReceiverRow(0, unsafeRow(111)), - ReceiverRow(0, unsafeRow(222)), - ReceiverRow(0, unsafeRow(333)), - ReceiverEpochMarker(0) - ) - - val iter = rdd.compute(rdd.partitions(0), ctx) - assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333)) - } - - test("reader - multiple epochs") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - send( - endpoint, - ReceiverRow(0, unsafeRow(111)), - ReceiverEpochMarker(0), - ReceiverRow(0, unsafeRow(222)), - ReceiverRow(0, unsafeRow(333)), - ReceiverEpochMarker(0) - ) - - val firstEpoch = rdd.compute(rdd.partitions(0), ctx) - assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111)) - - val secondEpoch = rdd.compute(rdd.partitions(0), ctx) - assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333)) - } - - test("reader - empty epochs") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - - send( - endpoint, - ReceiverEpochMarker(0), - ReceiverEpochMarker(0), - ReceiverRow(0, unsafeRow(111)), - ReceiverEpochMarker(0), - ReceiverEpochMarker(0), - ReceiverEpochMarker(0) - ) - - assert(rdd.compute(rdd.partitions(0), ctx).isEmpty) - assert(rdd.compute(rdd.partitions(0), ctx).isEmpty) - - val thirdEpoch = rdd.compute(rdd.partitions(0), ctx) - assert(thirdEpoch.toSeq.map(_.getInt(0)) == Seq(111)) - - assert(rdd.compute(rdd.partitions(0), ctx).isEmpty) - 
assert(rdd.compute(rdd.partitions(0), ctx).isEmpty) - } - - test("reader - multiple partitions") { - val rdd = new ContinuousShuffleReadRDD( - sparkContext, - numPartitions = 5, - endpointNames = Seq.fill(5)(s"endpt-${UUID.randomUUID()}")) - // Send all data before processing to ensure there's no crossover. - for (p <- rdd.partitions) { - val part = p.asInstanceOf[ContinuousShuffleReadPartition] - // Send index for identification. - send( - part.endpoint, - ReceiverRow(0, unsafeRow(part.index)), - ReceiverEpochMarker(0) - ) - } - - for (p <- rdd.partitions) { - val part = p.asInstanceOf[ContinuousShuffleReadPartition] - val iter = rdd.compute(part, ctx) - assert(iter.next().getInt(0) == part.index) - assert(!iter.hasNext) - } - } - - test("reader - blocks waiting for new rows") { - val rdd = new ContinuousShuffleReadRDD( - sparkContext, numPartitions = 1, epochIntervalMs = Long.MaxValue) - val epoch = rdd.compute(rdd.partitions(0), ctx) - - val readRowThread = new Thread { - override def run(): Unit = { - try { - epoch.next().getInt(0) - } catch { - case _: InterruptedException => // do nothing - expected at test ending - } - } - } - - try { - readRowThread.start() - eventually(timeout(streamingTimeout)) { - assert(readRowThread.getState == Thread.State.TIMED_WAITING) - } - } finally { - readRowThread.interrupt() - readRowThread.join() - } - } - - test("reader - multiple writers") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - send( - endpoint, - ReceiverRow(0, unsafeRow("writer0-row0")), - ReceiverRow(1, unsafeRow("writer1-row0")), - ReceiverRow(2, unsafeRow("writer2-row0")), - ReceiverEpochMarker(0), - ReceiverEpochMarker(1), - ReceiverEpochMarker(2) - ) - - val firstEpoch = rdd.compute(rdd.partitions(0), ctx) - assert(firstEpoch.toSeq.map(_.getUTF8String(0).toString).toSet == - Set("writer0-row0", "writer1-row0", "writer2-row0")) - } - - test("reader - epoch only ends when all writers send markers") { - val rdd = new ContinuousShuffleReadRDD( - sparkContext, numPartitions = 1, numShuffleWriters = 3, epochIntervalMs = Long.MaxValue) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - send( - endpoint, - ReceiverRow(0, unsafeRow("writer0-row0")), - ReceiverRow(1, unsafeRow("writer1-row0")), - ReceiverRow(2, unsafeRow("writer2-row0")), - ReceiverEpochMarker(0), - ReceiverEpochMarker(2) - ) - - val epoch = rdd.compute(rdd.partitions(0), ctx) - val rows = (0 until 3).map(_ => epoch.next()).toSet - assert(rows.map(_.getUTF8String(0).toString) == - Set("writer0-row0", "writer1-row0", "writer2-row0")) - - // After checking the right rows, block until we get an epoch marker indicating there's no next. - // (Also fail the assertion if for some reason we get a row.) - - val readEpochMarkerThread = new Thread { - override def run(): Unit = { - assert(!epoch.hasNext) - } - } - - readEpochMarkerThread.start() - eventually(timeout(streamingTimeout)) { - assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING) - } - - // Send the last epoch marker - now the epoch should finish. - send(endpoint, ReceiverEpochMarker(1)) - eventually(timeout(streamingTimeout)) { - !readEpochMarkerThread.isAlive - } - - // Join to pick up assertion failures. 
- readEpochMarkerThread.join(streamingTimeout.toMillis) - } - - test("reader - writer epochs non aligned") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - // We send multiple epochs for 0, then multiple for 1, then multiple for 2. The receiver should - // collate them as though the markers were aligned in the first place. - send( - endpoint, - ReceiverRow(0, unsafeRow("writer0-row0")), - ReceiverEpochMarker(0), - ReceiverRow(0, unsafeRow("writer0-row1")), - ReceiverEpochMarker(0), - ReceiverEpochMarker(0), - - ReceiverEpochMarker(1), - ReceiverRow(1, unsafeRow("writer1-row0")), - ReceiverEpochMarker(1), - ReceiverRow(1, unsafeRow("writer1-row1")), - ReceiverEpochMarker(1), - - ReceiverEpochMarker(2), - ReceiverEpochMarker(2), - ReceiverRow(2, unsafeRow("writer2-row0")), - ReceiverEpochMarker(2) - ) - - val firstEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet - assert(firstEpoch == Set("writer0-row0")) - - val secondEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet - assert(secondEpoch == Set("writer0-row1", "writer1-row0")) - - val thirdEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet - assert(thirdEpoch == Set("writer1-row1", "writer2-row0")) - } - - test("one epoch") { - val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val writer = new RPCContinuousShuffleWriter( - 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) - - writer.write(Iterator(1, 2, 3)) - - assert(readEpoch(reader) == Seq(1, 2, 3)) - } - - test("multiple epochs") { - val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val writer = new RPCContinuousShuffleWriter( - 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) - - writer.write(Iterator(1, 2, 3)) - writer.write(Iterator(4, 5, 6)) - - assert(readEpoch(reader) == Seq(1, 2, 3)) - assert(readEpoch(reader) == Seq(4, 5, 6)) - } - - test("empty epochs") { - val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val writer = new RPCContinuousShuffleWriter( - 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) - - writer.write(Iterator()) - writer.write(Iterator(1, 2)) - writer.write(Iterator()) - writer.write(Iterator()) - writer.write(Iterator(3, 4)) - writer.write(Iterator()) - - assert(readEpoch(reader) == Seq()) - assert(readEpoch(reader) == Seq(1, 2)) - assert(readEpoch(reader) == Seq()) - assert(readEpoch(reader) == Seq()) - assert(readEpoch(reader) == Seq(3, 4)) - assert(readEpoch(reader) == Seq()) - } - - test("blocks waiting for writer") { - val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val writer = new RPCContinuousShuffleWriter( - 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) - - val readerEpoch = reader.compute(reader.partitions(0), ctx) - - val readRowThread = new Thread { - override def run(): Unit = { - assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1)) - } - } - readRowThread.start() - - eventually(timeout(streamingTimeout)) { - assert(readRowThread.getState == Thread.State.TIMED_WAITING) - } - - // Once we write the epoch the thread should stop waiting and succeed. 
- writer.write(Iterator(1)) - readRowThread.join(streamingTimeout.toMillis) - } - - test("multiple writer partitions") { - val numWriterPartitions = 3 - - val reader = new ContinuousShuffleReadRDD( - sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) - val writers = (0 until 3).map { idx => - new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) - } - - writers(0).write(Iterator(1, 4, 7)) - writers(1).write(Iterator(2, 5)) - writers(2).write(Iterator(3, 6)) - - writers(0).write(Iterator(4, 7, 10)) - writers(1).write(Iterator(5, 8)) - writers(2).write(Iterator(6, 9)) - - // Since there are multiple asynchronous writers, the original row sequencing is not guaranteed. - // The epochs should be deterministically preserved, however. - assert(readEpoch(reader).toSet == Seq(1, 2, 3, 4, 5, 6, 7).toSet) - assert(readEpoch(reader).toSet == Seq(4, 5, 6, 7, 8, 9, 10).toSet) - } - - test("reader epoch only ends when all writer partitions write it") { - val numWriterPartitions = 3 - - val reader = new ContinuousShuffleReadRDD( - sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) - val writers = (0 until 3).map { idx => - new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) - } - - writers(1).write(Iterator()) - writers(2).write(Iterator()) - - val readerEpoch = reader.compute(reader.partitions(0), ctx) - - val readEpochMarkerThread = new Thread { - override def run(): Unit = { - assert(!readerEpoch.hasNext) - } - } - - readEpochMarkerThread.start() - eventually(timeout(streamingTimeout)) { - assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING) - } - - writers(0).write(Iterator()) - readEpochMarkerThread.join(streamingTimeout.toMillis) - } - - test("receiver stopped with row last") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - send( - endpoint, - ReceiverEpochMarker(0), - ReceiverRow(0, unsafeRow(111)) - ) - - ctx.markTaskCompleted(None) - val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader - eventually(timeout(streamingTimeout)) { - assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get()) - } - } - - test("receiver stopped with marker last") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) - val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint - send( - endpoint, - ReceiverRow(0, unsafeRow(111)), - ReceiverEpochMarker(0) - ) - - ctx.markTaskCompleted(None) - val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader - eventually(timeout(streamingTimeout)) { - assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get()) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala deleted file mode 100644 index 3ec4750c59fc5..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming.continuous - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.internal.SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED -import org.apache.spark.sql.streaming.OutputMode - -class ContinuousAggregationSuite extends ContinuousSuiteBase { - import testImplicits._ - - test("not enabled") { - val ex = intercept[AnalysisException] { - val input = ContinuousMemoryStream.singlePartition[Int] - testStream(input.toDF().agg(max('value)), OutputMode.Complete)() - } - - assert(ex.getMessage.contains( - "In continuous processing mode, coalesce(1) must be called before aggregate operation")) - } - - test("basic") { - withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { - val input = ContinuousMemoryStream.singlePartition[Int] - - testStream(input.toDF().agg(max('value)), OutputMode.Complete)( - AddData(input, 0, 1, 2), - CheckAnswer(2), - StopStream, - AddData(input, 3, 4, 5), - StartStream(), - CheckAnswer(5), - AddData(input, -1, -2, -3), - CheckAnswer(5)) - } - } - - test("multiple partitions with coalesce") { - val input = ContinuousMemoryStream[Int] - - val df = input.toDF().coalesce(1).agg(max('value)) - - testStream(df, OutputMode.Complete)( - AddData(input, 0, 1, 2), - CheckAnswer(2), - StopStream, - AddData(input, 3, 4, 5), - StartStream(), - CheckAnswer(5), - AddData(input, -1, -2, -3), - CheckAnswer(5)) - } - - test("multiple partitions with coalesce - multiple transformations") { - val input = ContinuousMemoryStream[Int] - - // We use a barrier to make sure predicates both before and after coalesce work - val df = input.toDF() - .select('value as 'copy, 'value) - .where('copy =!= 1) - .logicalPlan - .coalesce(1) - .where('copy =!= 2) - .agg(max('value)) - - testStream(df, OutputMode.Complete)( - AddData(input, 0, 1, 2), - CheckAnswer(0), - StopStream, - AddData(input, 3, 4, 5), - StartStream(), - CheckAnswer(5), - AddData(input, -1, -2, -3), - CheckAnswer(5)) - } - - test("multiple partitions with multiple coalesce") { - val input = ContinuousMemoryStream[Int] - - val df = input.toDF() - .coalesce(1) - .logicalPlan - .coalesce(1) - .select('value as 'copy, 'value) - .agg(max('value)) - - testStream(df, OutputMode.Complete)( - AddData(input, 0, 1, 2), - CheckAnswer(2), - StopStream, - AddData(input, 3, 4, 5), - StartStream(), - CheckAnswer(5), - AddData(input, -1, -2, -3), - CheckAnswer(5)) - } - - test("repeated restart") { - withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) { - val input = ContinuousMemoryStream.singlePartition[Int] - - testStream(input.toDF().agg(max('value)), OutputMode.Complete)( - AddData(input, 0, 1, 2), - CheckAnswer(2), - StopStream, - StartStream(), - StopStream, - StartStream(), - StopStream, - StartStream(), - AddData(input, 0), - CheckAnswer(2), - AddData(input, 5), - 
CheckAnswer(5)) - } - } -}
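
For reference, the central mechanism removed by this patch is the epoch-collation loop in RPCContinuousShuffleReader: one bounded queue per writer, an ExecutorCompletionService that re-polls a writer's queue after each row, and an epoch that only ends once every writer has sent an epoch marker. The standalone Scala sketch below illustrates that pattern outside of Spark, under simplifying assumptions (plain Int payloads instead of UnsafeRow, no RPC layer, no poll timeout); the names Message, Row, EpochMarker, and EpochCollator are illustrative and are not part of the Spark codebase.

import java.util.concurrent.{ArrayBlockingQueue, Callable, ExecutorCompletionService, Executors}
import scala.collection.mutable.ArrayBuffer

sealed trait Message { def writerId: Int }
case class Row(writerId: Int, value: Int) extends Message
case class EpochMarker(writerId: Int) extends Message

class EpochCollator(numWriters: Int, queueSize: Int = 1024) {
  // One bounded queue per writer, mirroring the per-writer queues in the removed reader.
  private val queues = Array.fill(numWriters)(new ArrayBlockingQueue[Message](queueSize))

  // Called from writer threads: enqueue a row or an end-of-epoch marker.
  def send(m: Message): Unit = queues(m.writerId).put(m)

  // Blocks until every writer has sent an EpochMarker; returns the rows of one epoch.
  def readEpoch(): Seq[Int] = {
    val executor = Executors.newFixedThreadPool(numWriters)
    val completion = new ExecutorCompletionService[Message](executor)
    def task(writerId: Int) = new Callable[Message] {
      override def call(): Message = queues(writerId).take()
    }
    // Prime the completion service with one pending read per writer.
    (0 until numWriters).foreach(id => completion.submit(task(id)))

    val markerSeen = Array.fill(numWriters)(false)
    val rows = ArrayBuffer.empty[Int]
    try {
      while (!markerSeen.forall(identity)) {
        completion.take().get() match {
          case Row(id, v)      => rows += v; completion.submit(task(id)) // keep polling this writer
          case EpochMarker(id) => markerSeen(id) = true                  // this writer's epoch is done
        }
      }
    } finally {
      executor.shutdownNow()
    }
    rows.toSeq
  }
}

In this sketch a writer thread would call send(Row(id, v)) for each row and finish its epoch with send(EpochMarker(id)); readEpoch() then returns that epoch's rows only after all writers have marked the epoch complete, which is the behavior the removed ContinuousShuffleSuite tests exercised (rows interleave freely within an epoch, but epoch boundaries stay aligned across writers).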