[SPARK-20452][SS][Kafka]Fix a potential ConcurrentModificationException for batch Kafka DataFrame #17752
Conversation
```diff
  def close(): Unit = {
-   consumer.close()
-   kafkaReaderThread.shutdownNow()
+   runUninterruptibly {
```
If the kafkaReaderThread is using the consumer, consumer.close will throw ConcurrentModificationException. Putting it inside runUninterruptibly prevents this case from happening.
How does this prevent it? It seems like you want a lock, so that the consumer is not being used while close is called?
This is just like the other methods wrapped with runUninterruptibly, which run either in the stream thread or in kafkaReaderThread.
Never mind, I understand that runUninterruptibly ensures that.
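To make the mechanism concrete: KafkaConsumer is not safe for multi-threaded access and throws ConcurrentModificationException when two threads touch it at once, so the fix funnels every consumer call, including close(), onto the single reader thread. Below is a minimal sketch of how such a runUninterruptibly helper can work; it is an illustration, not the actual KafkaOffsetReader code, and the names kafkaReaderThread and execContext merely mirror the surrounding diff.

```scala
import java.util.concurrent.{Executors, ThreadFactory}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}

class OffsetReaderSketch {
  // The single daemon thread that is the only thread ever allowed to touch the consumer.
  private val kafkaReaderThread = Executors.newSingleThreadExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new UninterruptibleThread("Kafka Offset Reader") {
        override def run(): Unit = r.run()
      }
      t.setDaemon(true)
      t
    }
  })
  private val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)

  // If the caller is already on the UninterruptibleThread, run the body directly; otherwise
  // ship it to kafkaReaderThread and block for the result. Either way the body (for example
  // consumer.close()) only ever runs on the one thread that owns the KafkaConsumer.
  def runUninterruptibly[T](body: => T): T = {
    if (!Thread.currentThread.isInstanceOf[UninterruptibleThread]) {
      val future = Future { body }(execContext)
      ThreadUtils.awaitResult(future, Duration.Inf)
    } else {
      body
    }
  }
}
```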
```diff
+   runUninterruptibly {
+     consumer.close()
+   }
+   kafkaReaderThread.shutdown()
```
No need to interrupt, since kafkaReaderThread is an UninterruptibleThread.
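A generic java.util.concurrent illustration of the difference (not the PR code): shutdown() stops accepting new tasks and lets queued work finish without interrupting the worker thread, while shutdownNow() also interrupts running tasks.

```scala
import java.util.concurrent.Executors

object ShutdownDemo extends App {
  val pool = Executors.newSingleThreadExecutor()
  pool.submit(new Runnable {
    override def run(): Unit = Thread.sleep(100) // stand-in for the reader thread's pending work
  })
  pool.shutdown() // no interrupt: the running task is allowed to complete
  // pool.shutdownNow() // would interrupt the worker and return the tasks that never started
}
```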
```scala
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
```
Generate the unique group id and the KafkaOffsetReader here, and close the reader inside this method, so that we never use the same reader in different threads (such as when the same DataFrame is used in different threads).
Note: the previous code forgot to close the KafkaOffsetReader.
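A minimal, self-contained sketch of the pattern described here; OffsetReader and its methods are hypothetical stand-ins, not the real KafkaOffsetReader API. The idea is to build a fresh group id and reader per call and close it in a finally block, so a DataFrame reused from several threads never shares a reader.

```scala
import java.util.UUID

object FreshReaderSketch {
  // Hypothetical stand-in for KafkaOffsetReader, just to show the create/use/close lifecycle.
  class OffsetReader(val groupId: String) {
    def fetchOffsetRange(): (Long, Long) = (0L, 0L) // placeholder
    def close(): Unit = ()
  }

  def withFreshReader[T](use: OffsetReader => T): T = {
    // One unique group id and one reader per invocation, never shared across threads.
    val reader = new OffsetReader(s"spark-kafka-relation-${UUID.randomUUID}")
    try use(reader)
    finally reader.close() // always closed, addressing the "forgot to close" note above
  }

  // Each call builds and closes its own reader, even if `use` throws.
  def example(): (Long, Long) = withFreshReader(_.fetchOffsetRange())
}
```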
```scala
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
}

private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
```
Move to object KafkaSourceProvider.
```scala
}

/** Class to conveniently update Kafka config params, while logging the changes */
private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
```
Move to object KafkaSourceProvider, and change logInfo to logDebug. The Kafka consumer prints all of its configs anyway, so there is no need to duplicate that information in the logs.
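Roughly what that suggestion looks like, as a self-contained sketch: the logDebug below is a no-op stand-in for Spark's Logging trait, and in the actual suggestion the class would live in object KafkaSourceProvider.

```scala
import java.{util => ju}

import scala.collection.JavaConverters._

/** Sketch: conveniently update Kafka config params while logging changes at debug level. */
private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
  private val map = new ju.HashMap[String, Object](kafkaParams.asJava)

  private def logDebug(msg: => String): Unit = () // stand-in for Logging.logDebug

  def set(key: String, value: Object): this.type = {
    map.put(key, value)
    logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
    this
  }

  def setIfUnset(key: String, value: Object): this.type = {
    if (!map.containsKey(key)) {
      map.put(key, value)
      logDebug(s"$module: Set $key to $value")
    }
    this
  }

  def build(): ju.Map[String, Object] = map
}
```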
```scala
      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
      .build()

private def kafkaParamsForExecutors(
```
Move to object KafkaSourceProvider.
```scala
if (!reuseKafkaConsumer) {
  // If we can't reuse CachedKafkaConsumers, create a new CachedKafkaConsumer. Since we use
  // `assign` here, we don't need to worry about "group.id" conflicts.
  new CachedKafkaConsumer(new TopicPartition(topic, kafkaPartition), executorKafkaParams)
```
This is the major change.
It would be more consistent with getOrCreate if you just added a create method to CachedKafkaConsumer.
```diff
  // If this is reattempt at running the task, then invalidate cache and start with
  // a new consumer
- if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) {
+ if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
```
Fix the attemptNumber check: it starts at 0.
```diff
  val consumer = if (useConsumerCache) {
    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
-   if (context.attemptNumber > 1) {
+   if (context.attemptNumber >= 1) {
```
Fix the attemptNumber check: it starts at 0.
Test build #76120 has finished for PR 17752 at commit
```diff
  * but processing the same topicpartition and group id in multiple threads is usually bad anyway.
  */
- private[kafka010] case class CachedKafkaConsumer private(
+ private[kafka010] case class CachedKafkaConsumer(
```
Isn't it cleaner to create a new method `create` in object CachedKafkaConsumer which returns a new CachedKafkaConsumer without putting it in the map?
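A simplified sketch of that suggestion, not the real or merged CachedKafkaConsumer (the cache key and fields are reduced to the bare minimum): keep the constructor private and expose both a caching getOrCreate and a non-caching create in the companion object.

```scala
import scala.collection.mutable

import org.apache.kafka.common.TopicPartition

// Sketch only: the real class also carries the group id and wraps a KafkaConsumer.
case class CachedKafkaConsumer private(
    topicPartition: TopicPartition,
    kafkaParams: java.util.Map[String, Object])

object CachedKafkaConsumer {
  private val cache = mutable.HashMap.empty[TopicPartition, CachedKafkaConsumer]

  /** Cached path: reuse one consumer per topic partition on this executor. */
  def getOrCreate(
      tp: TopicPartition,
      params: java.util.Map[String, Object]): CachedKafkaConsumer = synchronized {
    cache.getOrElseUpdate(tp, new CachedKafkaConsumer(tp, params))
  }

  /** Uncached path: always a brand-new consumer, never registered in the cache. */
  def create(
      tp: TopicPartition,
      params: java.util.Map[String, Object]): CachedKafkaConsumer =
    new CachedKafkaConsumer(tp, params)
}
```

With such a factory, the executor-side code could call CachedKafkaConsumer.getOrCreate when reuseKafkaConsumer is true and CachedKafkaConsumer.create otherwise, instead of invoking the constructor directly as in the hunk above.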
Test build #76124 has finished for PR 17752 at commit
LGTM. Merging to master and 2.2.
Merge commit:

[SPARK-20452][SS][Kafka]Fix a potential ConcurrentModificationException for batch Kafka DataFrame

Author: Shixiong Zhu <[email protected]>

Closes #17752 from zsxwing/kafka-fix.

(cherry picked from commit 823baca)
Signed-off-by: Tathagata Das <[email protected]>
What changes were proposed in this pull request?

Cancelling a batch Kafka query when one of its tasks cannot be cancelled, and then rerunning the same DataFrame, may cause a ConcurrentModificationException because it may launch two tasks sharing the same group id. This PR always creates a new consumer when `reuseKafkaConsumer = false` to avoid the ConcurrentModificationException. It also contains other minor fixes.

How was this patch tested?

Jenkins.