@@ -33,7 +33,6 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}
 
 /**
  * This class uses Kafka's own [[Admin]] API to read data offsets from Kafka.
@@ -58,13 +57,6 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   private[kafka010] val offsetFetchAttemptIntervalMs =
     readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong
 
-  /**
-   * [[UninterruptibleThreadRunner]] ensures that all [[Admin]] communication called in an
-   * [[UninterruptibleThread]]. In the case of streaming queries, we are already running in an
-   * [[UninterruptibleThread]], however for batch mode this is not the case.
-   */
-  val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")
-
   /**
    * An AdminClient used in the driver to query the latest Kafka offsets.
    * This only queries the offsets because AdminClient has no functionality to commit offsets like
@@ -73,7 +65,6 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   @volatile protected var _admin: Admin = null
 
   protected def admin: Admin = synchronized {
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     if (_admin == null) {
       _admin = consumerStrategy.createAdmin(driverKafkaParams)
     }
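
For context on the query-only design noted in the doc comment above: the Admin API exposes listOffsets for reading offsets, but no commit-style call. A minimal standalone sketch of that lookup, assuming a reachable broker (the bootstrap address and topic name are illustrative, not taken from this PR):

  import java.util.Properties
  import scala.jdk.CollectionConverters._

  import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, OffsetSpec}
  import org.apache.kafka.common.TopicPartition

  object LatestOffsetSketch {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // illustrative
      val admin = Admin.create(props)
      try {
        // Admin can only *read* offsets (OffsetSpec.earliest/latest/forTimestamp);
        // committing offsets remains consumer-group functionality, hence the
        // query-only reader class above.
        val params = Map(new TopicPartition("topic", 0) -> OffsetSpec.latest()).asJava
        val latest = admin.listOffsets(params).all().get()
        latest.asScala.foreach { case (tp, info) => println(s"$tp -> ${info.offset()}") }
      } finally {
        admin.close()
      }
    }
  }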
@@ -121,8 +112,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
    * Closes the connection to Kafka, and cleans up state.
    */
   override def close(): Unit = {
-    if (_admin != null) uninterruptibleThreadRunner.runUninterruptibly { stopAdmin() }
-    uninterruptibleThreadRunner.shutdown()
+    stopAdmin()
   }
 
   /**
@@ -141,9 +131,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       logDebug(s"Assigned partitions: $partitions. Seeking to $partitionOffsets")
       partitionOffsets
     }
-    val partitions = uninterruptibleThreadRunner.runUninterruptibly {
-      consumerStrategy.assignedTopicPartitions(admin)
-    }
+    val partitions = consumerStrategy.assignedTopicPartitions(admin)
     // Obtain TopicPartition offsets with late binding support
     offsetRangeLimit match {
       case EarliestOffsetRangeLimit => partitions.map {
@@ -224,7 +212,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
       fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long]
     ): KafkaSourceOffset = {
-    val fetched = partitionsAssignedToConsumer {
+    val fetched = partitionsAssignedToAdmin {
       partitions => {
         fnAssertParametersWithPartitions(partitions)
 
@@ -262,7 +250,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
    * Fetch the earliest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
    */
-  override def fetchEarliestOffsets(): Map[TopicPartition, Long] = partitionsAssignedToConsumer(
+  override def fetchEarliestOffsets(): Map[TopicPartition, Long] = partitionsAssignedToAdmin(
     partitions => {
       val listOffsetsParams = partitions.asScala.map(p => p -> OffsetSpec.earliest()).toMap.asJava
       val partitionOffsets = listOffsets(admin, listOffsetsParams)
@@ -274,19 +262,16 @@ private[kafka010] class KafkaOffsetReaderAdmin(
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
    *
-   * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
-   * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
-   * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
-   *
-   * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+   * In order to avoid unknown issues, we use the given `knownOffsets` to audit the
    * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
    * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
    * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
-   * distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying.
+   * distinguish this with issues like KAFKA-7703, so we just return whatever we get from Kafka
+   * after retrying.
    */
   override def fetchLatestOffsets(
       knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap =
-    partitionsAssignedToConsumer { partitions => {
+    partitionsAssignedToAdmin { partitions => {
       val listOffsetsParams = partitions.asScala.map(_ -> OffsetSpec.latest()).toMap.asJava
       if (knownOffsets.isEmpty) {
         val partitionOffsets = listOffsets(admin, listOffsetsParams)
@@ -314,11 +299,10 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       }
 
       // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
-      // `withRetriesWithoutInterrupt` to retry because:
+      // `withRetries` to retry because:
       //
-      // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
-      //   consumer has a much bigger chance to hit KAFKA-7703.
-      // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
+      // - `withRetries` will reset the admin for each attempt but a fresh
+      //   admin has a much bigger chance to hit KAFKA-7703 like issues.
       var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
       var attempt = 0
       do {
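
The audit described in the comments above boils down to flagging any partition whose freshly fetched latest offset is smaller than an offset we already knew, and retrying while any such partition exists. A hedged sketch of that check, with an illustrative helper name (not the PR's exact code):

  import org.apache.kafka.common.TopicPartition

  // Illustrative helper: report partitions whose freshly fetched latest offset
  // went backwards relative to an already-known offset, as
  // (partition, known offset, fetched offset) triples.
  def findIncorrectOffsets(
      known: Map[TopicPartition, Long],
      fetched: Map[TopicPartition, Long]): Seq[(TopicPartition, Long, Long)] = {
    known.toSeq.collect {
      case (tp, knownOffset) if fetched.get(tp).exists(_ < knownOffset) =>
        (tp, knownOffset, fetched(tp))
    }
  }

If the result is non-empty, the fetch is retried up to `maxOffsetFetchAttempts` times; once retries are exhausted (for example after a topic recreation), the fetched values are returned as-is, as the doc comment explains.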
@@ -351,7 +335,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
     if (newPartitions.isEmpty) {
       Map.empty[TopicPartition, Long]
     } else {
-      partitionsAssignedToConsumer(partitions => {
+      partitionsAssignedToAdmin(partitions => {
         // Get the earliest offset of each partition
         val listOffsetsParams = newPartitions.filter { newPartition =>
           // When deleting topics happen at the same time, some partitions may not be in
@@ -501,11 +485,11 @@ private[kafka010] class KafkaOffsetReaderAdmin(
     rangeCalculator.getRanges(ranges, getSortedExecutorList)
   }
 
-  private def partitionsAssignedToConsumer(
+  private def partitionsAssignedToAdmin(
       body: ju.Set[TopicPartition] => Map[TopicPartition, Long])
-    : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {
+    : Map[TopicPartition, Long] = {
 
-    withRetriesWithoutInterrupt {
+    withRetries {
       val partitions = consumerStrategy.assignedTopicPartitions(admin).asJava
       logDebug(s"Partitions assigned: $partitions.")
       body(partitions)
@@ -516,37 +500,23 @@ private[kafka010] class KafkaOffsetReaderAdmin(
    * Helper function that does multiple retries on a body of code that returns offsets.
    * Retries are needed to handle transient failures. For e.g. race conditions between getting
    * assignment and getting position while topics/partitions are deleted can cause NPEs.
-   *
-   * This method also makes sure `body` won't be interrupted to workaround similar issues like in
-   * `KafkaConsumer.poll`. (KAFKA-1894)
    */
-  private def withRetriesWithoutInterrupt(
-      body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-
+  private def withRetries(body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
     synchronized {
       var result: Option[Map[TopicPartition, Long]] = None
       var attempt = 1
       var lastException: Throwable = null
       while (result.isEmpty && attempt <= maxOffsetFetchAttempts
         && !Thread.currentThread().isInterrupted) {
-        Thread.currentThread match {
-          case ut: UninterruptibleThread =>
-            ut.runUninterruptibly {
-              try {
-                result = Some(body)
-              } catch {
-                case NonFatal(e) =>
-                  lastException = e
-                  logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
-                  attempt += 1
-                  Thread.sleep(offsetFetchAttemptIntervalMs)
-                  resetAdmin()
-              }
-            }
-          case _ =>
-            throw new IllegalStateException(
-              "Kafka APIs must be executed on a o.a.spark.util.UninterruptibleThread")
+        try {
+          result = Some(body)
+        } catch {
+          case NonFatal(e) =>
+            lastException = e
+            logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
+            attempt += 1
+            Thread.sleep(offsetFetchAttemptIntervalMs)
+            resetAdmin()
         }
       }
       if (Thread.interrupted()) {
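
Stripped of Spark's logging and option plumbing, the new retry helper follows a plain try/remember/sleep/reset loop. A self-contained sketch of the same shape, with illustrative constants and a reset hook standing in for resetAdmin():

  import scala.util.control.NonFatal

  object RetrySketch {
    // Illustrative constants; the real values come from the reader options.
    val maxAttempts = 3
    val retryIntervalMs = 100L

    def retryOffsets(reset: () => Unit)(body: => Map[String, Long]): Map[String, Long] = {
      var result: Option[Map[String, Long]] = None
      var lastException: Throwable = null
      var attempt = 1
      while (result.isEmpty && attempt <= maxAttempts && !Thread.currentThread().isInterrupted) {
        try {
          result = Some(body)
        } catch {
          case NonFatal(e) =>
            lastException = e
            attempt += 1
            Thread.sleep(retryIntervalMs)
            reset() // e.g. recreate the Admin client before the next attempt
        }
      }
      result.getOrElse {
        if (lastException != null) throw lastException
        else throw new IllegalStateException("retries exhausted without a result")
      }
    }

    def main(args: Array[String]): Unit = {
      var calls = 0
      val offsets = retryOffsets(() => println("resetting client")) {
        calls += 1
        if (calls < 2) throw new IllegalStateException("transient failure")
        Map("topic-0" -> 42L)
      }
      println(offsets) // Map(topic-0 -> 42)
    }
  }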
@@ -562,7 +532,6 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   }
 
   private def stopAdmin(): Unit = synchronized {
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     if (_admin != null) _admin.close()
   }
 
@@ -21,13 +21,10 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.annotation.tailrec
-
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.sql.{DataFrameReader, QueryTest}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -270,7 +267,9 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
   test("no matched offset for timestamp - startingOffsets") {
     val (topic, timestamps) = prepareTimestampRelatedUnitTest
 
-    val e = intercept[SparkException] {
+    // KafkaOffsetReaderConsumer and KafkaOffsetReaderAdmin both throw AssertionError
+    // but the UninterruptibleThread used by KafkaOffsetReaderConsumer wraps it with SparkException
+    val e = intercept[Throwable] {
       verifyTimestampRelatedQueryResult({ df =>
         // partition 2 will make query fail
         val startTopicTimestamps = Map(

Inline review comment from the PR author on the new test comment:

  To help reviewers, I've created a small extract of how the exception difference looks.

  With UninterruptibleThread, test execution gives:

    org.apache.spark.SparkException(Exception thrown in awaitResult: )
    java.util.concurrent.ExecutionException(Boxed Error)
    java.lang.AssertionError(assertion failed: No offset matched from request of topic-partition topic-0-2 and timestamp 9223372036854775807.)

  Without UninterruptibleThread, test execution gives:

    java.lang.AssertionError(assertion failed: No offset matched from request of topic-partition topic-0-2 and timestamp 9223372036854775807.)
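
Since the AssertionError can arrive bare (Admin reader) or wrapped in SparkException (consumer reader on an UninterruptibleThread), as the extract above shows, the updated test intercepts Throwable and asserts on the message anywhere in the cause chain. A hedged sketch of such a chain-walking check (a standalone helper, not Spark's exact TestUtils implementation):

  import scala.annotation.tailrec

  // Illustrative helper: true if any throwable in the cause chain carries the fragment.
  @tailrec
  def messageInChain(e: Throwable, fragment: String): Boolean = {
    if (e.getMessage != null && e.getMessage.contains(fragment)) true
    else if (e.getCause == null) false
    else messageInChain(e.getCause, fragment)
  }

  // Both exception shapes above pass the same assertion:
  // assert(messageInChain(e, "No offset matched from request"))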
@@ -283,19 +282,7 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
       }, topic, Seq.empty)
     }
 
-    @tailrec
-    def assertionErrorInExceptionChain(e: Throwable): Boolean = {
-      if (e.isInstanceOf[AssertionError]) {
-        true
-      } else if (e.getCause == null) {
-        false
-      } else {
-        assertionErrorInExceptionChain(e.getCause)
-      }
-    }
-
-    assert(assertionErrorInExceptionChain(e),
-      "Cannot find expected AssertionError in chained exceptions")
+    TestUtils.assertExceptionMsg(e, "No offset matched from request")
   }
 
   test("no matched offset for timestamp - endingOffsets") {