Skip to content

Commit e4a93ac

Browse files
author
Andrew Or
committed
Fix tests?
1 parent 25416dc commit e4a93ac

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,8 +349,9 @@ object KafkaUtils {
349349
fromOffsets: Map[TopicAndPartition, Long],
350350
messageHandler: MessageAndMetadata[K, V] => R
351351
): InputDStream[R] = ssc.withScope {
352+
val cleanedHandler = ssc.sc.clean(messageHandler)
352353
new DirectKafkaInputDStream[K, V, KD, VD, R](
353-
ssc, kafkaParams, fromOffsets, messageHandler)
354+
ssc, kafkaParams, fromOffsets, cleanedHandler)
354355
}
355356

356357
/**
@@ -470,11 +471,12 @@ object KafkaUtils {
470471
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
471472
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
472473
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
474+
val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
473475
createDirectStream[K, V, KD, VD, R](
474476
jssc.ssc,
475477
Map(kafkaParams.toSeq: _*),
476478
Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
477-
messageHandler.call _
479+
cleanedHandler
478480
)
479481
}
480482

0 commit comments

Comments
 (0)