diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/SimpleJsonResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/SimpleJsonResource.java index 818810a7047c0..4e68d3eadb5cd 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/SimpleJsonResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/SimpleJsonResource.java @@ -202,9 +202,8 @@ public List strings(List strings) { @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public Person personTest(Person person) { - //large requests should get bumped from the IO thread - if (!BlockingOperationControl.isBlockingAllowed()) { - throw new RuntimeException("should have dispatched"); + if (BlockingOperationControl.isBlockingAllowed()) { + throw new RuntimeException("should have dispatched back to event loop"); } return person; } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SimpleXmlTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SimpleXmlTest.java index 0d9421c9f6767..214d074bad7ff 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SimpleXmlTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SimpleXmlTest.java @@ -262,9 +262,8 @@ public Person getPerson(Person person) { @Produces(MediaType.APPLICATION_XML) @Consumes(MediaType.APPLICATION_XML) public Person personTest(Person person) { - //large requests should get bumped from the IO thread - if (!BlockingOperationControl.isBlockingAllowed()) { - throw new RuntimeException("should have dispatched"); + if (BlockingOperationControl.isBlockingAllowed()) { + throw new RuntimeException("should have dispatched back to event loop"); } return person; } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/SimpleJsonResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/SimpleJsonResource.java index 93e3676ab6115..9d219771a2a53 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/SimpleJsonResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/SimpleJsonResource.java @@ -104,8 +104,8 @@ public List strings(List strings) { @Consumes(MediaType.APPLICATION_JSON) public Person personTest(Person person) { //large requests should get bumped from the IO thread - if (!BlockingOperationControl.isBlockingAllowed()) { - throw new RuntimeException("should have dispatched"); + if (BlockingOperationControl.isBlockingAllowed()) { + throw new RuntimeException("should have dispatched back to event loop"); } return person; } diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java index 7b57c785c0b94..6f52909d69e7d 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java @@ -55,6 +55,11 @@ public void resume() { resume((Executor) null); } + public void resumeOnEventLoop() { + lastExecutor = null; + resume(); + } + public synchronized void resume(Throwable throwable) { resume(throwable, false); } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java index d40bdfbf3ec61..46ebd2f0308b8 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java @@ -286,6 +286,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, } // form params can be everywhere (field, beanparam, param) boolean checkWithFormReadRequestFilters = false; + boolean inputHandlerEngaged = false; if (method.isFormParamRequired() || hasWithFormReadRequestFilters) { // read the body as multipart in one go handlers.add(new FormBodyHandler(bodyParameter != null, executorSupplier, method.getFileFormNames())); @@ -296,6 +297,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, // allow the body to be read by chunks handlers.add(new InputHandler(resteasyReactiveConfig.getInputBufferSize(), executorSupplier)); checkWithFormReadRequestFilters = true; + inputHandlerEngaged = true; } } } @@ -324,6 +326,10 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, } handlers.add(new RequestDeserializeHandler(typeClass, genericType, consumesMediaTypes, serialisers, bodyParameterIndex)); + if (inputHandlerEngaged) { + handlers.add(new NonBlockingHandler()); + } + } // given that we may inject form params in the endpoint we need to make sure we read the body before diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java index 74c78b2d61bc8..976ef27c0865f 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java @@ -25,13 +25,13 @@ public class InputHandler implements ServerRestHandler { final long maxBufferSize; - private volatile Executor executor; - private final Supplier supplier; + private volatile Executor workerExecutor; + private final Supplier workerExecutorSupplier; private final ClassLoader originalTCCL; - public InputHandler(long maxBufferSize, Supplier supplier) { + public InputHandler(long maxBufferSize, Supplier workerExecutorSupplier) { this.maxBufferSize = maxBufferSize; - this.supplier = supplier; + this.workerExecutorSupplier = workerExecutorSupplier; // capture the proper TCCL in order to avoid losing it to Vert.x in dev-mode this.originalTCCL = Thread.currentThread().getContextClassLoader(); @@ -93,8 +93,8 @@ public void data(ByteBuffer event) { data.add(event); if (dataCount > maxBufferSize) { context.serverRequest().pauseRequestInput(); - if (executor == null) { - executor = supplier.get(); + if (workerExecutor == null) { + workerExecutor = workerExecutorSupplier.get(); } //super inefficient //TODO: write a stream that just uses the existing vert.x buffers @@ -107,7 +107,7 @@ public void data(ByteBuffer event) { } //todo timeout context.setInputStream(context.serverRequest().createInputStream(ByteBuffer.wrap(ar))); - context.resume(executor); + context.resume(workerExecutor); } } } diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/BodyPayloadBlockingAllowedTest.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/BodyPayloadBlockingAllowedTest.java new file mode 100644 index 0000000000000..ffbf410ef3493 --- /dev/null +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/BodyPayloadBlockingAllowedTest.java @@ -0,0 +1,84 @@ +package org.jboss.resteasy.reactive.server.vertx.test; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.CoreMatchers.equalTo; + +import java.util.Base64; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +import org.jboss.resteasy.reactive.server.core.BlockingOperationSupport; +import org.jboss.resteasy.reactive.server.vertx.test.framework.ResteasyReactiveUnitTest; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.smallrye.mutiny.Uni; + +public class BodyPayloadBlockingAllowedTest { + + @RegisterExtension + static ResteasyReactiveUnitTest test = new ResteasyReactiveUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(TestResource.class)); + + @Test + void testSmallRequestForNonBlocking() { + doTest(5_000, "non-blocking", false); + } + + @Test + void testLargeRequestForNonBlocking() { + doTest(5_000_000, "non-blocking", false); + } + + @Test + void testSmallRequestForBlocking() { + doTest(5_000, "blocking", true); + } + + @Test + void testLargeRequestForBlocking() { + doTest(5_000_000, "blocking", true); + } + + private static void doTest(int size, String path, boolean blockingAllowed) { + given() // + .body(String.format("{\"data\":\"%s\"}", getBase64String(size))) + .header("Content-Type", MediaType.TEXT_PLAIN) + .when().post("/test/" + path) + .then() + .statusCode(200) + .body(equalTo("" + blockingAllowed)); + } + + private static String getBase64String(int size) { + return Base64.getEncoder().encodeToString(new byte[size]); + } + + @Path("test") + public static class TestResource { + + @Path("non-blocking") + @POST + @Consumes(MediaType.TEXT_PLAIN) + @Produces(MediaType.TEXT_PLAIN) + public Uni nonBlocking(String request) { + return Uni.createFrom().item(BlockingOperationSupport::isBlockingAllowed); + } + + @Path("blocking") + @POST + @Consumes(MediaType.TEXT_PLAIN) + @Produces(MediaType.TEXT_PLAIN) + public Boolean blocking(String request) { + return BlockingOperationSupport.isBlockingAllowed(); + } + + } +}