[SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java code snippet for Kafka 0.10 integration doc #15679
Changes from 1 commit
@@ -8,9 +8,9 @@ The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [
### Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
    version = {{site.SPARK_VERSION_SHORT}}

### Creating a Direct Stream
Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010

@@ -44,6 +44,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea
Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
</div>
<div data-lang="java" markdown="1">
    import java.util.*;
    import org.apache.spark.SparkConf;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import scala.Tuple2;

    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("topicA", "topicB");

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    stream.mapToPair(
      new PairFunction<ConsumerRecord<String, String>, String, String>() {
        @Override
        public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
          return new Tuple2<>(record.key(), record.value());
        }
      });
</div>
</div>
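A brief usage sketch, assuming the streamingContext and the stream defined above: the mapped stream only does work once an output operation is registered and the context is started.

    JavaPairDStream<String, String> pairs = stream.mapToPair(
      new PairFunction<ConsumerRecord<String, String>, String, String>() {
        @Override
        public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
          return new Tuple2<>(record.key(), record.value());
        }
      });

    pairs.print();                        // register an output operation
    streamingContext.start();             // start the computation
    streamingContext.awaitTermination();  // wait for the computation to terminate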
|
|
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create
</div>
<div data-lang="java" markdown="1">
    // Import dependencies and create kafka params as in Create Direct Stream above

    OffsetRange[] offsetRanges = new OffsetRange[]{
      // topic, partition, inclusive starting offset, exclusive ending offset
      OffsetRange.create("test", 0, 0, 100),
      OffsetRange.create("test", 1, 0, 100)
    };

    JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.<String, String>createRDD(
      sparkContext,
      kafkaParams,
      offsetRanges,
      LocationStrategies.PreferConsistent()
    );
</div>
</div>
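A usage sketch, assuming the rdd above and the imports from the Create Direct Stream example: each element is a ConsumerRecord, so values can be pulled out with a plain map.

    JavaRDD<String> values = rdd.map(
      new Function<ConsumerRecord<String, String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> record) {
          return record.value();
        }
      });

    for (String value : values.take(5)) {
      System.out.println(value);
    }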
|
|
@@ -103,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no
}
</div>
<div data-lang="java" markdown="1">
    stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
      @Override
      public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
          @Override
          public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) throws Exception {
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
            System.out.println(
              o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
          }
        });
      }
    });
</div>
</div>
|
|
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic. By
<div class="codetabs">
<div data-lang="scala" markdown="1">
    stream.foreachRDD { rdd =>
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.
</div>
<div data-lang="java" markdown="1">
    stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
      @Override
      public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // some time later, after outputs have completed
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
|
Contributor (Author)
I personally feel it'd be strange that while we can write `stream.asInstanceOf[CanCommitOffsets].commitAsync(...)` in Scala, we must write `((CanCommitOffsets) stream.inputDStream()).commitAsync(...)` in Java. I can open a PR to fix this when needed. @koeninger @zsxwing options please?

Contributor
I think it's far too late to fix those issues at this point. DStreams return an RDD, not a parameterized type. KafkaUtils methods return DStreams and RDDs, not an implementation specific type.

Contributor (Author)
Thanks Cody. Sorry for not being clear, but my point was that the Java Kafka input stream does not implement CanCommitOffsets, so it should be `((CanCommitOffsets) stream).commitAsync(offsetRanges);` rather than `((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);`.

Contributor
I understood your point. My point is that you have to do the same kind of delegation to get access to HasOffsetRanges on a Java RDD, and you're unlikely to be able to fix that kind of thing at this point without either changing the interfaces for DStream, or exposing implementation classes, which Spark is historically very much against.
      }
    });
</div>
</div>
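Following the thread above, a minimal sketch of how the unwrap-and-cast could be hidden behind a small helper; the OffsetCommits class is a hypothetical illustration, not part of the Spark API.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.kafka010.CanCommitOffsets;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    public final class OffsetCommits {
      private OffsetCommits() {}

      // JavaInputDStream does not implement CanCommitOffsets itself; the wrapped
      // InputDStream returned by inputDStream() does, so unwrap before casting.
      public static void commitAsync(
          JavaInputDStream<ConsumerRecord<String, String>> stream,
          OffsetRange[] offsetRanges) {
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
      }
    }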
|
|
@@ -141,9 +214,11 @@ For data stores that support transactions, saving offsets in the same transactio
|
|
    // begin from the offsets committed to the database
    val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
      new TopicPartition(resultSet.string("topic")), resultSet.int("partition")) -> resultSet.long("offset")
      new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
    }.toMap

    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
@@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio
}
</div>
<div data-lang="java" markdown="1">
    // The details depend on your data store, but the general idea looks like this

    // begin from the offsets committed to the database
    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
    for (resultSet : selectOffsetsFromYourDatabase) {
      fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
    }

    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.<String, String>createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent(),
      ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
    );

    stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
      @Override
      public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        Object results = yourCalculation(rdd);

        // begin your transaction

        // update results
        // update offsets where the end of existing offsets matches the beginning of this batch of offsets
        // assert that offsets were updated correctly

        // end your transaction
      }
    });
</div>
</div>
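A minimal sketch of what the transaction block above could look like against a JDBC store; the offsets(topic, kafka_partition, until_offset) table, the jdbcUrl variable, and the results-saving step are assumptions for illustration, not part of the Spark or Kafka APIs.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    // offsetRanges and the computed results come from the foreachRDD body above
    try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
      conn.setAutoCommit(false);    // begin your transaction

      // ... save your results here, using the same connection ...

      // update offsets where the end of existing offsets matches the beginning of this batch
      PreparedStatement ps = conn.prepareStatement(
        "UPDATE offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ? AND until_offset = ?");
      for (OffsetRange o : offsetRanges) {
        ps.setLong(1, o.untilOffset());
        ps.setString(2, o.topic());
        ps.setInt(3, o.partition());
        ps.setLong(4, o.fromOffset());
        ps.addBatch();
      }
      int[] updated = ps.executeBatch();

      // assert that offsets were updated correctly: each range should match exactly one row
      for (int count : updated) {
        if (count != 1) {
          conn.rollback();
          throw new IllegalStateException("offsets table was modified by another writer");
        }
      }

      conn.commit();                // end your transaction
    }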
|
|
@@ -185,6 +290,14 @@ The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html
    )
</div>
<div data-lang="java" markdown="1">
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
    kafkaParams.put("security.protocol", "SSL");
    kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
    kafkaParams.put("ssl.truststore.password", "test1234");
    kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
    kafkaParams.put("ssl.keystore.password", "test1234");
    kafkaParams.put("ssl.key.password", "test1234");
</div>
</div>
|
|
Q: is the map value type really Object for the Kafka API?
A: yes -- please see https://github.com/apache/kafka/blob/0.10.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539
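For reference, a small sketch of why Object works as the value type: the KafkaConsumer constructor linked above takes a Map<String, Object> and accepts heterogeneous value types (the broker address and deserializers here are just illustrative).

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");          // String
    kafkaParams.put("key.deserializer", StringDeserializer.class);   // Class
    kafkaParams.put("value.deserializer", StringDeserializer.class); // Class
    kafkaParams.put("enable.auto.commit", false);                    // Boolean

    // KafkaConsumer(Map<String, Object> configs) handles all of these value types
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams);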