[SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer #25853
Conversation
Test build #111000 has finished for PR 25853 at commit

Test build #111003 has finished for PR 25853 at commit
cc @vanzin @HeartSaVioR since you have deep knowledge about consumer caching
HeartSaVioR left a comment
Mostly looks good. Left some minor comments. Given you're planning to test in a cluster environment, it would be better to see the test results.
 * not yet returned, hence provide thread-safety usage of non-thread-safe objects unless caller
 * shares the object to multiple threads.
 */
private[kafka010] abstract class InternalKafkaConnectorPool[K, V](
For other reviewers: this is pretty much the same as the previous InternalKafkaConsumerPool, except that:
- It adds an abstract method to extract the key from a pooled object (see the sketch below).
- It replaces "consumer"-related words in the javadoc/code with generic ones, so it applies to both consumers and producers.
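For readers following along, here is a standalone sketch of the shape being described, with hypothetical names and a hypothetical key type rather than the PR's actual code: the base pool only declares how to derive a key from a pooled connector, and each concrete pool decides what the key and connector types are.

```scala
// Hypothetical, simplified mirror of the abstraction described above.
abstract class ConnectorPoolSketch[K, V] {
  // Concrete pools define how to derive the cache key from a pooled object.
  protected def createKey(connector: V): K
  // Borrow/return/eviction logic is omitted in this sketch.
}

final case class ProducerHolder(kafkaParams: Map[String, Object])

class ProducerPoolSketch extends ConnectorPoolSketch[Seq[(String, Object)], ProducerHolder] {
  // Assumption: the key is the sorted Kafka parameters the producer was created with.
  override protected def createKey(connector: ProducerHolder): Seq[(String, Object)] =
    connector.kafkaParams.toSeq.sortBy(_._1)
}
```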
protected def createKey(connector: V): K
}

private[kafka010] abstract class PoolConfig[V] extends GenericKeyedObjectPoolConfig[V] {
For other reviewers: this is pretty much the same as the previous InternalKafkaConsumerPool.PoolConfig, except that it adds some abstract methods to enable reading values from different configuration keys (a rough sketch follows below).
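To illustrate what "reading values from different configuration keys" can look like, here is a standalone sketch assuming hypothetical configuration key names; it is not the PR's actual code, only the general pattern of a shared base config with per-connector accessors.

```scala
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig

// Shared base: sizing/eviction values are obtained through abstract accessors,
// so consumer and producer pools can map them to their own configuration entries.
abstract class PoolConfigSketch[V] extends GenericKeyedObjectPoolConfig[V] {
  protected def softMaxSize: Int
  protected def evictorThreadRunIntervalMillis: Long

  def init(): Unit = {
    setMaxTotal(-1)                                            // no hard global bound in this sketch
    setMaxTotalPerKey(softMaxSize)
    setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis)
  }
}

// Producer-side variant reading hypothetical keys; a consumer-side variant
// would look the same with its own keys.
class ProducerPoolConfigSketch[V](conf: Map[String, String]) extends PoolConfigSketch[V] {
  override protected def softMaxSize: Int =
    conf.getOrElse("spark.kafka.producer.cache.capacity", "64").toInt
  override protected def evictorThreadRunIntervalMillis: Long =
    conf.getOrElse("spark.kafka.producer.cache.evictorThreadRunInterval", "60000").toLong
}
```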
  }
}

private[kafka010] abstract class ObjectFactory[K, V] extends BaseKeyedPooledObjectFactory[K, V] {
For other reviewers: this is pretty much the same as the previous InternalKafkaConsumerPool.ObjectFactory, except that it adds some abstract methods to create/destroy the pooled objects (see the sketch below).
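For readers less familiar with Commons Pool, a self-contained sketch of such a keyed factory, assuming a producer keyed by its Kafka parameters; this is illustrative only, not the PR's actual factory.

```scala
import java.{util => ju}

import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.kafka.clients.producer.KafkaProducer

// The factory tells the pool how to create, wrap and destroy objects for a given key.
class ProducerFactorySketch
  extends BaseKeyedPooledObjectFactory[ju.Map[String, Object], KafkaProducer[Array[Byte], Array[Byte]]] {

  override def create(key: ju.Map[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] =
    new KafkaProducer[Array[Byte], Array[Byte]](key)  // the key doubles as the producer configuration

  override def wrap(
      value: KafkaProducer[Array[Byte], Array[Byte]]): PooledObject[KafkaProducer[Array[Byte], Array[Byte]]] =
    new DefaultPooledObject(value)

  override def destroyObject(
      key: ju.Map[String, Object],
      p: PooledObject[KafkaProducer[Array[Byte], Array[Byte]]]): Unit =
    p.getObject.close()
}
```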
import org.apache.spark.sql.test.SharedSparkSession

class InternalKafkaConsumerPoolSuite extends SharedSparkSession {

nit: remove this line
Removed.
@volatile var error: Throwable = null

- def consume(i: Int): Unit = {
+ def consume(): Unit = {
For other reviewers: this removes an unused parameter.
Thanks for your interest in redoing the entire patch from scratch; you had to rewrite the test suites as well to fit the new pool API. Commons Pool seems to wrap each pooled object in a PooledObject wrapper and keeps all the in-use tracking information inside that per-object wrapper. Since the Guava cache does not do that, we had to add this tracking ourselves earlier (in my previous PR). So this is definitely better than using Guava for this tracking.
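A small, self-contained illustration of that point, using a toy factory rather than a real producer (assumed setup, not code from this PR): the pool's PooledObject wrapper is what records whether an object is currently borrowed or idle.

```scala
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}

// Toy keyed factory, just to show where the "in-use" bookkeeping lives.
class ToyFactory extends BaseKeyedPooledObjectFactory[String, StringBuilder] {
  override def create(key: String): StringBuilder = new StringBuilder(key)
  override def wrap(value: StringBuilder): PooledObject[StringBuilder] =
    new DefaultPooledObject(value)  // wrapper holding borrowed/idle state, timestamps, etc.
}

object PooledObjectTrackingDemo extends App {
  val pool = new GenericKeyedObjectPool(new ToyFactory)

  val obj = pool.borrowObject("key")
  println(pool.getNumActive("key"))  // 1 - the pool itself knows the object is in use
  pool.returnObject("key", obj)
  println(pool.getNumIdle("key"))    // 1 - and that it is idle again
}
```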
Q: Is there a case where we would use more than one Kafka producer at a time? If no, then why do we need object pooling? If yes, when would that happen?
Test build #111282 has finished for PR 25853 at commit
If none of the tasks are using the old instance, then after the eviction time has elapsed it will be closed and removed from the cache.
Not sure I understand your question exactly, so let me try to answer my interpretation. Kafka connection pooling mainly solves one problem: it spares the construction time of consumer/producer instances. This creation time can be significant when Kerberos and SSL encryption are enabled, and it would otherwise be paid in every micro-batch. As you've noted, producers are thread safe, so as a further optimization instances could be shared between threads without any harm. Since Apache Commons Pool doesn't support this, it can't be done here. If multiple threads are using a producer with the same Kafka params, then multiple instances will be created (just like on the consumer side). This is a trade-off which I think makes sense compared with the main advantages listed in the PR description. If you mean something else, please clarify and we can discuss it...
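To make the "multiple instances per key" behaviour concrete, reusing the ToyFactory from the sketch above (still illustrative only): a second borrow with the same key, while the first object is checked out, yields a distinct instance, because Commons Pool never hands out a borrowed object twice.

```scala
// Assumes the ToyFactory from the earlier sketch is in scope.
import org.apache.commons.pool2.impl.GenericKeyedObjectPool

object NoSharingDemo extends App {
  val pool = new GenericKeyedObjectPool(new ToyFactory)

  val a = pool.borrowObject("same-kafka-params")
  val b = pool.borrowObject("same-kafka-params")  // same key, but a second, distinct instance
  println(a eq b)                                 // false
  println(pool.getNumActive("same-kafka-params")) // 2

  pool.returnObject("same-kafka-params", a)
  pool.returnObject("same-kafka-params", b)
}
```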
My question is: at any point in time, do we need more than one instance of a Kafka producer on the executor? Since the Kafka producer instance is shared across threads, there is actually one instance being used by all the executor threads. And if we are only using one instance of the Kafka producer, then why do we need connection pooling? I am definitely not suggesting that a Kafka producer be created per task in a micro-batch. But do we need object pooling here?
A connection pool may not be needed if we never close the producer(s) in the executor, even when idle. (We're not.) That question actually makes me go back to why we have a custom cache at all.
Not necessarily. It's a trade-off again.
I'm pretty sure there are hundreds of pros and cons.
Actually, we intended that by using a cache the code would be simpler, but it turned out otherwise. I think, at the moment, the way it works is: the cache simply holds one instance of a producer and shares it across threads; when the Kafka parameters change, a fresh instance is created. The older instance, once relinquished by all the threads, is eventually closed. For the above case, we thought that a cache would be the cleaner solution.
The rough standing in terms of lines is the following:
That will become the most complicated part once we allow concurrent usage of an instance. If we don't allow concurrent usage (like Commons Pool), we are always safe to close the instance whenever it sits in the pool. If we do, reference counting comes up and additional thread-safety concerns follow. I would be happy to see a brilliant idea to simplify the latter. Please feel free to share if you have anything in mind.
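For context on what "reference counting ... and additional thread safety" would mean in practice, a rough, purely illustrative sketch (not this PR's approach, and not code from any branch): the instance can only be closed once no task holds it any more, and the count and the close decision have to be updated together.

```scala
// Coarse-grained locking keeps the count and the close decision consistent; a real
// implementation would also need to handle errors, metrics, re-creation after close, etc.
class RefCountedProducer[P](val producer: P, closeFn: P => Unit) {
  private var refCount = 0
  private var markedForClose = false
  private var closed = false

  def acquire(): P = synchronized {
    require(!closed, "producer already closed")
    refCount += 1
    producer
  }

  def release(): Unit = synchronized {
    refCount -= 1
    closeIfNeeded()
  }

  // Called by the eviction path: the instance is closed only once nobody holds it.
  def markForClose(): Unit = synchronized {
    markedForClose = true
    closeIfNeeded()
  }

  private def closeIfNeeded(): Unit = {
    if (markedForClose && refCount == 0 && !closed) {
      closed = true
      closeFn(producer)
    }
  }
}
```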
I guess someone might be concerned about the performance and cost of maintaining multiple producers vs. using one producer concurrently, and that concern would be valid, though we can't say the former has much more overhead unless it is measured. If we want simplicity, IMHO the former is much simpler, especially since we have already dealt with the complexity of handling Commons Pool on the consumer side. We just need to refactor the code to share the Commons Pool codebase between consumer and producer. (And the PR does it nicely.)
Does the current implementation in this patch let us share the same KafkaProducer instance across all the executor threads? What happens when borrow is called with the same key more than once?
As said in my previous comment here, Apache Commons Pool doesn't support sharing, so the answer is no.
Just a note: please update the description and the results of the manual tests when you have any. LGTM pending manual tests.
Will add them when I have results. Since this is a critical path in the streaming area, I would like to keep the endurance test with artificial exceptions running for one week...
Test type 1: Endurance test https://github.com/gaborgsomogyi/spark-structured-kafka-stress-app
During cluster tests it turned out that the producer lifecycle has a problem. Going to check it next week...
Added WIP to mark needed changes.
I was away for some time but I'm back and continuing...
Test build #112018 has finished for PR 25853 at commit

Test build #113259 has finished for PR 25853 at commit
Test build #113311 has finished for PR 25853 at commit
vanzin left a comment
Noticed another minor thing when looking again.
Test build #113389 has finished for PR 25853 at commit
Merging to master.
@gaborgsomogyi Thanks a lot for your contribution. Also thanks @HeartSaVioR for your benchmark result. However, I don't think it addressed my concerns. The benchmark covers the speed, but it doesn't measure the increase in resources used by the new sharing strategy. Sharing a producer in one JVM is actually recommended by the Kafka API doc. Copying its statement here for people who have not yet read it.
IMO, a Kafka producer is a heavy object. It includes threads, connections, buffers, etc. If we change the cache to an object pool and do not reuse producers across tasks, we potentially put a lot of pressure on executors and the Kafka clusters when an executor has lots of cores. For example, it's pretty common that an executor has 10-100 cores today. Before this patch, our strategy was to share a producer across tasks that use the same Kafka parameters. I don't think we should change this strategy. @gaborgsomogyi what do you think about my concerns?
@gaborgsomogyi could you answer the question from @zsxwing?
Also, the behavior change should be documented in the migration guide. Please do not forget about it.
This will be the first task on Monday. I've mentioned that documentation is not yet covered and will come.
@zsxwing Since this PR is a trade-off it's debatable, because not everybody has the same threshold in such cases. The copied recommendation was considered when the PR was filed. Since the recommendation is not specific enough, we've made the measurements to lower the uncertainty. The performance tests are given; the resource allocation part is not specifically written down. One producer allocates a little bit more than it's
This is true, but not all the cores are involved. The mentioned additional resource consumption applies only when cores are writing the exact same TopicPartition at the same time. This can be any number between 0 and the number of cores. Use-cases can be abused of course, but I can hardly believe it can be efficient to write a single partition concurrently with 4+ cores. The additional allocation is not static, so if a producer is not used it times out and will be freed from the cache, allowing other computations to take over that memory. As said at the beginning, this is a trade-off and may not be worth it. If that were the conclusion, and considering the following:
I would question whether we need to invalidate Kafka producers at all. Maybe removing the timeout on the producer side is the simplest solution?!
Just to make sure we are on the same page: previously, we were using the Kafka parameters as the cache key. As long as the Kafka parameters are the same, tasks running at the same time will share the same producer.
Each producer can also potentially connect to all brokers of a Kafka cluster. That will be a lot of connections. Summarizing the options we have:
Let me also add more context about SPARK-21869, since I created it. In the real workload where I found this bug, the root cause was actually not SPARK-21869. That workload had a memory issue that triggered large GC pauses. Writing zero messages in 10 minutes was actually because of GC pressure. After fixing the memory issue, the workload no longer hits SPARK-21869. IMO, writing zero messages in 10 minutes is usually not expected by the user and indicates some other issue. This makes me feel that fixing SPARK-21869 is not worth it if the fix requires a potential increase in resource usage. Hence, I prefer option 3 right now.
Well, the resource increase is definitely there, which excludes option 1.
Thanks for sharing the initial issue; I agree it can happen rarely and the root cause must be fixed. Just a question here. The old code which creates a single instance uses expireAfterAccess, which means the producer will be invalidated and closed when the cache item is not accessed for 10 minutes. Do you think the original implementation will cover batch use-cases (since you've suggested
Looks like Gabor has a good point about SPARK-21869. Elaborating the point a bit more: the root problem of the producer cache is that the entry will expire 10 minutes after the last cache access if no other task touches it within the timeout, regardless of the usage of the producer. That doesn't work like a heartbeat - the producer instance will expire even if a task got the instance and has been writing messages continuously for more than 10 minutes. With some luck, another task could access the same instance and extend the timeout, but if not, it will time out and the task will break because the producer instance is closed underneath it. Things get worse if there's some path which shares the producer instance directly to avoid getting it from the cache. So the cache doesn't work in an intuitive way - it doesn't only expire the producer instance when there's no activity during the timeout. A task spending 10 minutes is an abnormal case for streaming, so the behavior of the cache won't cause actual issues for streaming cases, but I guess the expectations might be different for batch cases.
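For readers who want to see the semantics being debated, here is a self-contained sketch of a Guava cache using expireAfterAccess (a simplified stand-in, not the actual CachedKafkaProducer code): the access time is refreshed only on cache lookups, so an entry can expire and be closed while a long-running task is still using the value it obtained from an earlier lookup.

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}

object ExpireAfterAccessDemo extends App {
  // Stand-in for a real KafkaProducer: only close() matters for this illustration.
  final class FakeProducer(val key: String) extends AutoCloseable {
    override def close(): Unit = println(s"closed producer for $key")
  }

  private val removalListener = new RemovalListener[String, FakeProducer]() {
    override def onRemoval(n: RemovalNotification[String, FakeProducer]): Unit =
      n.getValue.close()  // may run while a long task is still writing with the producer
  }

  private val cacheLoader = new CacheLoader[String, FakeProducer]() {
    override def load(key: String): FakeProducer = new FakeProducer(key)
  }

  val cache: LoadingCache[String, FakeProducer] = CacheBuilder.newBuilder()
    .expireAfterAccess(10, TimeUnit.MINUTES)
    .removalListener(removalListener)
    .build[String, FakeProducer](cacheLoader)

  // The access time is refreshed here and only here. A task that keeps using
  // `producer` for more than 10 minutes never touches the cache again, so the
  // entry can expire and the removal listener can close it underneath the task.
  val producer = cache.get("kafka-params-key")
}
```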
Sorry for the late reply. That's a really good point about the batch case. However, I'm not convinced that we should go down a path that has potential stability issues. How about we revert this and try to fix the issue by adding a reference count to each producer? I agree that it could be error-prone, but errors can be avoided by adding more tests. In addition, if we keep this patch in master and we want to fix the potential stability issues in the future, adding a reference count to each producer sounds inevitable.
I think the major problem is actually the timeout policy (sure, it is not the only problem) - it's very weird to use "last access time + interval" as the "time to live", since a producer is borrowed by tasks and used until the tasks return it to the cache/pool. It should be "last returned time + interval", or maybe "last time the reference count dropped to 0 + interval" once reference counting is considered. That means we may have to reimplement the cache logic regardless of whether we revert this one.
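A tiny illustrative check of the expiration rule proposed above (hypothetical field names, not code from any branch): expiration is driven by when the producer was last returned with no remaining users, not by when the cache entry was last looked up.

```scala
object ProducerExpirationSketch {
  // Minimal sketch of the "last returned time + interval" policy with reference counting.
  final case class ProducerEntry(
      refCount: Int,        // how many tasks currently hold the producer
      lastReturnedNs: Long) // updated when refCount last dropped to 0

  def expired(entry: ProducerEntry, nowNs: Long, ttlNs: Long): Boolean =
    entry.refCount == 0 && (nowNs - entry.lastReturnedNs) > ttlNs
}
```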
FYI Gabor will probably not reply until next month. So whatever short-term action you want to take here, you shouldn't wait for him to comment.
I just went ahead and implemented a new approach for SPARK-21869: please take a look at the branch below, where I revise the cache logic for the producer. The actual diff is in this commit, HeartSaVioR@ae2b607, which is on top of the revert commit of this PR. To minimize the change I just took the existing class and object, while on the other hand trying to encapsulate, so things may not look pretty. (For example, I'd really like to avoid having only an object without exposing any chance to inject anything for testing - that's why I separated KafkaDataConsumer and the pool implementations.) But it should be easier for us to review the concept. Once we agree this is good to go, we can refine the code either before making a PR or in code review. Please let me know if the concept/direction looks good; if we feel this is a good alternative for SPARK-21869, I guess we can revert the commit first and raise a PR for the new approach afterwards.
Since we may cut a new release preview soon, I just went ahead and reverted this patch. @HeartSaVioR your solution sounds good. I will take a detailed look.
I've raised #26845 to reflect my solution for SPARK-21869. Please take a look. Thanks!
Thanks guys for stepping in and dealing with this! I think we've come up with a better approach.
What changes were proposed in this pull request?
Kafka producers are now closed when spark.kafka.producer.cache.timeout is reached, which could be a significant problem when processing big SQL queries. The workaround was to increase spark.kafka.producer.cache.timeout to a number where the biggest SQL query can finish. In this PR I've adapted a solution similar to the one which already exists on the consumer side, namely applying Apache Commons Pool on the producer side as well. The main advantages of choosing this solution:
What this PR contains:
- Converted InternalKafkaConsumerPool to InternalKafkaConnectorPool and made it abstract
- Added InternalKafkaConsumerPool and InternalKafkaProducerPool
- Adapted CachedKafkaProducer to use InternalKafkaProducerPool
- Changed KafkaDataWriter and KafkaDataWriteTask to release the producer even in failure scenarios
- Extended KafkaTest to clear not only producers but consumers as well
- Converted InternalKafkaConsumerPoolSuite to InternalKafkaConnectorPoolSuite where only consumer tests are checking the behavior (please see comment for reasoning)

What this PR not yet contains (but intended when the main concept is stable):
Why are the changes needed?
Kafka producer closed after 10 minutes (with default settings).
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing + additional unit tests.
Cluster tests being started.