
Commit eb81be6

vvysotskyi, anicolaspp, and mgorbov authored and committed
Cherry-pick two commits for Spark-672 (apache#661)
* Update package.scala (apache#650)

  * Update package.scala

    - We need to change the API access point to use `InputDStream` instead of `DirectKafkaInputDStream`, since the latter is private, but it extends `InputDStream`.
    - The same functionality is there and the same principles apply: we can commit offsets directly without the boilerplate code, but, as always, it can only be done on the original stream and NOT in the transformations.

  * Update package.scala

* Branch 2.4.4 mapr offsets fix (apache#654)

  * Update package.scala

    - We need to change the API access point to use `InputDStream` instead of `DirectKafkaInputDStream`, since the latter is private, but it extends `InputDStream`.
    - The same functionality is there and the same principles apply: we can commit offsets directly without the boilerplate code, but, as always, it can only be done on the original stream and NOT in the transformations.

  * Update package.scala

  * Using InputDStream of ConsumerRecord

Co-authored-by: Mikhail Gorbov <[email protected]>
Co-authored-by: Nicolas A Perez <[email protected]>
Co-authored-by: Mikhail Gorbov <[email protected]>
1 parent 320f4e2 commit eb81be6

File tree

  • external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010

1 file changed: +5 −3 lines changed


external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala

Lines changed: 5 additions & 3 deletions
@@ -20,7 +20,9 @@ package org.apache.spark.streaming
 import org.apache.spark.internal.config.ConfigBuilder

 import org.apache.kafka.clients.consumer.OffsetCommitCallback
-import org.apache.spark.streaming.kafka010.{CanCommitOffsets, DirectKafkaInputDStream, HasOffsetRanges}
+import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.kafka.clients.consumer.ConsumerRecord

 /**
  * Spark Integration for Kafka 0.9
@@ -86,10 +88,10 @@ package object kafka010 { //scalastyle:ignore
   def commitOffsetsAsync(): Unit = commitOffsetsAsync(null)

   def commitOffsetsAsync(callback: OffsetCommitCallback): Unit = {
-    directKafkaInputDStream.foreachRDD { rdd =>
+    inputDStream.foreachRDD { rdd =>
       val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

-      directKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
+      inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
     }
   }
 }
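The patched `commitOffsetsAsync` relies on a downcast: the public `InputDStream` type exposes neither `offsetRanges` nor `commitAsync`, so the stream is cast to `HasOffsetRanges` and `CanCommitOffsets` at the call site, which works because the private concrete stream class implements both. A minimal, self-contained sketch of that cast pattern follows; the traits and `FakeDirectStream` here are simplified stand-ins invented for illustration (the real types, with `OffsetRange`-based signatures, live in `org.apache.spark.streaming.kafka010`):

```scala
// Stand-in traits modeled on Spark's HasOffsetRanges / CanCommitOffsets.
// Signatures are simplified for the sketch; the real API uses OffsetRange arrays.
trait HasOffsetRanges {
  def offsetRanges: Seq[(String, Int, Long, Long)] // (topic, partition, from, until)
}

trait CanCommitOffsets {
  def commitAsync(offsets: Seq[(String, Int, Long, Long)]): Unit
}

// Stand-in for the concrete (private) stream class: it mixes in both traits,
// while callers only see it through a base type that declares neither method.
class FakeDirectStream extends HasOffsetRanges with CanCommitOffsets {
  var lastCommitted: Seq[(String, Int, Long, Long)] = Nil
  def offsetRanges: Seq[(String, Int, Long, Long)] = Seq(("topic", 0, 0L, 42L))
  def commitAsync(offsets: Seq[(String, Int, Long, Long)]): Unit = {
    lastCommitted = offsets
  }
}

object CommitSketch {
  // Mirrors the shape of the patched commitOffsetsAsync body:
  // read the offset ranges, then cast the same stream object and commit them.
  def commitOffsets(stream: AnyRef): Unit = {
    val offsets = stream.asInstanceOf[HasOffsetRanges].offsetRanges
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
  }

  def main(args: Array[String]): Unit = {
    val stream = new FakeDirectStream
    commitOffsets(stream)
    println(stream.lastCommitted)
  }
}
```

This also illustrates why, as the commit message notes, offsets can only be committed on the original stream and not in the transformations: only the original stream object carries the `CanCommitOffsets` implementation, so the cast fails on anything derived from it.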

0 commit comments
