[SPARK-21869][SS][DOCS][FOLLOWUP] Document Kafka producer pool configuration #27146
Conversation
Test build #116365 has finished for PR 27146 at commit
| Since a Kafka producer instance is designed to be thread-safe, Spark initializes a Kafka producer instance and shares it across tasks for the same caching key. | ||
| The caching key is built up from the following information: |
Just a question. Does "the following information" cover all configurations used in paramsToSeq? It seems that setAuthenticationConfigIfNeeded injects more config in addition to the following two.
Technically you're right: the cache key will contain the full configuration, including the auth config Spark injects. I didn't mention it because that starts to explain internals, and end users may be OK with knowing only the abstracted info.
And the same configuration maps to the same key, except in the case where the delegation token is renewed. Not 100% sure about the details; @gaborgsomogyi, could you help me confirm this?
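To make the discussion concrete, here is a hypothetical sketch (not Spark's actual code) of the idea being described: the cache key modeled as the full, sorted set of producer parameters, including any auth config Spark injects. The parameter names and token values below are made up for illustration.

```python
# Hypothetical model of the producer cache key: the merged, sorted set of
# user-supplied Kafka params plus whatever auth config Spark injects
# (e.g. a dynamic JAAS entry carrying a delegation token).
def cache_key(user_params, injected_auth_params):
    merged = {**user_params, **injected_auth_params}
    return tuple(sorted(merged.items()))

user = {"bootstrap.servers": "broker:9092"}
auth = {"sasl.jaas.config": "token-A"}
renewed = {"sasl.jaas.config": "token-B"}

# Same parameters -> same key, so tasks share a single producer.
assert cache_key(user, auth) == cache_key(user, auth)

# A renewed delegation token changes the key, so a new producer is
# created and the stale one is left for the idle evictor to remove.
assert cache_key(user, auth) != cache_key(user, renewed)
```

Under this model, token renewal naturally rotates producers without any extra eviction logic, which is the trade-off debated below.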
@dongjoon-hyun really good point! If a delegation token is used, then from time to time a new producer must be created and the old one evicted; otherwise the query will fail. There are multiple ways to achieve that (I haven't yet analyzed how it's done in the latest change made by @HeartSaVioR, but I'm on it):
- Either the cache key contains the authentication information (dynamic JAAS config). This way creating the new producer and evicting the old one would be automatic. Without super deep consideration, that's my suggested way.
- Or the cache key does NOT contain the authentication information (dynamic JAAS config). This way additional logic must be added to handle this scenario. At first glance I have the feeling it would just add complexity and make this part of the code brittle.
As I understand from @HeartSaVioR's comment, the first approach is implemented at the moment. If so, I'm fine with that, but I would mention 2 things here:
- The key may contain authentication information
- There could be situations where more than one producer is instantiated. This is important because producers consume a significant amount of memory, as @zsxwing pointed out.
I think we take the first approach - I just followed the way we did it before.
Back to the topic: would we like to add these details to the guide doc? I'll address it if we want end users to know about it; otherwise we can leave it as it is.
I agree the first bullet point is questionable since it's a deep detail, but the second is an important piece of memory-sizing information. Let's hear what @dongjoon-hyun thinks.
I think this would be more useful if you want to explain how a user can force separate producers to be used. Otherwise it's an internal detail that doesn't really affect users.
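One way to read this suggestion (a hypothetical illustration, not taken from the PR itself): since the caching key is derived from the producer's Kafka parameters, giving two sinks any differing `kafka.`-prefixed option should force them onto separate producers. A PySpark sketch, assuming `df` is an existing streaming DataFrame and the broker address, client ids, and topics are made up:

```python
# Hypothetical: two sinks writing to the same broker. Because their option
# maps differ (here via kafka.client.id), their caching keys differ, so
# each query gets its own producer instead of sharing one.
(df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("kafka.client.id", "sink-a")  # distinct option -> distinct cache key
    .option("topic", "topic-a")
    .option("checkpointLocation", "/tmp/ckpt-a")
    .start())

(df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("kafka.client.id", "sink-b")  # differs from sink-a on purpose
    .option("topic", "topic-b")
    .option("checkpointLocation", "/tmp/ckpt-b")
    .start())
```

The flip side, as noted above, is that each extra producer carries a significant memory cost.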
+1 for @vanzin's advice.
Hmm... would we like to document how to tweak it, or just hide it? The reason I explain this is that end users may encounter an issue with the producer pool and want to debug it a bit, but if it feels too internal, I'm OK with hiding it as well.
Just added some explanation, since it makes sense to let end users know about it for debugging.
gaborgsomogyi
left a comment
Thanks for taking care of this, @HeartSaVioR
| <tr> | ||
| <td>spark.kafka.producer.cache.evictorThreadRunInterval</td> | ||
| <td>The interval of time between runs of the idle evictor thread for producer pool. When non-positive, no idle evictor thread will be run.</td> | ||
| <td>1m (1 minutes)</td> |
s/1 minutes/1 minute
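For reference, a hedged sketch of how this property could be set (the `30s` value and app name are made up for illustration; the property name is the one documented in this PR):

```python
from pyspark.sql import SparkSession

# Run the idle-producer evictor every 30 seconds instead of the 1m default.
# Illustrative value only; tune it to your workload. A non-positive value
# disables the idle evictor thread entirely, per the documented behavior.
spark = (
    SparkSession.builder
    .appName("kafka-producer-pool-tuning")
    .config("spark.kafka.producer.cache.evictorThreadRunInterval", "30s")
    .getOrCreate()
)
```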
@HeartSaVioR @dongjoon-hyun I've just answered a question in the user channel and realized that the following doc page is somehow collapsed: https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html Since my environment generates the docs collapsed as well, could you double-check that the mentioned page works without this change? I don't see any obvious issue that should cause such a collapse.
The doc collapse issue was fixed via #27098
Test build #116486 has finished for PR 27146 at commit
Then only a rebuild is needed.
Yeah, I searched a bit and realized the published doc seems to be broken only for preview/preview2. If so, it may not be a big deal.
Test build #116584 has finished for PR 27146 at commit
dongjoon-hyun
left a comment
+1, LGTM. Thank you, @HeartSaVioR , @gaborgsomogyi , and @vanzin .
Merged to master.
BTW, @HeartSaVioR , for a documentation PR, we need a screenshot. I attached one for you.
Thanks all for reviewing and merging! @dongjoon-hyun, what about enhancing the contribution steps for documentation? (Either adding this to the 'contributing' page, or even to the PR template.) I'm not sure we explicitly have such requirements, and it would be helpful to standardize this.
Btw, I'm also seeing different understandings of the section "Does this PR introduce any user-facing change?" across many open PRs. My understanding of the section's intent is to state and enumerate any behavioral or API changes that would require end users to change their query/code. (So if the answer is yes, the patch should be reviewed carefully.) Expanding it to cover anything end users might see would make the answer "yes" for almost every PR, diluting the meaning of the section. I might be missing something; discussion around this is welcome.
gaborgsomogyi
left a comment
Late LGTM.
What changes were proposed in this pull request?
This patch documents the configuration for the Kafka producer pool, newly revised via SPARK-21869 (#26845).
Why are the changes needed?
The explanation of the new Kafka producer pool configuration is missing, whereas the doc already covers the Kafka consumer pool configuration.
Does this PR introduce any user-facing change?
Yes. This is a documentation change.
How was this patch tested?
N/A