diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 868edb5dcdc0c..92b13f2b555d1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -29,6 +29,11 @@ import org.json4s.jackson.Serialization
  */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
+  implicit val ordering = new Ordering[TopicPartition] {
+    override def compare(x: TopicPartition, y: TopicPartition): Int = {
+      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+    }
+  }
 
   /**
    * Read TopicPartitions from json string
@@ -51,7 +56,7 @@ private object JsonUtils {
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = new HashMap[String, List[Int]]
+    val result = HashMap.empty[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -80,19 +85,31 @@ private object JsonUtils {
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = new HashMap[String, HashMap[Int, Long]]()
-    implicit val ordering = new Ordering[TopicPartition] {
-      override def compare(x: TopicPartition, y: TopicPartition): Int = {
-        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-      }
-    }
+    val result = HashMap.empty[String, HashMap[Int, Long]]
     val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
     partitions.foreach { tp =>
       val off = partitionOffsets(tp)
-      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
+      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
     }
     Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
+   */
+  def partitionLags(
+      latestOffsets: Map[TopicPartition, Long],
+      processedOffsets: Map[TopicPartition, Long]): String = {
+    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val partitions = latestOffsets.keySet.toSeq.sorted
+    partitions.foreach { tp =>
+      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
+      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+      parts += tp.partition -> lag
+      result += tp.topic -> parts
+    }
+    Serialization.write(Map("lag" -> result))
+  }
 }
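Note on the JsonUtils changes above: the (topic, partition) Ordering is hoisted to object scope so that both partitionOffsets and the new partitionLags can sort deterministically, and partitionLags emits JSON of the shape {"lag": {topic: {partition: lag}}}. Below is a minimal standalone sketch of the same lag computation; JsonUtils is private to the kafka010 package, so the object name LagJsonSketch and the sample offsets are illustrative only, and the sketch skips the deterministic sort the patch performs.

    import org.apache.kafka.common.TopicPartition
    import org.json4s.NoTypeHints
    import org.json4s.jackson.Serialization

    object LagJsonSketch {
      private implicit val formats = Serialization.formats(NoTypeHints)

      // lag per partition = latest available offset - latest processed offset
      def lags(
          latest: Map[TopicPartition, Long],
          processed: Map[TopicPartition, Long]): String = {
        val byTopic: Map[String, Map[Int, Long]] = latest.keys.toSeq.groupBy(_.topic).map {
          case (topic, tps) =>
            topic -> tps.map(tp => tp.partition -> (latest(tp) - processed.getOrElse(tp, 0L))).toMap
        }
        Serialization.write(Map("lag" -> byTopic))
      }

      def main(args: Array[String]): Unit = {
        val (tp0, tp1) = (new TopicPartition("t", 0), new TopicPartition("t", 1))
        // latest offset 50 per partition, 5 processed per partition -> lag of 45 each
        println(lags(Map(tp0 -> 50L, tp1 -> 50L), Map(tp0 -> 5L, tp1 -> 5L)))
        // => {"lag":{"t":{"0":45,"1":45}}}
      }
    }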
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 6c95b2b2560c4..900c9f4e7fbf3 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -33,9 +34,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset, SupportsCustomReaderMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
 
@@ -62,7 +63,7 @@ private[kafka010] class KafkaMicroBatchReader(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends MicroBatchReader with Logging {
+  extends MicroBatchReader with SupportsCustomReaderMetrics with Logging {
 
   private var startPartitionOffsets: PartitionOffsetMap = _
   private var endPartitionOffsets: PartitionOffsetMap = _
@@ -158,6 +159,10 @@ private[kafka010] class KafkaMicroBatchReader(
     KafkaSourceOffset(endPartitionOffsets)
   }
 
+  override def getCustomMetrics: CustomMetrics = {
+    KafkaCustomMetrics(kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets)
+  }
+
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
@@ -380,3 +385,18 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
     }
   }
 }
+
+/**
+ * Currently reports per topic-partition lag.
+ * This is the difference between the offset of the latest available data
+ * in a topic-partition and the latest offset that has been processed.
+ */
+private[kafka010] case class KafkaCustomMetrics(
+    latestOffsets: Map[TopicPartition, Long],
+    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
+  override def json(): String = {
+    JsonUtils.partitionLags(latestOffsets, processedOffsets)
+  }
+
+  override def toString: String = json()
+}
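Note on the reader changes above: with SupportsCustomReaderMetrics mixed in, the engine can ask the Kafka micro-batch reader for KafkaCustomMetrics, and its JSON ends up attached to the source's entry in StreamingQueryProgress. A rough sketch of how an application might consume it, assuming the customMetrics field that this change set adds to SourceProgress; the KafkaLagListener class name and log format are illustrative only.

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._
    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods.parse

    class KafkaLagListener extends StreamingQueryListener {
      private implicit val formats = DefaultFormats

      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        event.progress.sources.foreach { source =>
          // customMetrics carries the JSON from KafkaCustomMetrics.json(),
          // e.g. {"lag":{"topic":{"0":45,"1":45}}}
          Option(source.customMetrics).foreach { json =>
            val lag = parse(json).extract[Map[String, Map[String, Map[String, Long]]]]
            for {
              (topic, partitions) <- lag.getOrElse("lag", Map.empty)
              (partition, value) <- partitions
            } println(s"topic=$topic partition=$partition lag=$value")
          }
        }
      }
    }

    // Register with: spark.streams.addListener(new KafkaLagListener)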
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index aa898686c77ca..4a22f5507fd4c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -31,12 +31,13 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -701,6 +702,41 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
   }
 
+  test("custom lag metrics") {
+    import testImplicits._
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
+    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("subscribe", topic)
+      .option("startingOffsets", s"earliest")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+
+    implicit val formats = DefaultFormats
+
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+    testStream(mapped)(
+      StartStream(trigger = OneTimeTrigger),
+      AssertOnQuery { query =>
+        query.awaitTermination()
+        val source = query.lastProgress.sources(0)
+        // maxOffsetsPerTrigger is 10, and there are two partitions containing 50 events each,
+        // so 5 events should be processed from each partition, leaving a lag of 45 events
+        val custom = parse(source.customMetrics)
+          .extract[Map[String, Map[String, Map[String, Long]]]]
+        custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
+      }
+    )
+  }
+
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
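For reference, the lag asserted in the test above follows directly from the rate limit. A worked example with the test's numbers; it assumes, as the test comment does, that maxOffsetsPerTrigger is split evenly across the two partitions, and the variable names are illustrative.

    // 100 messages spread over 2 partitions -> latest offset of 50 in each partition
    val latestPerPartition = 100 / 2
    // maxOffsetsPerTrigger = 10, split across 2 partitions -> 5 processed per partition
    val processedPerPartition = 10 / 2
    // lag reported by KafkaCustomMetrics for each partition
    val expectedLag = latestPerPartition - processedPerPartition  // 45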