32 changes: 30 additions & 2 deletions docs/structured-streaming-kafka-integration.md
@@ -539,7 +539,7 @@ The following properties are available to configure the consumer pool:
<tr>
<td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
<td>The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run.</td>
-<td>1m (1 minutes)</td>
+<td>1m (1 minute)</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.jmx.enable</td>
@@ -580,7 +580,7 @@ The following properties are available to configure the fetched data pool:
<tr>
<td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
<td>The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.</td>
-<td>1m (1 minutes)</td>
+<td>1m (1 minute)</td>
</tr>
</table>
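
To make the pool settings above concrete, here is a minimal sketch (not part of this patch) of tuning the evictors. The application name is a placeholder, and the sketch assumes these properties are read from the Spark configuration at application start, so they could equally be passed as `--conf` flags to `spark-submit`:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("kafka-pool-tuning")  # hypothetical application name
    # Run the consumer pool's idle evictor every 30 seconds instead of 1m.
    .config("spark.kafka.consumer.cache.evictorThreadRunInterval", "30s")
    # A non-positive value disables the idle evictor for the fetched data pool.
    .config("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval", "-1")
    .getOrCreate()
)
```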

@@ -802,6 +802,34 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
</div>
</div>

### Producer Caching

Given that a Kafka producer instance is designed to be thread-safe, Spark initializes one Kafka producer instance and shares it across tasks that use the same caching key.

The caching key is built up from the following information:
Member
Just a question: does "the following information" cover all configurations used in `paramsToSeq`? It seems that `setAuthenticationConfigIfNeeded` injects more config in addition to the following two.

Contributor Author
Technically you're right: the cache key will contain the configuration, including the auth config Spark will inject. I haven't mentioned it since that starts to explain internals, and end users may be OK with knowing only the abstracted info.

And the same configuration maps to the same producer, except in the case where the delegation token is renewed. Not 100% sure about the details; @gaborgsomogyi could you help me confirm this?

Contributor (gaborgsomogyi, Jan 10, 2020)
@dongjoon-hyun really good point! If a delegation token is used, then from time to time a new producer must be created and the old one must be evicted, otherwise the query will fail. There are multiple ways to reach that (I haven't yet analyzed how it's done in the latest change made by @HeartSaVioR, but I'm on it):

  • Either the cache key contains authentication information (dynamic JAAS config). This way the creation of the new producer and the eviction of the old one would be automatic. Without super deep consideration, that's my suggested way.
  • Or the cache key does NOT contain authentication information (dynamic JAAS config). This way additional logic must be added to handle this scenario. My first feeling is that it would just add complexity and make this part of the code brittle.

As I understand from @HeartSaVioR's comment, the first approach is implemented at the moment. If that's so, then I'm fine with it, but I would mention two things here:

  • The key may contain authentication information.
  • There could be situations where more than one producer is instantiated. This is important because producers consume a significant amount of memory, as @zsxwing pointed out.

Contributor Author
I think we take the first approach; I just followed the way we did it before.

Back to the topic: would we like to add these details to the guide doc? I'll address it if we'd like end users to know about it; otherwise we can leave it as it is.

Contributor
I agree the first bullet point is questionable since it's a deep detail, but the second is important memory-sizing information. Let's hear what @dongjoon-hyun thinks.

Contributor
I think this would be more useful if you want to explain how a user can force separate producers to be used. Otherwise it's an internal detail that doesn't really affect users.

Member
+1 for @vanzin's advice.

Contributor Author
Hmm... would we like to explain how to tweak it, or just hide it? The reason I explain this is that end users may encounter an issue with the producer pool and want to debug it a bit, but if it feels too internal, it sounds OK to hide it as well.

Contributor Author
Just added some explanation, since it makes sense to let end users know about it for debugging.


* Kafka producer configuration

This includes the configuration for authorization, which Spark will automatically include when delegation tokens are being used. Even taking authorization into account, you can expect the same Kafka producer instance to be used for the same Kafka producer configuration.
A different Kafka producer is used when the delegation token is renewed; the Kafka producer instance for the old delegation token will be evicted according to the cache policy.
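
As a minimal sketch of how a user can force separate producers to be used (the broker address, topics, checkpoint paths, and `kafka.compression.type` values below are placeholders, and `df` is assumed to be an existing streaming DataFrame with `key` and `value` columns): queries with identical `kafka.`-prefixed producer options share one pooled producer per executor, while differing in any producer option yields a distinct caching key and a separate producer.

```python
# Query A and query B differ only in one Kafka producer option
# ("kafka.compression.type"), so they map to different caching keys
# and therefore use separate pooled producer instances.
query_a = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")  # placeholder brokers
    .option("kafka.compression.type", "lz4")
    .option("topic", "topicA")                        # placeholder topic
    .option("checkpointLocation", "/tmp/ckpt-a")      # placeholder path
    .start()
)

query_b = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("kafka.compression.type", "gzip")  # differs -> separate producer
    .option("topic", "topicB")
    .option("checkpointLocation", "/tmp/ckpt-b")
    .start()
)
```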

The following properties are available to configure the producer pool:

<table class="table">
<tr><th>Property Name</th><th>Meaning</th><th>Default</th></tr>
<tr>
<td>spark.kafka.producer.cache.timeout</td>
<td>The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
<td>10m (10 minutes)</td>
</tr>
<tr>
<td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
<td>The interval of time between runs of the idle evictor thread for producer pool. When non-positive, no idle evictor thread will be run.</td>
<td>1m (1 minute)</td>
</tr>
</table>

The idle eviction thread periodically removes producers that have not been used for longer than the given timeout. Note that the producer is shared and used concurrently, so the last-used timestamp is determined by the moment the producer instance is returned to the pool and its reference count drops to 0.
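
Mirroring the consumer pool sketch earlier (the values are placeholders, and the same assumption applies that these properties are read from the Spark configuration at application start):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Evict producers that have sat idle for more than 5 minutes (default: 10m).
    .config("spark.kafka.producer.cache.timeout", "5m")
    # Check for idle producers every 30 seconds (default: 1m).
    .config("spark.kafka.producer.cache.evictorThreadRunInterval", "30s")
    .getOrCreate()
)
```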

## Kafka Specific Configurations
