@@ -27,8 +27,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging {

private[kafka010] val pollTimeoutMs = options.getLong(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -79,13 +78,23 @@
KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
}

override def latestOffset(start: Offset): Offset = {
override def getDefaultReadLimit: ReadLimit = {
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

override def latestOffset(): Offset = {
throw new UnsupportedOperationException(
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
}.getOrElse {
latestPartitionOffsets
endPartitionOffsets = KafkaSourceOffset(readLimit match {
case rows: ReadMaxRows =>
rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
case _: ReadAllAvailable =>
latestPartitionOffsets
})
endPartitionOffsets
}
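The getDefaultReadLimit/latestOffset pair above is driven by the existing maxOffsetsPerTrigger option. A minimal usage sketch (broker address and topic name are placeholders) showing how a reader opts into the row-based limit; the option value is what getDefaultReadLimit turns into ReadLimit.maxRows:

import org.apache.spark.sql.SparkSession

object KafkaMaxOffsetsExample {
  def main(args: Array[String]): Unit = {
    // Requires the spark-sql-kafka-0-10 connector on the classpath.
    val spark = SparkSession.builder().appName("kafka-read-limit").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "events")                        // placeholder topic
      .option("maxOffsetsPerTrigger", "1000")               // surfaces as ReadLimit.maxRows(1000)
      .load()

    df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}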
@@ -32,6 +32,8 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSource._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -79,7 +81,7 @@ private[kafka010] class KafkaSource(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends Source with Logging {
extends SupportsAdmissionControl with Source with Logging {

private val sc = sqlContext.sparkContext

@@ -114,6 +116,10 @@ private[kafka010] class KafkaSource(
}.partitionToOffsets
}

override def getDefaultReadLimit: ReadLimit = {
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None

private val converter = new KafkaRecordToRowConverter()
@@ -122,23 +128,30 @@

/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = {
throw new UnsupportedOperationException(
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets

val latest = kafkaReader.fetchLatestOffsets(
currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
val offsets = maxOffsetsPerTrigger match {
case None =>
val offsets = limit match {
case rows: ReadMaxRows =>
if (currentPartitionOffsets.isEmpty) {
rateLimit(rows.maxRows(), initialPartitionOffsets, latest)
} else {
rateLimit(rows.maxRows(), currentPartitionOffsets.get, latest)
}
case _: ReadAllAvailable =>
latest
case Some(limit) if currentPartitionOffsets.isEmpty =>
rateLimit(limit, initialPartitionOffsets, latest)
case Some(limit) =>
rateLimit(limit, currentPartitionOffsets.get, latest)
}

currentPartitionOffsets = Some(offsets)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
Some(KafkaSourceOffset(offsets))
KafkaSourceOffset(offsets)
}

/** Proportionally distribute the limit on the number of offsets among topic partitions. */
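The body of rateLimit is not shown in this diff. A rough sketch of the idea, with hypothetical names and without the data-loss handling the real implementation would need: each partition is allowed to advance by a share of the limit proportional to its backlog.

import org.apache.kafka.common.TopicPartition

object RateLimitSketch {
  def rateLimit(
      limit: Long,
      from: Map[TopicPartition, Long],
      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    // Backlog per partition: how far the latest offset is ahead of the current one.
    val backlog = until.map { case (tp, end) =>
      tp -> math.max(0L, end - from.getOrElse(tp, end))
    }
    val total = backlog.values.sum.toDouble
    if (total <= limit) {
      until // everything available fits under the limit
    } else {
      // Give each partition a share of the limit proportional to its backlog.
      until.map { case (tp, end) =>
        val share = (limit * (backlog(tp) / total)).toLong
        tp -> math.min(end, from.getOrElse(tp, end) + share)
      }
    }
  }
}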
@@ -297,6 +297,28 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
13, 126, 127, 128, 129, 130, 131, 132, 133, 134
)
)

// When Trigger.Once() is used, the read limit should be ignored
val allData = Seq(1) ++ (10 to 20) ++ (100 to 200)
withTempDir { dir =>
testStream(mapped)(
StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
AssertOnQuery { q =>
q.processAllAvailable()
true
},
CheckAnswer(allData: _*),
StopStream,

AddKafkaData(Set(topic), 1000 to 1010: _*),
StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
AssertOnQuery { q =>
q.processAllAvailable()
true
},
CheckAnswer((allData ++ 1000.to(1010)): _*)
)
}
}

test("input row metrics") {
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan all the data
* available at the streaming source. This is meant to be a hard specification, as being able
* to return all available data is necessary for Trigger.Once() to work correctly.
* If a source is unable to scan all available data, then it must throw an error.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public final class ReadAllAvailable implements ReadLimit {
static final ReadAllAvailable INSTANCE = new ReadAllAvailable();

private ReadAllAvailable() {}

@Override
public String toString() {
return "All Available";
}
}
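For context, a small sketch of the trigger this limit exists for (the checkpoint path is a placeholder): under Trigger.Once() the engine is expected to hand the source ReadLimit.allAvailable() regardless of any configured row or file cap, which is what the Kafka test above asserts.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerOnceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("trigger-once").getOrCreate()

    // The built-in rate source just generates rows; any streaming source works here.
    val df = spark.readStream.format("rate").load()

    df.writeStream
      .format("console")
      .trigger(Trigger.Once()) // one micro-batch over all data available at start
      .option("checkpointLocation", "/tmp/trigger-once-checkpoint") // placeholder path
      .start()
      .awaitTermination()
  }
}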
@@ -15,17 +15,24 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.sources
package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
import org.apache.spark.annotation.Evolving;

// A special `MicroBatchStream` that can get latestOffset with a start offset.
trait RateControlMicroBatchStream extends MicroBatchStream {
/**
* Interface representing limits on how much to read from a {@link MicroBatchStream} when it
* implements {@link SupportsAdmissionControl}. There are several child interfaces representing
* various kinds of limits.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @see ReadAllAvailable
* @see ReadMaxRows
*/
@Evolving
public interface ReadLimit {
static ReadLimit maxRows(long rows) { return new ReadMaxRows(rows); }

override def latestOffset(): Offset = {
throw new IllegalAccessException(
"latestOffset should not be called for RateControlMicroBatchReadSupport")
}
static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }

def latestOffset(start: Offset): Offset
static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; }
}
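A short sketch of the three factory methods as seen from Scala; the values are arbitrary examples.

import org.apache.spark.sql.connector.read.streaming.ReadLimit

object ReadLimitExamples {
  val everything: ReadLimit = ReadLimit.allAvailable() // no cap, required for Trigger.Once()
  val atMostRows: ReadLimit = ReadLimit.maxRows(1000L) // approximately 1000 rows per batch
  val atMostFiles: ReadLimit = ReadLimit.maxFiles(10)  // approximately 10 files per batch
}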
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
* given maximum number of files.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public class ReadMaxFiles implements ReadLimit {
private int files;
Review comment from @dongjoon-hyun (Member), Jan 31, 2020: files -> maxFiles? Or, can we have a better name?

Reply from the PR author (Contributor): I think it's consistent this way with rows and maxRows

ReadMaxFiles(int maxFiles) {
this.files = maxFiles;
}

/** Approximate maximum files to scan. */
public int maxFiles() { return this.files; }

@Override
public String toString() {
return "MaxFiles: " + maxFiles();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReadMaxFiles other = (ReadMaxFiles) o;
return other.maxFiles() == maxFiles();
}

@Override
public int hashCode() { return files; }
}
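maxFilesPerTrigger is the file-source counterpart of maxOffsetsPerTrigger; a usage sketch follows, assuming (this diff does not show the wiring) that the option surfaces as ReadLimit.maxFiles in the same way. The input directory is a placeholder.

import org.apache.spark.sql.SparkSession

object MaxFilesPerTriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("max-files-per-trigger").getOrCreate()

    val df = spark.readStream
      .format("text")
      .option("maxFilesPerTrigger", "10") // cap each micro-batch at roughly 10 new files
      .load("/data/incoming")             // placeholder input directory

    df.writeStream.format("console").start().awaitTermination()
  }
}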
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately the
* given maximum number of rows.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 3.0.0
*/
@Evolving
public final class ReadMaxRows implements ReadLimit {
private long rows;

ReadMaxRows(long rows) {
this.rows = rows;
}

/** Approximate maximum rows to scan. */
public long maxRows() { return this.rows; }

@Override
public String toString() {
return "MaxRows: " + maxRows();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReadMaxRows other = (ReadMaxRows) o;
return other.maxRows() == maxRows();
}

@Override
public int hashCode() { return Long.hashCode(this.rows); }
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* A mix-in interface for {@link SparkDataStream} streaming sources to signal that they can control
* the rate of data ingested into the system. These rate limits can come implicitly from the
* contract of triggers, e.g. Trigger.Once() requires that a micro-batch process all data
* available to the system at the start of the micro-batch. Alternatively, sources can decide to
* limit ingest through data source options.
*
* Through this interface, a MicroBatchStream should be able to return the next offset that it
* will process up to, given a {@link ReadLimit}.
*
* @since 3.0.0
*/
@Evolving
public interface SupportsAdmissionControl extends SparkDataStream {

/**
* Returns the default {@link ReadLimit} for this source, which may be configured through
* options passed when creating the data source.
*/
default ReadLimit getDefaultReadLimit() { return ReadLimit.allAvailable(); }

/**
* Returns the most recent offset available given a read limit. The start offset can be used
* to figure out how much new data should be read given the limit. Users should implement this
* method instead of latestOffset for a MicroBatchStream or getOffset for Source.
*
* When this method is called on a `Source`, the source can return `null` if there is no
* data to process. In addition, for the very first micro-batch, the `startOffset` will be
* null as well.
*
* When this method is called on a MicroBatchStream, the `startOffset` will be `initialOffset`
* for the very first micro-batch. The source can return `null` if there is no data to process.
*/
Offset latestOffset(Offset startOffset, ReadLimit limit);
}
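To tie the interface together, a minimal sketch of a custom MicroBatchStream mixing in SupportsAdmissionControl, mirroring the Kafka changes above. All names here are hypothetical and the partition-planning half is stubbed, since only the offset/admission-control side is of interest.

import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming._

// Hypothetical offset type: a single monotonically increasing counter.
case class CounterOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}

class CounterMicroBatchStream(maxRowsPerBatch: Option[Long], latestAvailable: () => Long)
  extends SupportsAdmissionControl with MicroBatchStream {

  // Advertise a per-batch cap if one was configured, otherwise all available data.
  override def getDefaultReadLimit: ReadLimit =
    maxRowsPerBatch.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable())

  // The engine calls the two-argument variant when admission control is supported.
  override def latestOffset(): Offset =
    throw new UnsupportedOperationException("latestOffset(Offset, ReadLimit) should be called")

  override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
    val startValue = start.asInstanceOf[CounterOffset].value
    val available = latestAvailable()
    val end = limit match {
      case rows: ReadMaxRows => math.min(available, startValue + rows.maxRows())
      case _: ReadAllAvailable => available
    }
    CounterOffset(end)
  }

  override def initialOffset(): Offset = CounterOffset(0L)
  override def deserializeOffset(json: String): Offset = CounterOffset(json.toLong)
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()

  // Partition planning / reading is unrelated to admission control, so it is stubbed out.
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] =
    Array.empty[InputPartition]
  override def createReaderFactory(): PartitionReaderFactory =
    throw new UnsupportedOperationException("reading not implemented in this sketch")
}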