
Conversation

@gaborgsomogyi
Contributor

What changes were proposed in this pull request?

Spark uses an old, deprecated API, KafkaConsumer.poll(long), which may never return and can stay in a livelock if metadata is not updated (for instance, when the broker disappears at consumer creation). Please see the Kafka documentation and the standalone test application for further details.

In this PR I've applied the new KafkaConsumer.poll(Duration) API on the executor side. Please note that the driver side still uses the old API, which will be fixed in SPARK-32032.
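The behavioral difference between the two APIs can be sketched as a toy simulation (these functions are hypothetical stand-ins for illustration, not the Kafka client API): with poll(long) the timeout only bounds the record fetch itself and the metadata update can block without limit, whereas poll(Duration) bounds the entire call, metadata update included.

```python
from datetime import timedelta

def poll_duration(timeout: timedelta, metadata_delay: timedelta) -> str:
    """Sketch of the new poll(Duration) semantics: the deadline covers the
    whole call, including the metadata update, so the call always returns."""
    if metadata_delay <= timeout:
        return "records"   # metadata arrived in time; records can be fetched
    return "empty"         # deadline hit first: return empty instead of hanging

def poll_long(timeout: timedelta, metadata_delay: timedelta) -> str:
    """Sketch of the old poll(long) semantics: the timeout only bounds the
    fetch; the metadata update itself can block indefinitely."""
    if metadata_delay == timedelta.max:
        return "never returns"  # broker gone: livelock, the timeout never starts
    return "records"

# Broker disappeared at consumer creation, metadata never arrives:
print(poll_long(timedelta(milliseconds=500), timedelta.max))      # old API hangs
print(poll_duration(timedelta(milliseconds=500), timedelta.max))  # new API returns empty
```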

Why are the changes needed?

Infinite wait in KafkaConsumer.poll(long).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests.

@gaborgsomogyi
Contributor Author

cc @zsxwing @HeartSaVioR

@SparkQA

SparkQA commented Jun 19, 2020

Test build #124278 has finished for PR 28871 at commit d33a5f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR HeartSaVioR left a comment


The code change looks good.

Since we're here and this changes the semantics of the timeout, could we double-check that the default timeout value is sufficient and that the documentation is correct?

I roughly skimmed it and found that the option falls back to spark.network.timeout (120s by default), but the default value of kafkaConsumer.pollTimeoutMs in the integration doc is 512 ms. It would be nice to fix either one, and we would probably want the higher value, as the timeout now also counts the metadata update.

I guess it would also be nice to update the doc and the migration guide (not 100% sure about the latter, so let's hear others' voices) to note that the timeout now includes the metadata update, so end users may want to set a higher value than before.
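For reference, the option in question is set per query; a minimal configuration sketch (the broker address and topic name here are made up for illustration):

```scala
// Illustrative only: with the new poll(Duration) semantics the timeout also
// covers metadata updates, so a value closer to spark.network.timeout
// (120s by default) may be more appropriate than 512 ms.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("kafkaConsumer.pollTimeoutMs", "120000")
  .load()
```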

@gaborgsomogyi
Contributor Author

The reason I didn't increase anything in the kafkaConsumer.pollTimeoutMs area is that I remembered it falls back to spark.network.timeout, which is high enough.

Honestly, I didn't check the doc; I presumed it was set properly. Nice catch! Fixed it. :)

@gaborgsomogyi
Contributor Author

Regarding the migration guide, I don't think it's required. What would you write there that carries real value? Presumably you've already set kafkaConsumer.pollTimeoutMs high enough, but if it fails with a timeout, increase it further? :)

I've seen setups that leave enough room to handle such situations, and admins know this issue well enough that it doesn't need to be written down.

@SparkQA

SparkQA commented Jun 19, 2020

Test build #124288 has finished for PR 28871 at commit 42e4adb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thank you, @gaborgsomogyi and @HeartSaVioR .
Merged to master.

@gaborgsomogyi
Contributor Author

Thank you @dongjoon-hyun and @HeartSaVioR for the quick action.
We've solved a long-standing cluster stop issue.

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…or side to avoid infinite wait

Closes apache#28871 from gaborgsomogyi/SPARK-32033.

Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>