In KafkaMicroBatchStream.scala:
@@ -27,8 +27,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
-import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
+    failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging {

private[kafka010] val pollTimeoutMs = options.getLong(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -79,13 +78,23 @@
KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
}

-  override def latestOffset(start: Offset): Offset = {
+  override def getDefaultReadLimit: ReadLimit = {
+    maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+  }
+
+  override def latestOffset(): Offset = {
+    throw new IllegalStateException(
[Review comment (Member) on the line above: Is UnsupportedOperationException better in this context?]
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
-    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
-      rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
-    }.getOrElse {
-      latestPartitionOffsets
+    endPartitionOffsets = KafkaSourceOffset(readLimit match {
+      case rows: ReadMaxRows =>
+        rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
+      case _: ReadAllAvailable =>
+        latestPartitionOffsets
})
endPartitionOffsets
}
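For orientation, the two new methods above are consumed together by the engine: a read limit is derived once from the source's options, then passed into latestOffset(start, limit) on every micro-batch. A minimal, hypothetical sketch of that handshake (not the actual MicroBatchExecution code; `stream` stands for any admission-control-capable source):

import org.apache.spark.sql.connector.read.streaming.{Offset, ReadLimit, SupportsAdmissionControl}

// Hypothetical driver-side helper illustrating the admission-control handshake.
// A real engine would also plan and run the micro-batch between these calls.
def nextBatchEndOffset(stream: SupportsAdmissionControl, lastCommitted: Offset): Option[Offset] = {
  // Derived from source options (e.g. maxOffsetsPerTrigger above); a Trigger.Once()
  // run would substitute ReadLimit.allAvailable() so the cap is effectively ignored.
  val limit: ReadLimit = stream.getDefaultReadLimit
  val end = stream.latestOffset(lastCommitted, limit)
  Option(end).filter(_ != lastCommitted) // null or an unchanged offset means no new data
}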
In KafkaSource.scala:
@@ -32,6 +32,8 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSource._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -79,7 +81,7 @@ private[kafka010] class KafkaSource(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
-  extends Source with Logging {
+  extends SupportsAdmissionControl with Source with Logging {

private val sc = sqlContext.sparkContext

@@ -114,6 +116,10 @@ private[kafka010] class KafkaSource(
}.partitionToOffsets
}

override def getDefaultReadLimit: ReadLimit = {
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None

private val converter = new KafkaRecordToRowConverter()
@@ -122,23 +128,30 @@

/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = {
+    throw new IllegalStateException(
[Review comment (Member) on the line above: UnsupportedOperationException?]
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets

val latest = kafkaReader.fetchLatestOffsets(
currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
-    val offsets = maxOffsetsPerTrigger match {
-      case None =>
+    val offsets = limit match {
+      case rows: ReadMaxRows =>
+        if (currentPartitionOffsets.isEmpty) {
+          rateLimit(rows.maxRows(), initialPartitionOffsets, latest)
+        } else {
+          rateLimit(rows.maxRows(), currentPartitionOffsets.get, latest)
+        }
+      case _: ReadAllAvailable =>
        latest
-      case Some(limit) if currentPartitionOffsets.isEmpty =>
-        rateLimit(limit, initialPartitionOffsets, latest)
-      case Some(limit) =>
-        rateLimit(limit, currentPartitionOffsets.get, latest)
}

currentPartitionOffsets = Some(offsets)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-    Some(KafkaSourceOffset(offsets))
+    KafkaSourceOffset(offsets)
}

/** Proportionally distribute limit number of offsets among topicpartitions */
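The body of rateLimit is collapsed in this diff view. Purely as an illustration of the idea in the comment above (hypothetical helper with simplified types, not the Spark implementation), a proportional split of a row budget across partitions could look like:

// Hypothetical sketch: give each partition a share of `limit` proportional to its
// backlog (latest - start), and never move past its latest available offset.
def distributeProportionally(
    limit: Long,
    start: Map[String, Long],   // partition -> current offset
    latest: Map[String, Long]   // partition -> latest available offset
): Map[String, Long] = {
  val lag = latest.map { case (tp, end) => tp -> math.max(0L, end - start.getOrElse(tp, end)) }
  val totalLag = lag.values.sum
  if (totalLag <= limit) {
    latest // the budget covers everything that is available
  } else {
    latest.map { case (tp, end) =>
      val share = (limit.toDouble * lag(tp) / totalLag).toLong
      tp -> math.min(end, start.getOrElse(tp, end) + share)
    }
  }
}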
In KafkaMicroBatchSourceSuite.scala:
@@ -297,6 +297,28 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
13, 126, 127, 128, 129, 130, 131, 132, 133, 134
)
)

// When Trigger.Once() is used, the read limit should be ignored
val allData = Seq(1) ++ (10 to 20) ++ (100 to 200)
withTempDir { dir =>
testStream(mapped)(
StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
AssertOnQuery { q =>
q.processAllAvailable()
true
},
CheckAnswer(allData: _*),
StopStream,

AddKafkaData(Set(topic), 1000 to 1010: _*),
StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
AssertOnQuery { q =>
q.processAllAvailable()
true
},
CheckAnswer((allData ++ 1000.to(1010)): _*)
)
}
}
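From the user's side, the behaviour exercised by this test looks roughly like the following (assuming an existing SparkSession named spark; broker, topic, and paths are placeholders): maxOffsetsPerTrigger caps ordinary micro-batches, while a Trigger.Once() run reads the whole available backlog in a single batch.

import org.apache.spark.sql.streaming.Trigger

// maxOffsetsPerTrigger becomes ReadLimit.maxRows(500) via getDefaultReadLimit,
// but Trigger.Once() forces ReadAllAvailable, so the cap is ignored for this run.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder broker
  .option("subscribe", "events")                    // placeholder topic
  .option("maxOffsetsPerTrigger", "500")
  .load()

events.writeStream
  .format("parquet")
  .option("path", "/tmp/events-out")               // placeholder sink path
  .option("checkpointLocation", "/tmp/events-chk") // placeholder checkpoint
  .trigger(Trigger.Once())
  .start()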

test("input row metrics") {
New file ReadAllAvailable.java:
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan all the data
* available at the streaming source. This is meant to be a hard specification as being able
* to return all available data is necessary for Trigger.Once() to work correctly.
* If a source is unable to scan all available data, then it must throw an error.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public final class ReadAllAvailable implements ReadLimit {
static final ReadAllAvailable SINGLETON = new ReadAllAvailable();

private ReadAllAvailable() {}

@Override
public String toString() {
return "All Available";
}
}
RateControlMicroBatchStream.scala renamed to ReadLimit.java:
@@ -15,17 +15,28 @@
* limitations under the License.
*/

-package org.apache.spark.sql.execution.streaming.sources
+package org.apache.spark.sql.connector.read.streaming;

-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.annotation.Evolving;

-// A special `MicroBatchStream` that can get latestOffset with a start offset.
-trait RateControlMicroBatchStream extends MicroBatchStream {
+/**
+ * Interface representing limits on how much to read from a {@link MicroBatchStream} when it
+ * implements {@link SupportsAdmissionControl}. There are several child interfaces representing
+ * various kinds of limits.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @see ReadAllAvailable
+ * @see ReadMaxRows
+ */
+@Evolving
+public interface ReadLimit {
+  static ReadLimit maxRows(long rows) { return new ReadMaxRows(rows); }

-  override def latestOffset(): Offset = {
-    throw new IllegalAccessException(
-      "latestOffset should not be called for RateControlMicroBatchReadSupport")
-  }
+  static ReadLimit maxFiles(int files) {
+    return new ReadMaxFiles(files);
+  }

-  def latestOffset(start: Offset): Offset
+  static ReadLimit allAvailable() {
+    return ReadAllAvailable.SINGLETON;
+  }
}
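For a sense of how these factory methods and the concrete limit types fit together, a small Scala snippet (values chosen arbitrarily):

import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, ReadMaxRows}

// Construct the three kinds of limits and dispatch on them the same way the
// Kafka source above does in latestOffset(start, readLimit).
val limits: Seq[ReadLimit] =
  Seq(ReadLimit.maxRows(1000L), ReadLimit.maxFiles(10), ReadLimit.allAvailable())

limits.foreach {
  case r: ReadMaxRows      => println(s"cap the batch at ${r.maxRows()} rows")
  case f: ReadMaxFiles     => println(s"cap the batch at ${f.maxFiles()} files")
  case _: ReadAllAvailable => println("read everything that is currently available")
}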
New file ReadMaxFiles.java:
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
* given maximum number of files.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public class ReadMaxFiles implements ReadLimit {
private int files;

ReadMaxFiles(int maxFiles) {
this.files = maxFiles;
}

/** Approximate maximum files to scan. */
public int maxFiles() { return this.files; }

@Override
public String toString() {
return "MaxFiles: " + maxFiles();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReadMaxFiles other = (ReadMaxFiles) o;
return other.maxFiles() == maxFiles();
}

@Override
public int hashCode() { return files; }
}
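ReadMaxFiles is the natural carrier for the file-based sources' maxFilesPerTrigger option (that wiring is not part of this diff). A sketch of the user-facing side, assuming the same SparkSession spark as above, with placeholder schema and path:

// maxFilesPerTrigger limits how many new files each micro-batch picks up,
// which is what a file-count read limit expresses on the source side.
val incoming = spark.readStream
  .format("json")
  .schema("id LONG, payload STRING")  // placeholder schema
  .option("maxFilesPerTrigger", "10")
  .load("/data/incoming")             // placeholder input directory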
New file ReadMaxRows.java:
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
* given maximum number of rows.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public final class ReadMaxRows implements ReadLimit {
private long rows;

ReadMaxRows(long rows) {
this.rows = rows;
}

/** Approximate maximum rows to scan. */
public long maxRows() { return this.rows; }

@Override
public String toString() {
return "MaxRows: " + maxRows();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReadMaxRows other = (ReadMaxRows) o;
return other.maxRows() == maxRows();
}

@Override
public int hashCode() { return Long.hashCode(this.rows); }
}
New file SupportsAdmissionControl.java:
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* A mix-in interface for {@link SparkDataStream} streaming sources to signal that they can control
* the rate of data ingested into the system. These rate limits can come implicitly from the
* contract of triggers, e.g. Trigger.Once() requires that a micro-batch process all data
* available to the system at the start of the micro-batch. Alternatively, sources can decide to
* limit ingest through data source options.
*
* Through this interface, a MicroBatchStream should be able to return the next offset that it
* will process up to, given a {@link ReadLimit}.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsAdmissionControl extends SparkDataStream {

/**
* Returns the read limits potentially passed to the data source through options when creating
* the data source.
*/
default ReadLimit getDefaultReadLimit() {
return ReadLimit.allAvailable();
}

/**
* Returns the most recent offset available given a read limit. The start offset can be used
* to figure out how much new data should be read given the limit. Users should implement this
* method instead of latestOffset for a MicroBatchStream or getOffset for Source.
*
* When this method is called on a `Source`, the source can return `null` if there is no
* data to process. In addition, for the very first micro-batch, the `startOffset` will be
* null as well.
*
* When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
* for the very first micro-batch. The source can return `null` if there is no data to process.
*/
Offset latestOffset(Offset startOffset, ReadLimit limit);
}
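To make the contract concrete, here is a minimal, hypothetical sketch of a source-side implementation: a counter-like stream whose offsets are plain longs. It only fills in the SparkDataStream and admission-control methods; a real source would additionally mix in MicroBatchStream (or Source) and implement planning and reading.

import org.apache.spark.sql.connector.read.streaming.{
  Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}

// Hypothetical offset type: a monotonically increasing count, serialized as its value.
case class CounterOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}

// Hypothetical source skeleton; `highWatermark` reports how much data exists upstream.
class CounterAdmissionControl(highWatermark: () => Long) extends SupportsAdmissionControl {

  // Advertise a default cap; the engine may still pass ReadLimit.allAvailable()
  // (e.g. for Trigger.Once()), and then everything must be admitted.
  override def getDefaultReadLimit: ReadLimit = ReadLimit.maxRows(1000L)

  override def latestOffset(startOffset: Offset, limit: ReadLimit): Offset = {
    val start = startOffset.asInstanceOf[CounterOffset].value
    val available = highWatermark()
    val end = limit match {
      case rows: ReadMaxRows   => math.min(start + rows.maxRows(), available)
      case _: ReadAllAvailable => available
    }
    CounterOffset(end)
  }

  override def initialOffset(): Offset = CounterOffset(0L)
  override def deserializeOffset(json: String): Offset = CounterOffset(json.toLong)
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()
}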