Skip to content

Commit 3e0f8d6

Browse files
authored
Merge pull request #43248 from michalvavrik/feature/vertx-security-event-loop-blocking-exec-limitations
Don't execute blocking security code serially as it limits concurrent blocking execution to number of the event loops
2 parents 3d9c8a9 + d708b9d commit 3e0f8d6

File tree

2 files changed

+209
-1
lines changed

2 files changed

+209
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package io.quarkus.vertx.http.security;
2+
3+
import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertFalse;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
8+
import java.net.URL;
9+
import java.nio.charset.StandardCharsets;
10+
import java.time.Duration;
11+
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.atomic.AtomicBoolean;
14+
import java.util.function.Function;
15+
import java.util.function.Supplier;
16+
17+
import jakarta.enterprise.context.ApplicationScoped;
18+
import jakarta.inject.Inject;
19+
20+
import org.awaitility.Awaitility;
21+
import org.jboss.shrinkwrap.api.ShrinkWrap;
22+
import org.jboss.shrinkwrap.api.asset.StringAsset;
23+
import org.jboss.shrinkwrap.api.spec.JavaArchive;
24+
import org.junit.jupiter.api.BeforeAll;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.extension.RegisterExtension;
27+
28+
import io.quarkus.logging.Log;
29+
import io.quarkus.security.identity.AuthenticationRequestContext;
30+
import io.quarkus.security.identity.SecurityIdentity;
31+
import io.quarkus.security.identity.SecurityIdentityAugmentor;
32+
import io.quarkus.security.runtime.QuarkusSecurityIdentity;
33+
import io.quarkus.security.test.utils.TestIdentityController;
34+
import io.quarkus.security.test.utils.TestIdentityProvider;
35+
import io.quarkus.test.QuarkusUnitTest;
36+
import io.quarkus.test.common.http.TestHTTPResource;
37+
import io.smallrye.mutiny.Uni;
38+
import io.vertx.core.http.HttpClientOptions;
39+
import io.vertx.core.http.HttpMethod;
40+
import io.vertx.mutiny.core.Vertx;
41+
import io.vertx.mutiny.core.buffer.Buffer;
42+
import io.vertx.mutiny.core.http.HttpClient;
43+
import io.vertx.mutiny.core.http.HttpClientRequest;
44+
45+
/**
46+
* Inspired by https://github.com/quarkusio/quarkus/issues/43217.
47+
* Tests that number of concurrent blocking requests processed
48+
* is not limited by a number of the IO threads.
49+
*/
50+
public class ConcurrentBlockingRequestTest {
51+
52+
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10);
53+
private static final String APP_PROPS = """
54+
quarkus.http.auth.permission.auth.paths=/blocker
55+
quarkus.http.auth.permission.auth.policy=root-policy
56+
quarkus.http.auth.policy.root-policy.roles-allowed=root
57+
quarkus.http.auth.permission.auth.paths=/viewer
58+
quarkus.http.auth.permission.auth.policy=viewer-policy
59+
quarkus.http.auth.policy.viewer-policy.roles-allowed=viewer
60+
quarkus.http.io-threads=1
61+
""";
62+
63+
@RegisterExtension
64+
static QuarkusUnitTest test = new QuarkusUnitTest().setArchiveProducer(new Supplier<>() {
65+
@Override
66+
public JavaArchive get() {
67+
return ShrinkWrap.create(JavaArchive.class)
68+
.addClasses(TestIdentityController.class, TestIdentityProvider.class, PathHandler.class,
69+
BlockingAugmentor.class)
70+
.addAsResource(new StringAsset(APP_PROPS), "application.properties");
71+
}
72+
});
73+
74+
@Inject
75+
Vertx vertx;
76+
77+
@TestHTTPResource("/blocker")
78+
URL blockerUri;
79+
80+
@TestHTTPResource("/viewer")
81+
URL viewerUri;
82+
83+
@BeforeAll
84+
public static void setup() {
85+
TestIdentityController.resetRoles()
86+
.add("blocker", "blocker")
87+
.add("test", "test");
88+
}
89+
90+
@Test
91+
public void testConcurrentBlockingExecutionAllowed() {
92+
var httpClient = vertx.createHttpClient(new HttpClientOptions()
93+
.setDefaultHost(blockerUri.getHost())
94+
.setDefaultPort(blockerUri.getPort()));
95+
try {
96+
// first perform blocking request
97+
AtomicBoolean blockerSucceeded = new AtomicBoolean(false);
98+
AtomicBoolean blockerFailed = new AtomicBoolean(false);
99+
httpClient
100+
.request(HttpMethod.GET, blockerUri.getPath())
101+
.map(withBasic("blocker:blocker"))
102+
.flatMap(HttpClientRequest::send)
103+
.subscribe()
104+
.with(resp -> {
105+
if (resp.statusCode() == 200) {
106+
resp.body().map(Buffer::toString).subscribe().with(body -> {
107+
if (body.equals("blocker:/blocker")) {
108+
blockerSucceeded.set(true);
109+
} else {
110+
Log.error(("Request to path '/blocker' failed, expected response body 'blocker:/blocker',"
111+
+ " got: %s").formatted(body));
112+
blockerFailed.set(true);
113+
}
114+
});
115+
} else {
116+
Log.error("Request to path '/blocker' failed, expected response status 200, got: "
117+
+ resp.statusCode());
118+
blockerFailed.set(true);
119+
}
120+
}, err -> {
121+
Log.error("Request to path '/blocker' failed", err);
122+
blockerFailed.set(true);
123+
});
124+
125+
assertFalse(blockerSucceeded::get);
126+
assertFalse(blockerFailed::get);
127+
128+
// anonymous request is denied while blocking is still in progress
129+
int statusCode = requestToViewerPathAndGetStatusCode(httpClient, null);
130+
assertEquals(401, statusCode);
131+
assertFalse(blockerSucceeded::get);
132+
assertFalse(blockerFailed::get);
133+
134+
int concurrentAuthReq = BlockingAugmentor.EXPECTED_AUTHENTICATED_REQUESTS;
135+
do {
136+
assertFalse(blockerSucceeded::get);
137+
assertFalse(blockerFailed::get);
138+
statusCode = requestToViewerPathAndGetStatusCode(httpClient, "test:test");
139+
assertEquals(200, statusCode);
140+
} while (--concurrentAuthReq > 0);
141+
142+
Awaitility.await().untilAsserted(() -> {
143+
assertTrue(blockerSucceeded::get);
144+
assertFalse(blockerFailed::get);
145+
});
146+
} finally {
147+
httpClient.closeAndAwait();
148+
}
149+
}
150+
151+
private int requestToViewerPathAndGetStatusCode(HttpClient httpClient, String credentials) {
152+
var response = httpClient
153+
.request(HttpMethod.GET, viewerUri.getPath())
154+
.map(withBasic(credentials))
155+
.flatMap(HttpClientRequest::send)
156+
.await().atMost(REQUEST_TIMEOUT);
157+
return response.statusCode();
158+
}
159+
160+
private static Function<HttpClientRequest, HttpClientRequest> withBasic(String basic) {
161+
if (basic != null) {
162+
return req -> req.putHeader("Authorization",
163+
"Basic " + encodeBase64URLSafeString(basic.getBytes(StandardCharsets.UTF_8)));
164+
}
165+
return Function.identity();
166+
}
167+
168+
@ApplicationScoped
169+
public static class BlockingAugmentor implements SecurityIdentityAugmentor {
170+
171+
static final int EXPECTED_AUTHENTICATED_REQUESTS = 3;
172+
private final CountDownLatch blockingLatch = new CountDownLatch(EXPECTED_AUTHENTICATED_REQUESTS);
173+
174+
@Override
175+
public Uni<SecurityIdentity> augment(SecurityIdentity securityIdentity, AuthenticationRequestContext ctx) {
176+
if (!securityIdentity.isAnonymous()) {
177+
if ("blocker".equals(securityIdentity.getPrincipal().getName())) {
178+
return ctx.runBlocking(() -> {
179+
try {
180+
Log.info("Waiting for next 3 authenticated requests before continuing");
181+
boolean concurrentRequestsDone = blockingLatch.await(15, TimeUnit.SECONDS);
182+
if (concurrentRequestsDone) {
183+
Log.info("Waiting ended, adding role 'blocker' to SecurityIdentity");
184+
return withRole(securityIdentity, "blocker");
185+
} else {
186+
Log.error("Waiting ended, concurrent authenticated requests were not detected");
187+
return securityIdentity;
188+
}
189+
} catch (InterruptedException e) {
190+
throw new RuntimeException(e);
191+
}
192+
});
193+
} else {
194+
Log.info("Detected authenticated identity, adding role 'viewer'");
195+
blockingLatch.countDown();
196+
return ctx.runBlocking(() -> withRole(securityIdentity, "viewer"));
197+
}
198+
}
199+
Log.info("Detected anonymous identity - no augmentation.");
200+
return Uni.createFrom().item(securityIdentity);
201+
}
202+
203+
private static SecurityIdentity withRole(SecurityIdentity securityIdentity, String role) {
204+
return QuarkusSecurityIdentity.builder(securityIdentity).addRole(role).build();
205+
}
206+
}
207+
208+
}

extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/VertxBlockingSecurityExecutor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public Uni<? extends T> get() {
4242
public T call() {
4343
return supplier.get();
4444
}
45-
})
45+
}, false)
4646
.toCompletionStage());
4747
}
4848
}

0 commit comments

Comments
 (0)