[improve][fn] Improve implementation for maxPendingAsyncRequests async concurrency limit when return type is CompletableFuture<Void> #23708

Merged

Conversation

lhotari
Member

@lhotari lhotari commented Dec 10, 2024

Motivation

When using asynchronous functions in Pulsar Functions, concurrency needs to be limited to prevent unbounded resource consumption. This is currently handled by the maxPendingAsyncRequests configuration parameter (default value: 1000). Without such limits, asynchronous processing could lead to unbounded message consumption from input topics, potentially causing resource exhaustion (out-of-memory errors) and function crashes.

However, the current implementation has a performance issue specifically for asynchronous functions that return CompletableFuture<Void>. In these cases, the current queue-based implementation introduces unnecessary processing delays when the concurrency limit is reached. In particular:

  1. When a slow response sits at the head of a full queue, processing of new requests is delayed unnecessarily, even if later requests have already completed. The effective concurrency limit then falls below the configured limit and becomes nondeterministic. Under heavy load the concurrency limit is what applies backpressure, so every slow request adds avoidable latency to the messages in the backlog. The delays accumulate, because each earlier delay adds to the processing latency of all later messages. The change in this PR avoids this type of latency.
  2. This introduces avoidable latency for functions that use Context.newOutputMessage(...).sendAsync() to send messages to multiple topics.
  3. The current queue-based ordering preservation is unnecessary when no results are returned (when the return type is CompletableFuture<Void>).
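The head-of-line effect described in item 1 can be illustrated with a simplified model. This is only a sketch under the assumption that an order-preserving queue frees slots from the head alone; the class and method names are hypothetical and are not taken from the Pulsar code base.

```java
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of an order-preserving pending queue (not Pulsar code). */
public class HeadOfLineDemo {

    /**
     * Removes completed futures starting from the head and stops at the first
     * pending one. Completed entries behind a pending head stay in the queue,
     * so they keep occupying slots that count against the limit.
     */
    public static int drainCompletedFromHead(Deque<CompletableFuture<Void>> pending) {
        int removed = 0;
        while (!pending.isEmpty() && pending.peekFirst().isDone()) {
            pending.pollFirst();
            removed++;
        }
        return removed;
    }
}
```

With three entries where only the head is still pending, no slot is freed until the head completes, even though two of the three requests have finished.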

Modifications

This PR improves the implementation by:

  1. Using a Semaphore instead of LinkedBlockingQueue for concurrency limiting when the function's return type is CompletableFuture<Void>
  2. Preserving the existing queue-based implementation for other return types where output message ordering needs to match input ordering
  3. Making exception handling more robust by properly unwrapping completion exceptions
  4. Simplifying the JavaExecutionResult class by removing the unused systemException field
  5. Adding a unit test to verify the new implementation
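The Semaphore approach from item 1 might look roughly like the following. This is a minimal sketch using only java.util.concurrent; AsyncVoidLimiter, submit, and availablePermits are hypothetical names chosen for illustration, and the actual change in JavaInstance may be structured differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical concurrency limiter for async calls that return no result. */
public class AsyncVoidLimiter {
    private final Semaphore permits;

    public AsyncVoidLimiter(int maxPendingAsyncRequests) {
        this.permits = new Semaphore(maxPendingAsyncRequests);
    }

    /** Blocks until a permit is free, then starts the asynchronous call. */
    public CompletableFuture<Void> submit(Supplier<CompletableFuture<Void>> asyncCall) {
        try {
            permits.acquire();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up without starting the call.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a permit", e);
        }
        CompletableFuture<Void> future;
        try {
            future = asyncCall.get();
        } catch (RuntimeException e) {
            // The call never started, so hand the permit back immediately.
            permits.release();
            throw e;
        }
        // Release the permit as soon as this call finishes, regardless of
        // whether calls submitted earlier are still pending.
        return future.whenComplete((ignored, error) -> permits.release());
    }

    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

Because each completion releases its own permit, a slow call holds exactly one slot, and the number of in-flight calls can stay at the configured limit.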

Implementation Details

  • Added logic to detect CompletableFuture<Void> return type through function configuration
  • Introduced a Semaphore-based concurrency limiter that gets used when order preservation isn't needed
  • Maintained backwards compatibility by keeping queue-based implementation for non-void return types
  • Updated error handling to handle asynchronous exceptions better and to avoid wrapping a Throwable in an extra layer, as in new Exception(t).
  • Added test coverage for the new concurrency limiting approach
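To illustrate what unwrapping completion exceptions means: a dependent CompletableFuture stage reports an upstream failure wrapped in a CompletionException, so the original error is only reachable through getCause(). The helper below is a hypothetical sketch of that idea, not the code added in this PR.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Hypothetical helper for recovering the original failure of a future. */
public class FutureErrors {

    /**
     * Returns the cause when the throwable is a CompletionException or
     * ExecutionException wrapper with a cause; otherwise returns it unchanged.
     */
    public static Throwable unwrap(Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }
}
```

Passing the unwrapped Throwable along as-is, rather than wrapping it again, keeps the original exception type visible to whoever handles the error.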

Testing

Added new unit test testAsyncFunctionMaxPendingVoidResult() that verifies:

  • Proper concurrency limiting with Semaphore
  • Correct handling of async void results

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.1.0 milestone Dec 10, 2024
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Dec 10, 2024
@lhotari lhotari marked this pull request as draft December 10, 2024 17:59
@lhotari lhotari marked this pull request as ready for review December 10, 2024 23:04
@lhotari lhotari force-pushed the lh-improve-pf-async-concurrency-limit branch from bec9365 to bc519d4 Compare December 10, 2024 23:05
@lhotari lhotari changed the title from "[improve][fn] Improve implementation for maxPendingAsyncRequests async concurrency limit" to "[improve][fn] Improve implementation for maxPendingAsyncRequests async concurrency limit when return type is CompletableFuture<Void>" Dec 10, 2024
@codecov-commenter

codecov-commenter commented Dec 11, 2024

Codecov Report

Attention: Patch coverage is 90.62500% with 3 lines in your changes missing coverage. Please review.

Project coverage is 74.42%. Comparing base (bbc6224) to head (bc519d4).
Report is 797 commits behind head on master.

Files with missing lines                                  Patch %   Lines
...apache/pulsar/functions/instance/JavaInstance.java     90.00%    1 Missing and 2 partials ⚠️

@@             Coverage Diff              @@
##             master   #23708      +/-   ##
============================================
+ Coverage     73.57%   74.42%   +0.85%     
- Complexity    32624    35100    +2476     
============================================
  Files          1877     1945      +68     
  Lines        139502   147483    +7981     
  Branches      15299    16276     +977     
============================================
+ Hits         102638   109768    +7130     
- Misses        28908    29232     +324     
- Partials       7956     8483     +527     
Flag        Coverage Δ
inttests    27.25% <0.00%> (+2.67%) ⬆️
systests    24.37% <37.50%> (+0.05%) ⬆️
unittests   73.80% <90.62%> (+0.96%) ⬆️


Files with missing lines                                  Coverage Δ
...pulsar/functions/instance/JavaExecutionResult.java     33.33% <100.00%> (+8.33%) ⬆️
...ulsar/functions/instance/JavaInstanceRunnable.java     73.29% <100.00%> (+0.24%) ⬆️
...apache/pulsar/functions/instance/JavaInstance.java     81.44% <90.00%> (+3.81%) ⬆️

... and 666 files with indirect coverage changes

@lhotari lhotari merged commit 8ad6777 into apache:master Dec 17, 2024
65 checks passed
lhotari added a commit that referenced this pull request Dec 18, 2024
…c concurrency limit when return type is CompletableFuture<Void> (#23708)

(cherry picked from commit 8ad6777)
lhotari added a commit that referenced this pull request Dec 18, 2024
…c concurrency limit when return type is CompletableFuture<Void> (#23708)

(cherry picked from commit 8ad6777)
lhotari added a commit that referenced this pull request Dec 18, 2024
…c concurrency limit when return type is CompletableFuture<Void> (#23708)

(cherry picked from commit 8ad6777)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 19, 2024
…c concurrency limit when return type is CompletableFuture<Void> (apache#23708)

(cherry picked from commit 8ad6777)
(cherry picked from commit a8512e7)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 21, 2024
…c concurrency limit when return type is CompletableFuture<Void> (apache#23708)

(cherry picked from commit 8ad6777)
(cherry picked from commit a8512e7)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 23, 2024
…c concurrency limit when return type is CompletableFuture<Void> (apache#23708)

(cherry picked from commit 8ad6777)
(cherry picked from commit a8512e7)