JsonUtils.scala
@@ -29,6 +29,11 @@ import org.json4s.jackson.Serialization
*/
private object JsonUtils {
private implicit val formats = Serialization.formats(NoTypeHints)
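// Used by `.sorted` below so that topic-partitions are serialized in a deterministic order.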
implicit val ordering = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
}

/**
* Read TopicPartitions from json string
@@ -51,7 +56,7 @@ private object JsonUtils {
* Write TopicPartitions as json string
*/
def partitions(partitions: Iterable[TopicPartition]): String = {
val result = new HashMap[String, List[Int]]
val result = HashMap.empty[String, List[Int]]
partitions.foreach { tp =>
val parts: List[Int] = result.getOrElse(tp.topic, Nil)
result += tp.topic -> (tp.partition::parts)
@@ -80,19 +85,31 @@
* Write per-TopicPartition offsets as json string
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
implicit val ordering = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
}
val result = HashMap.empty[String, HashMap[Int, Long]]
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
val off = partitionOffsets(tp)
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
}
Serialization.write(result)
}
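// Illustrative example (not part of this change):
//   Map(new TopicPartition("t", 0) -> 10L, new TopicPartition("t", 1) -> 20L)
// serializes to something like {"t":{"0":10,"1":20}}.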

/**
* Write per-topic partition lag as json string
*/
def partitionLags(
latestOffsets: Map[TopicPartition, Long],
processedOffsets: Map[TopicPartition, Long]): String = {
val result = HashMap.empty[String, HashMap[Int, Long]]
val partitions = latestOffsets.keySet.toSeq.sorted
partitions.foreach { tp =>
val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
parts += tp.partition -> lag
result += tp.topic -> parts
}
Serialization.write(Map("lag" -> result))
}
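// Illustrative example (not part of this change): a latest offset of 100 and a processed
// offset of 40 for partition 0 of topic "t" serialize to something like {"lag":{"t":{"0":60}}}.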
}
KafkaMicroBatchReader.scala
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

import org.apache.commons.io.IOUtils
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
@@ -33,9 +34,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset, SupportsCustomReaderMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.UninterruptibleThread

@@ -62,7 +63,7 @@ private[kafka010] class KafkaMicroBatchReader(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends MicroBatchReader with Logging {
extends MicroBatchReader with SupportsCustomReaderMetrics with Logging {

private var startPartitionOffsets: PartitionOffsetMap = _
private var endPartitionOffsets: PartitionOffsetMap = _
@@ -158,6 +159,10 @@
KafkaSourceOffset(endPartitionOffsets)
}

override def getCustomMetrics: CustomMetrics = {
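// Pairs the latest offsets available in Kafka with the offsets this micro-batch ended at;
// the resulting per-partition lag is surfaced as customMetrics in the source's progress
// (see the "custom lag metrics" test added below).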
KafkaCustomMetrics(kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets)
}

override def deserializeOffset(json: String): Offset = {
KafkaSourceOffset(JsonUtils.partitionOffsets(json))
}
@@ -380,3 +385,18 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
}
}
}

/**
* Currently reports per topic-partition lag.
* This is the difference between the offset of the latest available data
* in a topic-partition and the latest offset that has been processed.
*/
private[kafka010] case class KafkaCustomMetrics(
latestOffsets: Map[TopicPartition, Long],
processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
override def json(): String = {
JsonUtils.partitionLags(latestOffsets, processedOffsets)
}

override def toString: String = json()
}
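
For illustration only (not part of this change, and assuming access to the private kafka010 package), a minimal sketch of what json() yields for a two-partition topic, matching the numbers asserted in the test below:

import org.apache.kafka.common.TopicPartition

val latest = Map(new TopicPartition("t", 0) -> 50L, new TopicPartition("t", 1) -> 50L)
val processed = Map(new TopicPartition("t", 0) -> 5L, new TopicPartition("t", 1) -> 5L)
KafkaCustomMetrics(latest, processed).json()
// => something like {"lag":{"t":{"0":45,"1":45}}}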
KafkaMicroBatchSourceSuite.scala
@@ -31,12 +31,13 @@ import scala.util.Random

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -701,6 +702,41 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
}

test("custom lag metrics") {
import testImplicits._
val topic = newTopic()
testUtils.createTopic(topic, partitions = 2)
testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
require(testUtils.getLatestOffsets(Set(topic)).size === 2)

val kafka = spark
.readStream
.format("kafka")
.option("subscribe", topic)
.option("startingOffsets", s"earliest")
.option("maxOffsetsPerTrigger", 10)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

implicit val formats = DefaultFormats

val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
StartStream(trigger = OneTimeTrigger),
AssertOnQuery { query =>
query.awaitTermination()
val source = query.lastProgress.sources(0)
// maxOffsetsPerTrigger is 10 and there are two partitions containing 50 events each,
// so 5 events should be processed from each partition, leaving a lag of 45 events per partition
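// i.e. source.customMetrics is expected to look like {"lag":{"<topic>":{"0":45,"1":45}}}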
val custom = parse(source.customMetrics)
.extract[Map[String, Map[String, Map[String, Long]]]]
custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
}
)
}

}

abstract class KafkaSourceSuiteBase extends KafkaSourceTest {