From debe111da3cf1981a4c666554eba669be248f9d4 Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Mon, 13 Feb 2017 16:01:06 -0800
Subject: [PATCH 1/7] update structured streaming documentation around batch
 mode

---
 .../structured-streaming-kafka-integration.md | 148 +++++++++++++++++-
 1 file changed, 141 insertions(+), 7 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 8b2f51a378dc7..e88d1c018717e 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -119,6 +119,122 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

+### Creating a Kafka Source for Batch Queries
+
+
+{% highlight scala %} + +// Subscribe to 1 topic defaults to the earliest and latest offsets +val ds1 = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() +ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + +// Subscribe to multiple topics, specifying explicit Kafka offsets +val ds2 = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1,topic2") + .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") + .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") + .load() +ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + +// Subscribe to a pattern, at the earliest and latest offsets +val ds3 = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribePattern", "topic.*") + .option("startingOffsets", "earliest") + .option("endingOffsets", "latest") + .load() +ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + +{% endhighlight %} +
+
+{% highlight java %} + +// Subscribe to 1 topic defaults to the earliest and latest offsets +Dataset ds1 = spark + .read() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() +ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + +// Subscribe to multiple topics, specifying explicit Kafka offsets +Dataset ds2 = spark + .read() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1,topic2") + .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}") + .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}) + .load() +ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + +// Subscribe to a pattern, at the earliest and latest offsets +Dataset ds3 = spark + .read() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribePattern", "topic.*") + .option("startingOffsets", "earliest") + .option("endingOffsets", "latest") + .load() +ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + +{% endhighlight %} +
+
+{% highlight python %} + +# Subscribe to 1 topic defaults to the earliest and latest offsets +ds1 = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() +ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + +# Subscribe to multiple topics, specifying explicit Kafka offsets +ds2 = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1,topic2") + .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") + .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") + .load() +ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + +# Subscribe to a pattern, at the earliest and latest offsets +ds3 = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribePattern", "topic.*") + .option("startingOffsets", "earliest") + .option("endingOffsets", "latest") + .load() +ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + +{% endhighlight %} +
+
+ Each row in the source has the following schema: @@ -152,7 +268,7 @@ Each row in the source has the following schema:
ColumnType
-The following options must be set for the Kafka source. +The following options must be set for the Kafka source (streaming and batch). @@ -187,25 +303,39 @@ The following options must be set for the Kafka source. The following configurations are optional:
Optionvaluemeaning
- + - - + + + + + + + + + - + @@ -213,24 +343,28 @@ The following configurations are optional: + + + +
Optionvaluedefaultmeaning
Optionvaluedefaultmodemeaning
startingOffsetsearliest, latest, or json string + earliest, latest (streaming only), or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} lateststreaming=latest, batch=earlieststreaming and batch The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. - Note: This only applies when a new Streaming query is started, and that resuming will always pick - up from where the query left off. Newly discovered partitions during a query will start at + Note: For Batch, latest (either implicitly or by using -1 in json) is not allowed. + For Streaming, this only applies when a new Streaming query is started, and that resuming will + always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
endingOffsetslatest or json string + {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} + latestbatch onlyThe end point when a batch query is started, either "latest" which is just from the latest + offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -1 + as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed.
failOnDataLoss true or false trueWhether to fail the query when it's possible that data is lost (e.g., topics are deleted, or + streaming onlyWhether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
kafkaConsumer.pollTimeoutMs long 512streaming and batch The timeout in milliseconds to poll data from Kafka in executors.
fetchOffset.numRetries int 3streaming and batch Number of times to retry before giving up fatch Kafka latest offsets.
fetchOffset.retryIntervalMs long 10streaming and batch milliseconds to wait before retrying to fetch Kafka offsets
maxOffsetsPerTrigger long nonestreaming and batch Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
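To make the batch mode documented in this first patch concrete, here is a minimal end-to-end sketch (illustrative only, not part of the patch series); it assumes the spark-sql-kafka-0-10 artifact is on the classpath, and the topic name and output path are placeholders:

{% highlight scala %}
import org.apache.spark.sql.SparkSession

// Assumed entry point; any existing SparkSession works here.
val spark = SparkSession.builder.appName("KafkaBatchExample").getOrCreate()

// Read a bounded range of offsets as a batch DataFrame, using the
// startingOffsets/endingOffsets options described in the tables above.
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// key and value arrive as binary columns; cast them before use.
val records = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// A bounded result can be written out like any other batch DataFrame.
records.write.format("parquet").save("/tmp/kafka-batch-output")
{% endhighlight %}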
From 081ea246b5686657bd483c68bda522078aa2cd13 Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Mon, 13 Feb 2017 16:10:14 -0800
Subject: [PATCH 2/7] update

---
 docs/structured-streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index e88d1c018717e..953c3a4d15559 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -180,7 +180,7 @@ Dataset ds2 = spark
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
   .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
-  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}})
+  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
   .load()
 ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

From 28af04062fbcc3161519b5ee4485333bb786a9ee Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Mon, 13 Feb 2017 18:09:58 -0800
Subject: [PATCH 3/7] address comments from @zsxwing

---
 .../structured-streaming-kafka-integration.md | 24 ++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 953c3a4d15559..bb0b9e673deb3 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -120,6 +120,8 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

 ### Creating a Kafka Source for Batch Queries
+If you have a use case that is better suited to batch processing,
+you can create a Dataset/DataFrame for a defined range of offsets.

@@ -170,8 +172,8 @@ Dataset ds1 = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
-  .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

 // Subscribe to multiple topics, specifying explicit Kafka offsets
 Dataset ds2 = spark
@@ -181,8 +183,8 @@ Dataset ds2 = spark
   .option("subscribe", "topic1,topic2")
   .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
   .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
-  .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

 // Subscribe to a pattern, at the earliest and latest offsets
 Dataset ds3 = spark
@@ -192,8 +194,8 @@ Dataset ds3 = spark
   .option("subscribePattern", "topic.*")
   .option("startingOffsets", "earliest")
   .option("endingOffsets", "latest")
-  .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

 {% endhighlight %}

{% highlight python %}

 # Subscribe to 1 topic defaults to the earliest and latest offsets
-ds1 = spark
+ds1 = spark \
     .read
     .format("kafka")
     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -210,7 +212,7 @@ ds1 = spark
 ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

 # Subscribe to multiple topics, specifying explicit Kafka offsets
-ds2 = spark
+ds2 = spark \
     .read
     .format("kafka")
     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -221,7 +223,7 @@ ds2 = spark
 ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

 # Subscribe to a pattern, at the earliest and latest offsets
-ds3 = spark
+ds3 = spark \
     .read
     .format("kafka")
     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -326,8 +328,8 @@ The following configurations are optional:
 latest
 batch only
-  The end point when a batch query is started, either "latest" which is just from the latest
-  offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -1
+  The end point of a batch query, either "latest" which refers to the latest
+  offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1
   as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed.

From f5318e0a3b3c1240781f65b4a3a1bafbe31efc6f Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Tue, 14 Feb 2017 10:29:23 -0800
Subject: [PATCH 4/7] updated per @zsxwing

---
 .../structured-streaming-kafka-integration.md | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index bb0b9e673deb3..f19954beaebf2 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -204,32 +204,32 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

 # Subscribe to 1 topic defaults to the earliest and latest offsets
 ds1 = spark \
-    .read
-    .format("kafka")
-    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-    .option("subscribe", "topic1")
+    .read \
+    .format("kafka") \
+    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+    .option("subscribe", "topic1") \
     .load()
 ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

 # Subscribe to multiple topics, specifying explicit Kafka offsets
 ds2 = spark \
-    .read
-    .format("kafka")
-    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-    .option("subscribe", "topic1,topic2")
-    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
-    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
+    .read \
+    .format("kafka") \
+    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+    .option("subscribe", "topic1,topic2") \
+    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
+    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
     .load()
 ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

 # Subscribe to a pattern, at the earliest and latest offsets
 ds3 = spark \
-    .read
-    .format("kafka")
-    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-    .option("subscribePattern", "topic.*")
-    .option("startingOffsets", "earliest")
-    .option("endingOffsets", "latest")
+    .read \
+    .format("kafka") \
+    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+    .option("subscribePattern", "topic.*") \
+    .option("startingOffsets", "earliest") \
+    .option("endingOffsets", "latest") \
     .load()
 ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

From 5bf32f43ca201ba6698b212a1a86d82a42972dbd Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Tue, 14 Feb 2017 13:40:29 -0800
Subject: [PATCH 5/7] update addressing comments from @zsxwing and @tdas

---
 docs/structured-streaming-kafka-integration.md | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index f19954beaebf2..4bb151df8d090 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -270,7 +270,8 @@ Each row in the source has the following schema:

-The following options must be set for the Kafka source (streaming and batch).
+The following options must be set for the Kafka source
+for both batch and streaming queries.
Optionvaluemeaning
- + - - + @@ -353,7 +354,7 @@ The following configurations are optional: - + From 8983b8d77846ecec180fa98bdc130950f0609b65 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Tue, 14 Feb 2017 15:15:28 -0800 Subject: [PATCH 6/7] revised based on feedback from @tdas --- docs/structured-streaming-kafka-integration.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 4bb151df8d090..d1faeaa081eb3 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -317,8 +317,8 @@ The following configurations are optional: @@ -383,7 +383,7 @@ Note that the following Kafka params cannot be set and the Kafka source will thr where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new - Streaming query is started, and that resuming will always pick up from where the query left off. + streaming query is started, and that resuming will always pick up from where the query left off. - **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys. - **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer. From d55ba0977f559a2be3f186f0c7552b3fc8457be8 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Tue, 14 Feb 2017 16:38:42 -0800 Subject: [PATCH 7/7] revision @tdas --- docs/structured-streaming-kafka-integration.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index d1faeaa081eb3..522e669568678 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -328,7 +328,7 @@ The following configurations are optional: {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} - + @@ -337,10 +337,11 @@ The following configurations are optional: - + + as you expected. Batch queries will always fail if it fails to read any data from the provided + offsets due to lost data.
Optionvaluedefaultmodemeaning
Optionvaluedefaultquery typemeaning
startingOffsetsearliest, latest (streaming only), or json string - {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} + "earliest", "latest" (streaming only), or json string + """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ streaming=latest, batch=earliest"latest" for streaming, "earliest" for batch streaming and batch The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. - Note: For Batch, latest (either implicitly or by using -1 in json) is not allowed. - For Streaming, this only applies when a new Streaming query is started, and that resuming will + Note: For Batch queries, latest (either implicitly or by using -1 in json) is not allowed. + For Streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
int 3 streaming and batchNumber of times to retry before giving up fatch Kafka latest offsets.Number of times to retry before giving up fetching Kafka offsets.
fetchOffset.retryIntervalMsThe start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. - Note: For Batch queries, latest (either implicitly or by using -1 in json) is not allowed. - For Streaming queries, this only applies when a new query is started, and that resuming will + Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. + For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
latest

-batch only
+batch query

 The end point of a batch query, either "latest" which refers to the latest
@@ -337,10 +337,11 @@ The following configurations are optional:

 failOnDataLoss
 true or false
 true
-streaming only
+streaming query

 Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
 offsets are out of range). This may be a false alarm. You can disable it when it doesn't work
-as you expected.
+as you expected. Batch queries will always fail if they fail to read any data from the provided
+offsets due to lost data.