Skip to content

Commit

Permalink
Merge pull request #31613 from geoand/#31606
Browse files Browse the repository at this point in the history
Ensure large payloads don't affect the invoking thread of a Resource Method
  • Loading branch information
geoand authored Mar 7, 2023
2 parents 89575ea + ade65b3 commit ff27a2f
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ public List<String> strings(List<String> strings) {
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,8 @@ public Person getPerson(Person person) {
@Produces(MediaType.APPLICATION_XML)
@Consumes(MediaType.APPLICATION_XML)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public List<String> strings(List<String> strings) {
@Consumes(MediaType.APPLICATION_JSON)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
// form params can be everywhere (field, beanparam, param)
boolean checkWithFormReadRequestFilters = false;
boolean inputHandlerEngaged = false;
if (method.isFormParamRequired() || hasWithFormReadRequestFilters) {
// read the body as multipart in one go
handlers.add(new FormBodyHandler(bodyParameter != null, executorSupplier, method.getFileFormNames()));
Expand All @@ -296,6 +297,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
// allow the body to be read by chunks
handlers.add(new InputHandler(resteasyReactiveConfig.getInputBufferSize(), executorSupplier));
checkWithFormReadRequestFilters = true;
inputHandlerEngaged = true;
}
}
}
Expand Down Expand Up @@ -324,6 +326,10 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
handlers.add(new RequestDeserializeHandler(typeClass, genericType, consumesMediaTypes, serialisers,
bodyParameterIndex));
if (inputHandlerEngaged) {
handlers.add(NonBlockingHandler.INSTANCE);
}

}

// given that we may inject form params in the endpoint we need to make sure we read the body before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
public class InputHandler implements ServerRestHandler {

final long maxBufferSize;
private volatile Executor executor;
private final Supplier<Executor> supplier;
private volatile Executor workerExecutor;
private final Supplier<Executor> workerExecutorSupplier;
private final ClassLoader originalTCCL;

public InputHandler(long maxBufferSize, Supplier<Executor> supplier) {
public InputHandler(long maxBufferSize, Supplier<Executor> workerExecutorSupplier) {
this.maxBufferSize = maxBufferSize;
this.supplier = supplier;
this.workerExecutorSupplier = workerExecutorSupplier;
// capture the proper TCCL in order to avoid losing it to Vert.x in dev-mode
this.originalTCCL = Thread.currentThread().getContextClassLoader();

Expand Down Expand Up @@ -93,8 +93,8 @@ public void data(ByteBuffer event) {
data.add(event);
if (dataCount > maxBufferSize) {
context.serverRequest().pauseRequestInput();
if (executor == null) {
executor = supplier.get();
if (workerExecutor == null) {
workerExecutor = workerExecutorSupplier.get();
}
//super inefficient
//TODO: write a stream that just uses the existing vert.x buffers
Expand All @@ -107,7 +107,7 @@ public void data(ByteBuffer event) {
}
//todo timeout
context.setInputStream(context.serverRequest().createInputStream(ByteBuffer.wrap(ar)));
context.resume(executor);
context.resume(workerExecutor);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.jboss.resteasy.reactive.server.vertx.test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.Base64;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.server.core.BlockingOperationSupport;
import org.jboss.resteasy.reactive.server.vertx.test.framework.ResteasyReactiveUnitTest;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.smallrye.mutiny.Uni;

public class BodyPayloadBlockingAllowedTest {

@RegisterExtension
static ResteasyReactiveUnitTest test = new ResteasyReactiveUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestResource.class));

@Test
void testSmallRequestForNonBlocking() {
doTest(5_000, "non-blocking", false);
}

@Test
void testLargeRequestForNonBlocking() {
doTest(5_000_000, "non-blocking", false);
}

@Test
void testSmallRequestForBlocking() {
doTest(5_000, "blocking", true);
}

@Test
void testLargeRequestForBlocking() {
doTest(5_000_000, "blocking", true);
}

private static void doTest(int size, String path, boolean blockingAllowed) {
given() //
.body(String.format("{\"data\":\"%s\"}", getBase64String(size)))
.header("Content-Type", MediaType.TEXT_PLAIN)
.when().post("/test/" + path)
.then()
.statusCode(200)
.body(equalTo("" + blockingAllowed));
}

private static String getBase64String(int size) {
return Base64.getEncoder().encodeToString(new byte[size]);
}

@Path("test")
public static class TestResource {

@Path("non-blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Uni<Boolean> nonBlocking(String request) {
return Uni.createFrom().item(BlockingOperationSupport::isBlockingAllowed);
}

@Path("blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Boolean blocking(String request) {
return BlockingOperationSupport.isBlockingAllowed();
}

}
}

0 comments on commit ff27a2f

Please sign in to comment.