diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 01f6ba4445162..48dc0f27bfa41 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -27,8 +27,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
-import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
     options: CaseInsensitiveStringMap,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
+    failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging {

   private[kafka010] val pollTimeoutMs = options.getLong(
     KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -79,13 +78,23 @@ private[kafka010] class KafkaMicroBatchStream(
     KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
   }

-  override def latestOffset(start: Offset): Offset = {
+  override def getDefaultReadLimit: ReadLimit = {
+    maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+  }
+
+  override def latestOffset(): Offset = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  }
+
+  override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
-    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
-      rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
-    }.getOrElse {
-      latestPartitionOffsets
+    endPartitionOffsets = KafkaSourceOffset(readLimit match {
+      case rows: ReadMaxRows =>
+        rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
+      case _: ReadAllAvailable =>
+        latestPartitionOffsets
     })
     endPartitionOffsets
   }
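For readers following the Kafka changes: the rateLimit helper called above is not shown in this patch. It splits a row budget across topic partitions in proportion to how much unread data each partition has. A minimal, self-contained sketch of that idea follows; this is an editorial illustration with simplified types (plain string partition keys), not the actual KafkaSource.rateLimit implementation, which also has to deal with new and deleted partitions and data loss.

// Editorial sketch: proportionally split a ReadMaxRows-style budget across partitions.
object RateLimitSketch {
  def proportionalLimit(
      maxRows: Long,
      from: Map[String, Long],    // partition -> next offset to read
      until: Map[String, Long]    // partition -> latest available offset
    ): Map[String, Long] = {
    val lag = until.map { case (tp, end) => tp -> math.max(end - from.getOrElse(tp, end), 0L) }
    val total = lag.values.sum.toDouble
    if (total <= maxRows) {
      until  // everything available fits within the budget
    } else {
      until.map { case (tp, end) =>
        val begin = from.getOrElse(tp, end)
        val share = (maxRows * (lag(tp) / total)).toLong  // this partition's slice of the budget
        tp -> math.min(begin + share, end)
      }
    }
  }
}

For example, with a budget of 10 rows and two partitions lagging by 30 and 10 offsets, the sketch advances them by roughly 7 and 2 offsets respectively.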
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index e1392b6215d3a..51420a96f5e11 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,6 +32,8 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -79,7 +81,7 @@ private[kafka010] class KafkaSource(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends Source with Logging {
+  extends SupportsAdmissionControl with Source with Logging {

   private val sc = sqlContext.sparkContext

@@ -114,6 +116,10 @@ private[kafka010] class KafkaSource(
     }.partitionToOffsets
   }

+  override def getDefaultReadLimit: ReadLimit = {
+    maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+  }
+
   private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None

   private val converter = new KafkaRecordToRowConverter()
@@ -122,23 +128,30 @@ private[kafka010] class KafkaSource(
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  }
+
+  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets

     val latest = kafkaReader.fetchLatestOffsets(
       currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
-    val offsets = maxOffsetsPerTrigger match {
-      case None =>
+    val offsets = limit match {
+      case rows: ReadMaxRows =>
+        if (currentPartitionOffsets.isEmpty) {
+          rateLimit(rows.maxRows(), initialPartitionOffsets, latest)
+        } else {
+          rateLimit(rows.maxRows(), currentPartitionOffsets.get, latest)
+        }
+      case _: ReadAllAvailable =>
         latest
-      case Some(limit) if currentPartitionOffsets.isEmpty =>
-        rateLimit(limit, initialPartitionOffsets, latest)
-      case Some(limit) =>
-        rateLimit(limit, currentPartitionOffsets.get, latest)
     }

     currentPartitionOffsets = Some(offsets)
     logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-    Some(KafkaSourceOffset(offsets))
+    KafkaSourceOffset(offsets)
   }

   /** Proportionally distribute limit number of offsets among topicpartitions */
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 3ee59e57a6edf..b38d82f2b8135 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -297,6 +297,28 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         13, 126, 127, 128, 129, 130, 131, 132, 133, 134
       )
     )
+
+    // When Trigger.Once() is used, the read limit should be ignored
+    val allData = Seq(1) ++ (10 to 20) ++ (100 to 200)
+    withTempDir { dir =>
+      testStream(mapped)(
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(allData: _*),
+        StopStream,
+
+        AddKafkaData(Set(topic), 1000 to 1010: _*),
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer((allData ++ 1000.to(1010)): _*)
+      )
+    }
   }

   test("input row metrics") {
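Taken together, the Kafka changes mean that maxOffsetsPerTrigger now surfaces as the source's default ReadLimit instead of being applied inside getOffset. A minimal usage sketch of the resulting behavior follows; the broker address, topic name, and paths are placeholders. Under a periodic trigger the option below becomes a per-batch limit of 100 rows, while under Trigger.Once() the engine substitutes ReadLimit.allAvailable() (logging a warning) and drains everything in a single batch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object AdmissionControlUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("read-limit-sketch").getOrCreate()

    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
      .option("subscribe", "events")                         // placeholder topic
      .option("maxOffsetsPerTrigger", 100L)                  // becomes ReadLimit.maxRows(100)
      .load()

    // With Trigger.Once(), the 100-row limit is ignored and all available data
    // is read in one batch, as required by ReadAllAvailable.
    kafkaStream.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/checkpoints/events")  // placeholder path
      .trigger(Trigger.Once())
      .start("/tmp/output/events")                              // placeholder path
      .awaitTermination()
  }
}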
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
new file mode 100644
index 0000000000000..5a946ad14b3a2
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan all the data
+ * available at the streaming source. This is meant to be a hard specification as being able
+ * to return all available data is necessary for Trigger.Once() to work correctly.
+ * If a source is unable to scan all available data, then it must throw an error.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public final class ReadAllAvailable implements ReadLimit {
+  static final ReadAllAvailable INSTANCE = new ReadAllAvailable();
+
+  private ReadAllAvailable() {}
+
+  @Override
+  public String toString() {
+    return "All Available";
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
similarity index 53%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
index fb46f76682688..121ed1ad116f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
@@ -15,17 +15,24 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.execution.streaming.sources
+package org.apache.spark.sql.connector.read.streaming;

-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.annotation.Evolving;

-// A special `MicroBatchStream` that can get latestOffset with a start offset.
-trait RateControlMicroBatchStream extends MicroBatchStream {
+/**
+ * Interface representing limits on how much to read from a {@link MicroBatchStream} when it
+ * implements {@link SupportsAdmissionControl}. There are several child interfaces representing
+ * various kinds of limits.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @see ReadAllAvailable
+ * @see ReadMaxRows
+ */
+@Evolving
+public interface ReadLimit {
+  static ReadLimit maxRows(long rows) { return new ReadMaxRows(rows); }

-  override def latestOffset(): Offset = {
-    throw new IllegalAccessException(
-      "latestOffset should not be called for RateControlMicroBatchReadSupport")
-  }
+  static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }

-  def latestOffset(start: Offset): Offset
+  static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java
new file mode 100644
index 0000000000000..441a6c8e77a6f
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
+ * given maximum number of files.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public class ReadMaxFiles implements ReadLimit {
+  private int files;
+
+  ReadMaxFiles(int maxFiles) {
+    this.files = maxFiles;
+  }
+
+  /** Approximate maximum files to scan. */
+  public int maxFiles() { return this.files; }
+
+  @Override
+  public String toString() {
+    return "MaxFiles: " + maxFiles();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReadMaxFiles other = (ReadMaxFiles) o;
+    return other.maxFiles() == maxFiles();
+  }
+
+  @Override
+  public int hashCode() { return files; }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java
new file mode 100644
index 0000000000000..65a68c543ff71
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
+ * given maximum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public final class ReadMaxRows implements ReadLimit {
+  private long rows;
+
+  ReadMaxRows(long rows) {
+    this.rows = rows;
+  }
+
+  /** Approximate maximum rows to scan. */
+  public long maxRows() { return this.rows; }
+
+  @Override
+  public String toString() {
+    return "MaxRows: " + maxRows();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReadMaxRows other = (ReadMaxRows) o;
+    return other.maxRows() == maxRows();
+  }
+
+  @Override
+  public int hashCode() { return Long.hashCode(this.rows); }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
new file mode 100644
index 0000000000000..027763ce6fcdf
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix-in interface for {@link SparkDataStream} streaming sources to signal that they can control
+ * the rate of data ingested into the system. These rate limits can come implicitly from the
+ * contract of triggers, e.g. Trigger.Once() requires that a micro-batch process all data
+ * available to the system at the start of the micro-batch. Alternatively, sources can decide to
+ * limit ingest through data source options.
+ *
+ * Through this interface, a MicroBatchStream should be able to return the next offset that it
+ * will process until, given a {@link ReadLimit}.
+ *
+ * @since 3.0.0
+ */
+@Evolving
+public interface SupportsAdmissionControl extends SparkDataStream {
+
+  /**
+   * Returns the read limits potentially passed to the data source through options when creating
+   * the data source.
+   */
+  default ReadLimit getDefaultReadLimit() { return ReadLimit.allAvailable(); }
+
+  /**
+   * Returns the most recent offset available given a read limit. The start offset can be used
+   * to figure out how much new data should be read given the limit. Users should implement this
+   * method instead of latestOffset for a MicroBatchStream or getOffset for Source.
+   *
+   * When this method is called on a `Source`, the source can return `null` if there is no
+   * data to process. In addition, for the very first micro-batch, the `startOffset` will be
+   * null as well.
+   *
+   * When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
+   * for the very first micro-batch. The source can return `null` if there is no data to process.
+   */
+  Offset latestOffset(Offset startOffset, ReadLimit limit);
+}
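To show how a connector is expected to wire these interfaces together, here is a hypothetical sketch of a counter-backed MicroBatchStream that opts into admission control. The class and all of its members are invented for illustration (they are not part of this patch), and everything unrelated to ReadLimit handling is elided with ???.

import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming._

// Hypothetical offset type: a single monotonically increasing counter.
case class CounterOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}

class CounterMicroBatchStream(highestAvailable: () => Long, maxRowsPerBatch: Option[Long])
  extends SupportsAdmissionControl with MicroBatchStream {

  // Advertise the source-configured limit; the engine may override it (e.g. under Trigger.Once()).
  override def getDefaultReadLimit: ReadLimit =
    maxRowsPerBatch.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable())

  // Honor whatever limit the engine actually passes in for this micro-batch.
  override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
    val begin = start.asInstanceOf[CounterOffset].value
    val latest = highestAvailable()
    limit match {
      case rows: ReadMaxRows => CounterOffset(math.min(begin + rows.maxRows(), latest))
      case _: ReadAllAvailable => CounterOffset(latest)
    }
  }

  // With admission control, the no-argument variant should not be called by the engine.
  override def latestOffset(): Offset =
    throw new UnsupportedOperationException("latestOffset(Offset, ReadLimit) should be called")

  override def initialOffset(): Offset = CounterOffset(0L)
  override def deserializeOffset(json: String): Offset = CounterOffset(json.toLong)
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()

  // Data planning and reading are omitted from this sketch.
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = ???
  override def createReaderFactory(): PartitionReaderFactory = ???
}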
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 36f70024c57f8..e8ce8e1487093 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -30,6 +30,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -45,7 +47,7 @@ class FileStreamSource(
     override val schema: StructType,
     partitionColumns: Seq[String],
     metadataPath: String,
-    options: Map[String, String]) extends Source with Logging {
+    options: Map[String, String]) extends SupportsAdmissionControl with Source with Logging {

   import FileStreamSource._

@@ -115,15 +117,17 @@ class FileStreamSource(
    * `synchronized` on this method is for solving race conditions in tests. In the normal usage,
    * there is no race here, so the cost of `synchronized` should be rare.
    */
-  private def fetchMaxOffset(): FileStreamSourceOffset = synchronized {
+  private def fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset = synchronized {
     // All the new files found - ignore aged files and files that we have seen.
     val newFiles = fetchAllFiles().filter {
       case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
     }

     // Obey user's setting to limit the number of files in this batch trigger.
-    val batchFiles =
-      if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
+    val batchFiles = limit match {
+      case files: ReadMaxFiles => newFiles.take(files.maxFiles())
+      case _: ReadAllAvailable => newFiles
+    }

     batchFiles.foreach { file =>
       seenFiles.add(file._1, file._2)
@@ -150,6 +154,10 @@ class FileStreamSource(
     FileStreamSourceOffset(metadataLogCurrentOffset)
   }

+  override def getDefaultReadLimit: ReadLimit = {
+    maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(super.getDefaultReadLimit)
+  }
+
   /**
    * For test only. Run `func` with the internal lock to make sure when `func` is running,
    * the current offset won't be changed and no new batch will be emitted.
@@ -269,7 +277,14 @@ class FileStreamSource(
     files
   }

-  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.logOffset == -1)
+  override def getOffset: Option[Offset] = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  }
+
+  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
+    Some(fetchMaxOffset(limit)).filterNot(_.logOffset == -1).orNull
+  }

   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 872c36764104f..83bc347e23ed4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
+import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock
@@ -79,7 +79,7 @@ class MicroBatchExecution(
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._

     val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
+      case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -122,7 +122,18 @@ class MicroBatchExecution(
       // v2 source
       case r: StreamingDataSourceV2Relation => r.stream
     }
-    uniqueSources = sources.distinct
+    uniqueSources = sources.distinct.map {
+      case source: SupportsAdmissionControl =>
+        val limit = source.getDefaultReadLimit
+        if (trigger == OneTimeTrigger && limit != ReadLimit.allAvailable()) {
+          logWarning(s"The read limit $limit for $source is ignored when Trigger.Once() is used.")
+          source -> ReadLimit.allAvailable()
+        } else {
+          source -> limit
+        }
+      case other =>
+        other -> ReadLimit.allAvailable()
+    }.toMap

     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     sink match {
@@ -354,25 +365,33 @@ class MicroBatchExecution(

     // Generate a map from each unique source to the next available offset.
     val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
-      case s: Source =>
+      case (s: SupportsAdmissionControl, limit) =>
         updateStatusMessage(s"Getting offsets from $s")
-        reportTimeTaken("getOffset") {
-          (s, s.getOffset)
+        reportTimeTaken("latestOffset") {
+          val startOffsetOpt = availableOffsets.get(s)
+          val startOffset = s match {
+            case _: Source =>
+              startOffsetOpt.orNull
+            case v2: MicroBatchStream =>
+              startOffsetOpt.map(offset => v2.deserializeOffset(offset.json))
+                .getOrElse(v2.initialOffset())
+          }
+          (s, Option(s.latestOffset(startOffset, limit)))
         }
-      case s: RateControlMicroBatchStream =>
+      case (s: Source, _) =>
         updateStatusMessage(s"Getting offsets from $s")
-        reportTimeTaken("latestOffset") {
-          val startOffset = availableOffsets
-            .get(s).map(off => s.deserializeOffset(off.json))
-            .getOrElse(s.initialOffset())
-          (s, Option(s.latestOffset(startOffset)))
+        reportTimeTaken("getOffset") {
+          (s, s.getOffset)
         }
-      case s: MicroBatchStream =>
+      case (s: MicroBatchStream, _) =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("latestOffset") {
           (s, Option(s.latestOffset()))
         }
-    }.toMap
+      case (s, _) =>
+        // for some reason, the compiler is unhappy and thinks the match is not exhaustive
+        throw new IllegalStateException(s"Unexpected source: $s")
+    }

     availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)

     // Update the query metadata
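The engine-side rule encoded above is small but easy to miss: each source's default read limit is honored, except under Trigger.Once(), where it is replaced by ReadLimit.allAvailable(). A trivial standalone restatement of that rule follows (an editorial sketch, not engine code); the printed strings come from the toString implementations added in this patch.

import org.apache.spark.sql.connector.read.streaming.ReadLimit

object EffectiveReadLimitSketch {
  def effectiveLimit(defaultLimit: ReadLimit, isTriggerOnce: Boolean): ReadLimit =
    if (isTriggerOnce) ReadLimit.allAvailable()  // Trigger.Once() must drain everything
    else defaultLimit                            // otherwise honor the source's own default

  def main(args: Array[String]): Unit = {
    // A source that asked for at most 500 rows per batch keeps that limit under a
    // periodic trigger, but is overridden to "all available" under Trigger.Once().
    println(effectiveLimit(ReadLimit.maxRows(500), isTriggerOnce = false))  // MaxRows: 500
    println(effectiveLimit(ReadLimit.maxRows(500), isTriggerOnce = true))   // All Available
  }
}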
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6dff5c6f26ee7..4b7f5aadd71eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
-import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.QueryExecution
@@ -206,7 +206,7 @@ abstract class StreamExecution(
   /**
    * A list of unique sources in the query plan. This will be set when generating logical plan.
    */
-  @volatile protected var uniqueSources: Seq[SparkDataStream] = Seq.empty
+  @volatile protected var uniqueSources: Map[SparkDataStream, ReadLimit] = Map.empty

   /** Defines the internal state of execution */
   protected val state = new AtomicReference[State](INITIALIZING)
@@ -424,7 +424,7 @@ abstract class StreamExecution(

   /** Stops all streaming sources safely. */
   protected def stopSources(): Unit = {
-    uniqueSources.foreach { source =>
+    uniqueSources.foreach { case (source, _) =>
       try {
         source.stop()
       } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 481552a2e4a0e..a9b724a73a18e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset, ReadLimit}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
@@ -84,7 +84,7 @@ class ContinuousExecution(
     sources = _logicalPlan.collect {
       case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream]
     }
-    uniqueSources = sources.distinct
+    uniqueSources = sources.distinct.map(s => s -> ReadLimit.allAvailable()).toMap

     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     WriteToContinuousDataSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 632e007fc9444..fa320333143ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1174,6 +1174,62 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") {
+    withTempDirs { (src, target) =>
+      val checkpoint = new File(target, "chk").getCanonicalPath
+      val targetDir = new File(target, "data").getCanonicalPath
+      var lastFileModTime: Option[Long] = None
+
+      /** Create a text file with a single data item */
+      def createFile(data: Int): File = {
+        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
+        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+        lastFileModTime = Some(file.lastModified)
+        file
+      }
+
+      createFile(1)
+      createFile(2)
+      createFile(3)
+
+      // Set up a query to read text files one at a time
+      val df = spark
+        .readStream
+        .option("maxFilesPerTrigger", 1)
+        .text(src.getCanonicalPath)
+
+      def startQuery(): StreamingQuery = {
+        df.writeStream
+          .format("parquet")
+          .trigger(Trigger.Once)
+          .option("checkpointLocation", checkpoint)
+          .start(targetDir)
+      }
+      val q = startQuery()
+
+      try {
+        assert(q.awaitTermination(streamingTimeout.toMillis))
+        assert(q.recentProgress.count(_.numInputRows != 0) == 1) // only one trigger was run
+        checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 3).map(_.toString).toDF)
+      } finally {
+        q.stop()
+      }
+
+      createFile(4)
+      createFile(5)
+
+      // run a second batch
+      val q2 = startQuery()
+      try {
+        assert(q2.awaitTermination(streamingTimeout.toMillis))
+        assert(q2.recentProgress.count(_.numInputRows != 0) == 1) // only one trigger was run
+        checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 5).map(_.toString).toDF)
+      } finally {
+        q2.stop()
+      }
+    }
+  }
+
   test("explain") {
     withTempDirs { case (src, tmp) =>
       src.mkdirs()