Skip to content

Commit

Permalink
[improve][fn] Improve implementation for maxPendingAsyncRequests asyn…
Browse files Browse the repository at this point in the history
…c concurrency limit when return type is CompletableFuture<Void> (apache#23708)
  • Loading branch information
lhotari authored Dec 17, 2024
1 parent 8d7d1fb commit 8ad6777
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@
*/
@Data
public class JavaExecutionResult {
private Exception userException;
private Exception systemException;
private Throwable userException;
private Object result;

public void reset() {
setUserException(null);
setSystemException(null);
setResult(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

Expand Down Expand Up @@ -57,13 +59,26 @@ public static class AsyncFuncRequest {
private final ExecutorService executor;
@Getter
private final LinkedBlockingQueue<AsyncFuncRequest> pendingAsyncRequests;
@Getter
private final Semaphore asyncRequestsConcurrencyLimiter;
private final boolean asyncPreserveInputOrderForOutputMessages;

public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {

this.context = contextImpl;
this.instanceConfig = instanceConfig;
this.executor = Executors.newSingleThreadExecutor();
this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());

asyncPreserveInputOrderForOutputMessages =
resolveAsyncPreserveInputOrderForOutputMessages(instanceConfig);

if (asyncPreserveInputOrderForOutputMessages) {
this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
this.asyncRequestsConcurrencyLimiter = null;
} else {
this.pendingAsyncRequests = null;
this.asyncRequestsConcurrencyLimiter = new Semaphore(this.instanceConfig.getMaxPendingAsyncRequests());
}

// create the functions
if (userClassObject instanceof Function) {
Expand All @@ -73,6 +88,20 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
}
}

// resolve whether to preserve input order for output messages for async functions
private boolean resolveAsyncPreserveInputOrderForOutputMessages(InstanceConfig instanceConfig) {
// no need to preserve input order for output messages if the function returns Void type
boolean voidReturnType = instanceConfig.getFunctionDetails() != null
&& instanceConfig.getFunctionDetails().getSink() != null
&& Void.class.getName().equals(instanceConfig.getFunctionDetails().getSink().getTypeClassName());
if (voidReturnType) {
return false;
}

// preserve input order for output messages
return true;
}

@VisibleForTesting
public JavaExecutionResult handleMessage(Record<?> record, Object input) {
return handleMessage(record, input, (rec, result) -> {
Expand Down Expand Up @@ -103,15 +132,33 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
}

if (output instanceof CompletableFuture) {
// Function is in format: Function<I, CompletableFuture<O>>
AsyncFuncRequest request = new AsyncFuncRequest(
record, (CompletableFuture) output
);
try {
pendingAsyncRequests.put(request);
((CompletableFuture) output).whenCompleteAsync((res, cause) -> {
if (asyncPreserveInputOrderForOutputMessages) {
// Function is in format: Function<I, CompletableFuture<O>>
AsyncFuncRequest request = new AsyncFuncRequest(
record, (CompletableFuture) output
);
pendingAsyncRequests.put(request);
} else {
asyncRequestsConcurrencyLimiter.acquire();
}
((CompletableFuture<Object>) output).whenCompleteAsync((Object res, Throwable cause) -> {
try {
processAsyncResults(asyncResultConsumer);
if (asyncPreserveInputOrderForOutputMessages) {
processAsyncResultsInInputOrder(asyncResultConsumer);
} else {
try {
JavaExecutionResult execResult = new JavaExecutionResult();
if (cause != null) {
execResult.setUserException(FutureUtil.unwrapCompletionException(cause));
} else {
execResult.setResult(res);
}
asyncResultConsumer.accept(record, execResult);
} finally {
asyncRequestsConcurrencyLimiter.release();
}
}
} catch (Throwable innerException) {
// the thread used for processing async results failed
asyncFailureHandler.accept(innerException);
Expand All @@ -132,29 +179,27 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
}
}

private void processAsyncResults(JavaInstanceRunnable.AsyncResultConsumer resultConsumer) throws Exception {
// processes the async results in the input order so that the order of the result messages in the output topic
// are in the same order as the input
private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultConsumer resultConsumer)
throws Exception {
AsyncFuncRequest asyncResult = pendingAsyncRequests.peek();
while (asyncResult != null && asyncResult.getProcessResult().isDone()) {
pendingAsyncRequests.remove(asyncResult);
JavaExecutionResult execResult = new JavaExecutionResult();

JavaExecutionResult execResult = new JavaExecutionResult();
try {
Object result = asyncResult.getProcessResult().get();
execResult.setResult(result);
} catch (ExecutionException e) {
if (e.getCause() instanceof Exception) {
execResult.setUserException((Exception) e.getCause());
} else {
execResult.setUserException(new Exception(e.getCause()));
}
execResult.setUserException(FutureUtil.unwrapCompletionException(e));
}

resultConsumer.accept(asyncResult.getRecord(), execResult);

// peek the next result
asyncResult = pendingAsyncRequests.peek();
}

}

public void initialize() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ private StateStoreProvider getStateStoreProvider() throws Exception {
@VisibleForTesting
void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
if (result.getUserException() != null) {
Exception t = result.getUserException();
Throwable t = result.getUserException();
log.warn("Encountered exception when processing message {}",
srcRecord, t);
stats.incrUserExceptions(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Function;
Expand Down Expand Up @@ -245,4 +248,65 @@ public UserException(String msg) {
super(msg);
}
}

@Test
public void testAsyncFunctionMaxPendingVoidResult() throws Exception {
CountDownLatch count = new CountDownLatch(1);
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder()
.setSink(org.apache.pulsar.functions.proto.Function.SinkSpec.newBuilder()
.setTypeClassName(Void.class.getName())
.build())
.build());
int pendingQueueSize = 3;
instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

Function<String, CompletableFuture<Void>> function = (input, context) -> {
CompletableFuture<Void> result = new CompletableFuture<>();
executor.submit(() -> {
try {
count.await();
result.complete(null);
} catch (Exception e) {
result.completeExceptionally(e);
}
});

return result;
};

JavaInstance instance = new JavaInstance(
mock(ContextImpl.class),
function,
instanceConfig);
String testString = "ABC123";

CountDownLatch resultsLatch = new CountDownLatch(3);

long startTime = System.currentTimeMillis();
assertEquals(pendingQueueSize, instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
JavaInstanceRunnable.AsyncResultConsumer asyncResultConsumer = (rec, result) -> {
resultsLatch.countDown();
};
Consumer<Throwable> asyncFailureHandler = cause -> {
};
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler));
assertEquals(pendingQueueSize - 1, instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler));
assertEquals(pendingQueueSize - 2, instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler));
// no space left
assertEquals(0, instance.getAsyncRequestsConcurrencyLimiter().availablePermits());

count.countDown();

assertTrue(resultsLatch.await(5, TimeUnit.SECONDS));

long endTime = System.currentTimeMillis();

log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
instance.close();
}
}

0 comments on commit 8ad6777

Please sign in to comment.