Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure large payloads don't affect the invoking thread of a Resource Method #31613

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ public List<String> strings(List<String> strings) {
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,8 @@ public Person getPerson(Person person) {
@Produces(MediaType.APPLICATION_XML)
@Consumes(MediaType.APPLICATION_XML)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public List<String> strings(List<String> strings) {
@Consumes(MediaType.APPLICATION_JSON)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
// form params can be everywhere (field, beanparam, param)
boolean checkWithFormReadRequestFilters = false;
boolean inputHandlerEngaged = false;
if (method.isFormParamRequired() || hasWithFormReadRequestFilters) {
// read the body as multipart in one go
handlers.add(new FormBodyHandler(bodyParameter != null, executorSupplier, method.getFileFormNames()));
Expand All @@ -296,6 +297,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
// allow the body to be read by chunks
handlers.add(new InputHandler(resteasyReactiveConfig.getInputBufferSize(), executorSupplier));
checkWithFormReadRequestFilters = true;
inputHandlerEngaged = true;
}
}
}
Expand Down Expand Up @@ -324,6 +326,10 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
handlers.add(new RequestDeserializeHandler(typeClass, genericType, consumesMediaTypes, serialisers,
bodyParameterIndex));
if (inputHandlerEngaged) {
handlers.add(NonBlockingHandler.INSTANCE);
}

}

// given that we may inject form params in the endpoint we need to make sure we read the body before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
public class InputHandler implements ServerRestHandler {

final long maxBufferSize;
private volatile Executor executor;
private final Supplier<Executor> supplier;
private volatile Executor workerExecutor;
private final Supplier<Executor> workerExecutorSupplier;
private final ClassLoader originalTCCL;

public InputHandler(long maxBufferSize, Supplier<Executor> supplier) {
public InputHandler(long maxBufferSize, Supplier<Executor> workerExecutorSupplier) {
this.maxBufferSize = maxBufferSize;
this.supplier = supplier;
this.workerExecutorSupplier = workerExecutorSupplier;
// capture the proper TCCL in order to avoid losing it to Vert.x in dev-mode
this.originalTCCL = Thread.currentThread().getContextClassLoader();

Expand Down Expand Up @@ -93,8 +93,8 @@ public void data(ByteBuffer event) {
data.add(event);
if (dataCount > maxBufferSize) {
context.serverRequest().pauseRequestInput();
if (executor == null) {
executor = supplier.get();
if (workerExecutor == null) {
workerExecutor = workerExecutorSupplier.get();
}
//super inefficient
//TODO: write a stream that just uses the existing vert.x buffers
Expand All @@ -107,7 +107,7 @@ public void data(ByteBuffer event) {
}
//todo timeout
context.setInputStream(context.serverRequest().createInputStream(ByteBuffer.wrap(ar)));
context.resume(executor);
context.resume(workerExecutor);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.jboss.resteasy.reactive.server.vertx.test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.Base64;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.server.core.BlockingOperationSupport;
import org.jboss.resteasy.reactive.server.vertx.test.framework.ResteasyReactiveUnitTest;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.smallrye.mutiny.Uni;

public class BodyPayloadBlockingAllowedTest {

@RegisterExtension
static ResteasyReactiveUnitTest test = new ResteasyReactiveUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestResource.class));

@Test
void testSmallRequestForNonBlocking() {
doTest(5_000, "non-blocking", false);
}

@Test
void testLargeRequestForNonBlocking() {
doTest(5_000_000, "non-blocking", false);
}

@Test
void testSmallRequestForBlocking() {
doTest(5_000, "blocking", true);
}

@Test
void testLargeRequestForBlocking() {
doTest(5_000_000, "blocking", true);
}

private static void doTest(int size, String path, boolean blockingAllowed) {
given() //
.body(String.format("{\"data\":\"%s\"}", getBase64String(size)))
.header("Content-Type", MediaType.TEXT_PLAIN)
.when().post("/test/" + path)
.then()
.statusCode(200)
.body(equalTo("" + blockingAllowed));
}

private static String getBase64String(int size) {
return Base64.getEncoder().encodeToString(new byte[size]);
}

@Path("test")
public static class TestResource {

@Path("non-blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Uni<Boolean> nonBlocking(String request) {
return Uni.createFrom().item(BlockingOperationSupport::isBlockingAllowed);
}

@Path("blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Boolean blocking(String request) {
return BlockingOperationSupport.isBlockingAllowed();
}

}
}