
Commit c812eff

Kosar, Vaclav (vackosar) authored and committed
[SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics.
1 parent 6b8fbbf commit c812eff

File tree

4 files changed, +169 -7 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala

Lines changed: 85 additions & 5 deletions
@@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
 
 import scala.collection.JavaConverters._
 
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.CustomMetrics
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
 import org.apache.spark.sql.types.StructType
 
 /**
  * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
  * don't need to really send one.
  */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
+case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
+  extends WriterCommitMessage
 
 /**
  * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory.
@@ -42,15 +47,25 @@ case object KafkaWriterCommitMessage extends WriterCommitMessage
  */
 class KafkaStreamWriter(
     topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends StreamWriter {
+  extends StreamWriter with SupportsCustomWriterMetrics {
+
+  private var customMetrics: KafkaWriterCustomMetrics = _
 
   validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
 
   override def createWriterFactory(): KafkaStreamWriterFactory =
     KafkaStreamWriterFactory(topic, producerParams, schema)
 
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    customMetrics = KafkaWriterCustomMetrics(messages)
+  }
+
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def getCustomMetrics: KafkaWriterCustomMetrics = {
+    customMetrics
+  }
+
 }
 
 /**
@@ -102,7 +117,9 @@ class KafkaStreamDataWriter(
     checkForErrors()
     producer.flush()
     checkForErrors()
-    KafkaWriterCommitMessage
+    val minOffset: KafkaSourceOffset = KafkaSourceOffset(minOffsetAccumulator.toMap)
+    val maxOffset: KafkaSourceOffset = KafkaSourceOffset(maxOffsetAccumulator.toMap)
+    KafkaWriterCommitMessage(minOffset, maxOffset)
   }
 
   def abort(): Unit = {}
@@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
     }
   }
 }
+
+private[kafka010] case class KafkaWriterCustomMetrics(
+    minOffset: KafkaSourceOffset,
+    maxOffset: KafkaSourceOffset) extends CustomMetrics {
+  override def json(): String = {
+    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
+      ("maxOffset" -> parse(maxOffset.json))
+    compact(render(jsonVal))
+  }
+
+  override def toString: String = json()
+}
+
+private[kafka010] object KafkaWriterCustomMetrics {
+
+  import Math.{min, max}
+
+  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
+    val minMax = collate(messages)
+    KafkaWriterCustomMetrics(minMax._1, minMax._2)
+  }
+
+  private def collate(messages: Array[WriterCommitMessage]):
+      (KafkaSourceOffset, KafkaSourceOffset) = {
+
+    messages.headOption.flatMap {
+      case x: KafkaWriterCommitMessage =>
+        val lower = messages.map(_.asInstanceOf[KafkaWriterCommitMessage])
+          .map(_.minOffset).reduce(collateLower)
+        val higher = messages.map(_.asInstanceOf[KafkaWriterCommitMessage])
+          .map(_.maxOffset).reduce(collateHigher)
+        Some((lower, higher))
+      case _ => throw new IllegalArgumentException()
+    }.getOrElse((KafkaSourceOffset(), KafkaSourceOffset()))
+  }
+
+  private def collateHigher(o1: KafkaSourceOffset, o2: KafkaSourceOffset): KafkaSourceOffset = {
+    collate(o1, o2, max)
+  }
+
+  private def collateLower(o1: KafkaSourceOffset, o2: KafkaSourceOffset): KafkaSourceOffset = {
+    collate(o1, o2, min)
+  }
+
+  private def collate(
+      o1: KafkaSourceOffset,
+      o2: KafkaSourceOffset,
+      collator: (Long, Long) => Long): KafkaSourceOffset = {
+    val thisOffsets = o1.partitionToOffsets
+    val thatOffsets = o2.partitionToOffsets
+    val collated = (thisOffsets.keySet ++ thatOffsets.keySet)
+      .map(key =>
+        if (!thatOffsets.contains(key)) {
+          key -> thisOffsets(key)
+        } else if (!thisOffsets.contains(key)) {
+          key -> thatOffsets(key)
+        } else {
+          key -> collator(thisOffsets(key), thatOffsets(key))
+        }
+      ).toMap
+    new KafkaSourceOffset(collated)
+  }
+}

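The core of the new KafkaWriterCustomMetrics companion above is the per-partition merge: commit messages from all writer tasks are reduced so that, for every topic partition, the smallest and largest written offsets survive. The standalone Scala sketch below restates that merge rule in isolation; OffsetCollateSketch, PartitionOffsets and the plain (topic, partition) keys are illustrative stand-ins for KafkaSourceOffset, not code from this commit.

import Math.{min, max}

// Simplified stand-in for KafkaWriterCustomMetrics.collate: merge per-partition offset maps
// with either min (lower bound across tasks) or max (upper bound across tasks).
object OffsetCollateSketch {
  type PartitionOffsets = Map[(String, Int), Long]

  def collate(
      a: PartitionOffsets,
      b: PartitionOffsets,
      collator: (Long, Long) => Long): PartitionOffsets = {
    (a.keySet ++ b.keySet).map { key =>
      // A partition present in only one map keeps its value; otherwise apply the collator.
      key -> ((a.get(key), b.get(key)) match {
        case (Some(x), Some(y)) => collator(x, y)
        case (Some(x), None) => x
        case (None, Some(y)) => y
        case (None, None) => throw new IllegalStateException("key came from one of the maps")
      })
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val task1 = Map(("topic1", 0) -> 2L, ("topic1", 1) -> 3L)
    val task2 = Map(("topic1", 0) -> 0L, ("topic1", 1) -> 7L)
    println(collate(task1, task2, min)) // per-partition minima: 0 and 3
    println(collate(task1, task2, max)) // per-partition maxima: 2 and 7
  }
}
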
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala

Lines changed: 20 additions & 0 deletions
@@ -18,8 +18,10 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
@@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
 private[kafka010] abstract class KafkaRowWriter(
     inputSchema: Seq[Attribute], topic: Option[String]) {
 
+  import scala.collection.JavaConverters._
+
+  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
+    new ConcurrentHashMap[TopicPartition, Long]().asScala
+
+  protected val maxOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
+    new ConcurrentHashMap[TopicPartition, Long]().asScala
+
   // used to synchronize with Kafka callbacks
   @volatile protected var failedWrite: Exception = _
   protected val projection = createProjection
 
   private val callback = new Callback() {
+    import Math.{min, max}
+
     override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
+      if (recordMetadata != null) {
+        val topicPartition = new TopicPartition(recordMetadata.topic(), recordMetadata.partition())
+        val next = recordMetadata.offset()
+        val currentMin = minOffsetAccumulator.getOrElse(topicPartition, next)
+        minOffsetAccumulator.put(topicPartition, min(currentMin, next))
+        val currentMax = maxOffsetAccumulator.getOrElse(topicPartition, next)
+        maxOffsetAccumulator.put(topicPartition, max(currentMax, next))
+      }
       if (failedWrite == null && e != null) {
         failedWrite = e
       }

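For orientation: the producer Callback added above is where the per-task accumulators are fed. Each acknowledged record reports its topic, partition and offset, and the two concurrent maps fold those into a running minimum and maximum per partition. The snippet below is a minimal standalone sketch of that pattern only; OffsetAccumulatorSketch and the (topic, partition) tuple key are illustrative, standing in for Kafka's TopicPartition so the sketch needs no Kafka client on the classpath.

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object OffsetAccumulatorSketch {
  type TP = (String, Int) // stand-in for org.apache.kafka.common.TopicPartition

  private val minOffsets = new ConcurrentHashMap[TP, Long]().asScala
  private val maxOffsets = new ConcurrentHashMap[TP, Long]().asScala

  // Mirrors the onCompletion body: seed with the first offset seen for a partition,
  // then keep folding min and max as further acknowledgements arrive.
  def record(tp: TP, offset: Long): Unit = {
    val currentMin = minOffsets.getOrElse(tp, offset)
    minOffsets.put(tp, Math.min(currentMin, offset))
    val currentMax = maxOffsets.getOrElse(tp, offset)
    maxOffsets.put(tp, Math.max(currentMax, offset))
  }

  def main(args: Array[String]): Unit = {
    Seq(0L, 1L, 2L).foreach(record(("topic1", 0), _))
    println(minOffsets.toMap) // Map((topic1,0) -> 0)
    println(maxOffsets.toMap) // Map((topic1,0) -> 2)
  }
}
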
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala

Lines changed: 27 additions & 2 deletions
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
@@ -38,7 +39,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
 
   protected var testUtils: KafkaTestUtils = _
 
-  override val streamingTimeout = 30.seconds
+  override val streamingTimeout: Span = 30.seconds
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -229,6 +230,30 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
     }
   }
 
+  test("streaming - sink progress is produced") {
+    /* ensure sink progress is correctly produced. */
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))()
+
+    try {
+      input.addData("1", "2", "3")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      val topicName = topic.toString
+      val expected = "{\"minOffset\":{\"" + topicName + "\":{\"0\":0}}," +
+        "\"maxOffset\":{\"" + topicName + "\":{\"0\":2}}}"
+      assert(writer.lastProgress.sink.customMetrics == expected)
+    } finally {
+      writer.stop()
+    }
+  }
 
   test("streaming - write data with bad schema") {
     val input = MemoryStream[String]
@@ -417,7 +442,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
     var stream: DataStreamWriter[Row] = null
     withTempDir { checkpointDir =>
       var df = input.toDF()
-      if (withSelectExpr.length > 0) {
+      if (withSelectExpr.nonEmpty) {
        df = df.selectExpr(withSelectExpr: _*)
       }
       stream = df.writeStream
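
The new test reads the metric back through the query's progress reporting, which is also how an application would consume it. Below is a hedged usage sketch, assuming a running query writing to the Kafka sink and the sink.customMetrics field that the assertion above relies on; SinkMetricsLogger is an illustrative helper, not part of the commit.

import org.apache.spark.sql.streaming.StreamingQuery

// Illustrative helper: print the offset range the Kafka sink reported for the most
// recent micro-batch.
object SinkMetricsLogger {
  def logSinkOffsetRange(query: StreamingQuery): Unit = {
    val progress = query.lastProgress // latest StreamingQueryProgress, or null before the first batch
    if (progress != null) {
      // customMetrics carries the JSON built by KafkaWriterCustomMetrics.json(), for example:
      // {"minOffset":{"my-topic":{"0":0}},"maxOffset":{"my-topic":{"0":2}}}
      println(s"Kafka sink wrote offsets: ${progress.sink.customMetrics}")
    }
  }
}
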
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaWriterCustomMetricsSuite.scala

Lines changed: 37 additions & 0 deletions (new file)

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
+
+class KafkaWriterCustomMetricsSuite extends SparkFunSuite {
+
+  test("collate messages") {
+    val minOffset1 = KafkaSourceOffset(("topic1", 1, 2), ("topic1", 2, 3))
+    val maxOffset1 = KafkaSourceOffset(("topic1", 1, 2), ("topic1", 2, 5))
+    val minOffset2 = KafkaSourceOffset(("topic1", 1, 0), ("topic1", 2, 3))
+    val maxOffset2 = KafkaSourceOffset(("topic1", 1, 0), ("topic1", 2, 7))
+    val messages: Array[WriterCommitMessage] = Array(
+      KafkaWriterCommitMessage(minOffset1, maxOffset1),
+      KafkaWriterCommitMessage(minOffset2, maxOffset2))
+    val metrics = KafkaWriterCustomMetrics(messages)
+    assert(metrics.minOffset === KafkaSourceOffset(("topic1", 1, 0), ("topic1", 2, 3)))
+    assert(metrics.maxOffset === KafkaSourceOffset(("topic1", 1, 2), ("topic1", 2, 7)))
+  }
+}

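To close the loop on the JSON shape asserted in KafkaSinkSuite: KafkaWriterCustomMetrics.json() nests the min and max KafkaSourceOffset JSON under "minOffset" and "maxOffset" with json4s, exactly as shown in the first file of this commit. The standalone sketch below reproduces that composition with hard-coded inner JSON strings standing in for KafkaSourceOffset.json; it only assumes json4s-jackson on the classpath, which the diff's own imports indicate the Kafka SQL module already has.

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Standalone sketch of the metric JSON composition; the inner strings stand in for
// KafkaSourceOffset.json (per-topic, per-partition offsets).
object MetricJsonSketch {
  def main(args: Array[String]): Unit = {
    val minOffsetJson = """{"topic1":{"0":0}}"""
    val maxOffsetJson = """{"topic1":{"0":2}}"""
    val combined = ("minOffset" -> parse(minOffsetJson)) ~ ("maxOffset" -> parse(maxOffsetJson))
    // Prints: {"minOffset":{"topic1":{"0":0}},"maxOffset":{"topic1":{"0":2}}}
    println(compact(render(combined)))
  }
}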