@@ -27,8 +27,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging {

private[kafka010] val pollTimeoutMs = options.getLong(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -79,13 +78,22 @@
KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
}

override def latestOffset(start: Offset): Offset = {
override def getDefaultReadLimit: ReadLimit = {
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

override def latestOffset(): Offset = {
throw new IllegalStateException("This method should not be called")
}

override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
}.getOrElse {
latestPartitionOffsets
endPartitionOffsets = KafkaSourceOffset(readLimit match {
case rows: ReadMaxRows =>
rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
case _: ReadAllAvailable =>
latestPartitionOffsets
})
endPartitionOffsets
}
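For orientation (not part of the diff): a spark-shell style sketch of how this surfaces to a user of the Kafka source. With maxOffsetsPerTrigger set, getDefaultReadLimit above reports ReadLimit.maxRows, while a Trigger.Once() run ignores that limit and processes everything available. The broker address, topic, and checkpoint path below are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// getOrCreate() reuses the existing session when run inside spark-shell.
val spark = SparkSession.builder().master("local[2]").appName("read-limit-sketch").getOrCreate()

// maxOffsetsPerTrigger becomes the source's default ReadLimit (ReadLimit.maxRows(1000)).
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                        // placeholder topic
  .option("maxOffsetsPerTrigger", "1000")
  .load()

// With Trigger.Once() the limit above is ignored and the single batch reads all available data.
val query = kafkaDf.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/read-limit-sketch") // placeholder path
  .trigger(Trigger.Once())
  .start()
query.awaitTermination()
```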
@@ -32,6 +32,8 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSource._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -79,7 +81,7 @@ private[kafka010] class KafkaSource(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends Source with Logging {
extends SupportsAdmissionControl with Source with Logging {

private val sc = sqlContext.sparkContext

@@ -114,6 +116,10 @@
}.partitionToOffsets
}

override def getDefaultReadLimit: ReadLimit = {
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None

private val converter = new KafkaRecordToRowConverter()
@@ -122,23 +128,29 @@

/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = {
throw new IllegalStateException("latestOffset should be called instead of this method")
}

override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets

val latest = kafkaReader.fetchLatestOffsets(
currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
val offsets = maxOffsetsPerTrigger match {
case None =>
val offsets = limit match {
case rows: ReadMaxRows =>
if (currentPartitionOffsets.isEmpty) {
rateLimit(rows.maxRows(), initialPartitionOffsets, latest)
} else {
rateLimit(rows.maxRows(), currentPartitionOffsets.get, latest)
}
case _: ReadAllAvailable =>
latest
case Some(limit) if currentPartitionOffsets.isEmpty =>
rateLimit(limit, initialPartitionOffsets, latest)
case Some(limit) =>
rateLimit(limit, currentPartitionOffsets.get, latest)
}

currentPartitionOffsets = Some(offsets)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
Some(KafkaSourceOffset(offsets))
KafkaSourceOffset(offsets)
}

/** Proportionally distribute limit number of offsets among topicpartitions */
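The rateLimit helper referenced above is collapsed in this diff. Conceptually it splits a row budget across topic partitions in proportion to how far each partition lags behind the latest offsets. A rough standalone sketch of that idea follows; it is an approximation of the behavior, not the exact Spark implementation (which also deals with lost partitions and reported data loss).

```scala
import org.apache.kafka.common.TopicPartition

// Rough sketch: distribute a total row budget across partitions in proportion to
// how far behind each partition is (latest - from).
def proportionalLimit(
    limit: Long,
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val lags = until.map { case (tp, end) =>
    tp -> math.max(end - from.getOrElse(tp, end), 0L)
  }
  val totalLag = lags.values.sum
  if (totalLag <= limit) {
    // Enough budget for everything that is currently available.
    until
  } else {
    until.map { case (tp, end) =>
      val share = (limit.toDouble * lags(tp) / totalLag).toLong
      tp -> math.min(from.getOrElse(tp, end) + share, end)
    }
  }
}
```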
@@ -297,6 +297,16 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
13, 126, 127, 128, 129, 130, 131, 132, 133, 134
)
)

// When Trigger.Once() is used, the read limit should be ignored
testStream(mapped)(
StartStream(Trigger.Once(), clock),
waitUntilBatchProcessed,
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
13, 126, 127, 128, 129, 130, 131, 132, 133, 134)
)
}

test("input row metrics") {
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan all the data
* available at the streaming source. This is meant to be a hard specification as being able
* to return all available data is necessary for Trigger.Once() to work correctly.
* If a source is unable to scan all available data, then it must throw an error.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public final class ReadAllAvailable implements ReadLimit {
static final ReadAllAvailable SINGLETON = new ReadAllAvailable();

private ReadAllAvailable() {}

@Override
public String toString() {
return "All Available";
}
}
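One detail worth calling out (a sketch, not part of the patch): allAvailable() always hands back the same SINGLETON instance, so the scheduler can compare limits directly, which is what the Trigger.Once() check in MicroBatchExecution further down relies on.

```scala
import org.apache.spark.sql.connector.read.streaming.ReadLimit

// ReadAllAvailable is a singleton, so identity comparison is enough, e.g. the
// `limit != ReadLimit.allAvailable()` check in MicroBatchExecution below.
val a = ReadLimit.allAvailable()
val b = ReadLimit.allAvailable()
assert(a eq b) // same instance: ReadAllAvailable.SINGLETON
```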
@@ -15,17 +15,26 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources
package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
import org.apache.spark.annotation.Evolving;

// A special `MicroBatchStream` that can get latestOffset with a start offset.
trait RateControlMicroBatchStream extends MicroBatchStream {

override def latestOffset(): Offset = {
throw new IllegalAccessException(
"latestOffset should not be called for RateControlMicroBatchReadSupport")
}
/**
* Interface representing limits on how much to read from a {@link MicroBatchStream} when it
* implements {@link SupportsAdmissionControl}. There are several implementations of this
* interface, representing various kinds of limits.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @see ReadAllAvailable
* @see ReadMaxRows
*/
@Evolving
public interface ReadLimit {
static ReadLimit maxRows(long rows) {
return new ReadMaxRows(rows);
}

def latestOffset(start: Offset): Offset
static ReadLimit allAvailable() {
return ReadAllAvailable.SINGLETON;
}
}
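A short spark-shell style sketch of the two factory methods, using only the API added in this patch; describe is a hypothetical helper showing how a source typically dispatches on the concrete limit it receives.

```scala
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxRows}

// Limits are built through the static factory methods on ReadLimit.
val capped: ReadLimit = ReadLimit.maxRows(1000L)
val everything: ReadLimit = ReadLimit.allAvailable()

// Hypothetical helper: a source typically pattern matches on the concrete limit.
def describe(limit: ReadLimit): String = limit match {
  case rows: ReadMaxRows   => s"read at most ~${rows.maxRows()} rows"
  case _: ReadAllAvailable => "read everything that is available"
  case other               => s"unknown limit: $other" // future ReadLimit kinds
}

println(describe(capped))     // read at most ~1000 rows
println(describe(everything)) // read everything that is available
```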
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
* given maximum number of rows.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public final class ReadMaxRows implements ReadLimit {
private long rows;

ReadMaxRows(long rows) {
this.rows = rows;
}

/** Approximate maximum rows to scan. */
public long maxRows() { return this.rows; }

@Override
public String toString() {
return "MaxRows: " + maxRows();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReadMaxRows other = (ReadMaxRows) o;
return other.maxRows() == maxRows();
}

@Override
public int hashCode() { return Long.hashCode(this.rows); }
}
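Since equals and hashCode are value based, two limits built from the same configuration compare equal. A quick sanity sketch (not a test from the patch):

```scala
import org.apache.spark.sql.connector.read.streaming.ReadLimit

// ReadMaxRows compares by value, so limits built from the same option are equal,
// even though maxRows() is only an approximate target for the source.
assert(ReadLimit.maxRows(10L) == ReadLimit.maxRows(10L))
assert(ReadLimit.maxRows(10L).hashCode == ReadLimit.maxRows(10L).hashCode)
assert(ReadLimit.maxRows(10L) != ReadLimit.maxRows(20L))
```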
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* A mix-in interface for {@link SparkDataStream} streaming sources to signal that they can control
* the rate of data ingested into the system. These rate limits can come implicitly from the
* contract of triggers, e.g. Trigger.Once() requires that a micro-batch process all data
* available to the system at the start of the micro-batch. Alternatively, sources can decide to
* limit ingest through data source options.
*
* Through this interface, a MicroBatchStream should be able to return the next offset that it
* will process until, given a {@link ReadLimit}.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsAdmissionControl extends SparkDataStream {

/**
* Returns the default read limit for this source, potentially derived from options passed to
* the data source when it was created.
*/
default ReadLimit getDefaultReadLimit() {
return ReadLimit.allAvailable();
}

/**
* Returns the most recent offset available given a read limit. The start offset can be used
* to figure out how much new data should be read given the limit.
*/
Offset latestOffset(Offset startOffset, ReadLimit limit);
}
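To make the contract concrete, here is a minimal, hypothetical implementation of SupportsAdmissionControl (an in-memory counter stream that is not part of the patch; CounterOffset, CounterStream, and the available callback are illustrative names). Its latestOffset honors ReadMaxRows and falls back to everything for ReadAllAvailable, mirroring the Kafka changes above.

```scala
import org.apache.spark.sql.connector.read.streaming.{
  Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}

// Hypothetical offset type for the sketch: just a running record count.
class CounterOffset(val value: Long) extends Offset {
  override def json(): String = value.toString
}

// Hypothetical in-memory "source" used only to illustrate the new interface.
// `available` reports how many records exist upstream right now.
class CounterStream(available: () => Long, maxRowsPerBatch: Option[Long])
  extends SupportsAdmissionControl {

  // Mirrors the Kafka change: an option-driven default limit, otherwise all available.
  override def getDefaultReadLimit: ReadLimit =
    maxRowsPerBatch.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)

  // End offset of the next micro-batch, honoring the limit handed in by the engine.
  override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
    val begin = start.asInstanceOf[CounterOffset].value
    val latest = available()
    val end = limit match {
      case rows: ReadMaxRows   => math.min(begin + rows.maxRows(), latest)
      case _: ReadAllAvailable => latest
    }
    new CounterOffset(end)
  }

  // Remaining SparkDataStream plumbing, kept trivial for the sketch.
  override def initialOffset(): Offset = new CounterOffset(0L)
  override def deserializeOffset(json: String): Offset = new CounterOffset(json.toLong)
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()
}
```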
@@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock
@@ -79,7 +79,7 @@ class MicroBatchExecution(

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -122,7 +122,18 @@
// v2 source
case r: StreamingDataSourceV2Relation => r.stream
}
uniqueSources = sources.distinct
uniqueSources = sources.distinct.map {
case source: SupportsAdmissionControl =>
val limit = source.getDefaultReadLimit
if (trigger == OneTimeTrigger && limit != ReadLimit.allAvailable()) {
logWarning(s"The read limit $limit for $source is ignored when Trigger.Once() is used.")
Review comment (Member): I'm wondering if we can do this at Analyzer?

Review comment (Contributor Author): Triggers are a property of the system, not the query, so I don't think it fits into analysis.

source -> ReadLimit.allAvailable()
} else {
source -> limit
}
case other =>
other -> ReadLimit.allAvailable()
}.toMap

// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
sink match {
@@ -354,25 +365,28 @@

// Generate a map from each unique source to the next available offset.
val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
case s: Source =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
case s: RateControlMicroBatchStream =>
case (s: SupportsAdmissionControl, limit) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
val startOffset = availableOffsets
.get(s).map(off => s.deserializeOffset(off.json))
.getOrElse(s.initialOffset())
(s, Option(s.latestOffset(startOffset)))
(s, Option(s.latestOffset(startOffset, limit)))
}
case s: MicroBatchStream =>
case (s: Source, _) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
case (s: MicroBatchStream, _) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
(s, Option(s.latestOffset()))
}
}.toMap
case (s, _) =>
// for some reason, the compiler is unhappy
Review comment (Member): Do you mean Match is not exhaustive?
throw new IllegalStateException(s"Unexpected source: $s")
}
availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)

// Update the query metadata