-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closing a context should interrupt pending virtual thread tasks of this context #5339
Conversation
dc2610b
to
c27b685
Compare
a26cf72
to
d2c6248
Compare
Why not simplify this by creating a VirtualThreadExecutor per context and close it there more simply. Just need to reuse the virtualThreadFactory. Because alot of the shutdown/suspension logic can be reused from the VirtualThreadExecutor |
yes maybe that would be an idea too, I'll ahve a look tomorrow again at this |
b321135
to
86b3b8b
Compare
I think one issue I found @zekronium is that when close the virtual thread executor instead will unlatch all virtual thread simultaneously in the same vertx context which is not allowed. The current approach can let us unlatch threads one by one and join them to ensure two asks are not executed at the same time, this looks cleaner to me. e.g. here is a test that should pass: @Test
public void testContextCloseContextSerialization() throws Exception {
int num = 4;
Assume.assumeTrue(isVirtualThreadAvailable());
ContextInternal ctx = vertx.createVirtualThreadContext();
Thread[] threads = new Thread[num];
List<Promise<Void>> promises = IntStream.range(0, num).mapToObj(idx -> Promise.<Void>promise()).collect(Collectors.toList());
Deque<CyclicBarrier> latches = new ConcurrentLinkedDeque<>();
CyclicBarrier[] l = new CyclicBarrier[num];
AtomicInteger count = new AtomicInteger();
for (int i = 0;i < num;i++) {
int idx = i;
CyclicBarrier latch = new CyclicBarrier(2);
l[i] = latch;
latches.add(latch);
ctx.runOnContext(v -> {
threads[idx] = Thread.currentThread();
try {
promises.get(idx).future().await();
fail();
} catch (Exception e) {
assertTrue(e instanceof InterruptedException);
CyclicBarrier barrier = latches.removeFirst();
int val = count.addAndGet(1);
assertTrue(val == 1);
try {
barrier.await();
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
count.decrementAndGet();
}
}
});
}
assertWaitUntil(() -> {
for (Thread thread : threads) {
if (thread == null || thread.getState() != Thread.State.WAITING) {
return false;
}
}
return true;
});
Future<Void> f = ctx.closeFuture().close();
for (int i = 0;i < num;i++) {
try {
l[i].await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
f.await();
} |
…tions in VertxImpl
0f420d1
to
85d10a1
Compare
…tate. Motivation: The TaskQueue does not implement closeability, since now such queue can hold many suspended virtual thread tasks, we should provide a way to deal those threads (interrupt) when the context holding the queue is closed, e.g. undeploying a verticle or closing a vertx instance. Changes: Implement a TaskQueue close method that returns the list of current thread being suspended. The context holding the queue can close the queue when the context close future is destroyed.
9e405cd
to
50733de
Compare
I was under the assumption that TaskQueue wont allow that, but you are totally right. Maybe this was some of the leaking I saw when I was implementing an interrupt approach on close too. The solution I had thought was uppon interrupt on await() I would requeue the task to the executor, which would be gated by task queue, but that was not very clean and hacky Is this your final solution? I will test if it is |
@zekronium please have a look at #5344 which is the port to 4.x and is the best solutution I think (I will port the 4.x improvements to this branch of course). |
@vietj I am encoutering the same issue I had myself when trying to shutdown the taskQueue itself with rejection. Upon shutdown or close of the context, there might be alot of events and promises fired and some of them are off context (We are in Virtual Thread land after all) and because the TaskQueue/Executor is closed, some of the events can not fire Example with HTTP2:
|
if the verticle is undeployed, it makes sense that the related tasks cannot be executed, otherwise undeplying might take forever ? perhaps we could add a grace period and let the queue execute tasks for a while until a timeout decides ? |
can you provide a reproduce for this specific cases ? |
I'll have a look at timers |
it is weird to see the timer affected, because timers are executed on the event-loop and then transferred to context |
I think the issue happens due to queueWrite in the connection. It always gets queued up, then it fails to write because the connection is closed, then it fails the promise, but to fail the promise it has to do context.emit/runOnContext which thats where the rejection exception is thrown, because it can not callback to the virtual thread. Thats the example at least in http2 i sent above. My guess is because it does not expect the callback to fail, if it does, some cleanup steps dont run. |
Just do a virtual thread that loops requests in a loop with sleep |
do you mean a verticle that performs HTTP client requests and then does thread sleep without vertx await , that is undeployed ? that being said if there is no await how can it get a response ? it is not clear |
I tried this and did not work, because who prevents a user to run some other virtual thread code in the callback which in reality should not run, like some other loop that should of been stopped. The best idea I had was to ALLOW internal events always and only suspend/interrupt the users virtual threads on promises and callbacks thwt the user wrote. Meaning any internal vertx callbacks, runOnContext would have to be wrapped in a different Task object type that encodes that its an internal event and is always allowed to run. Or have two types of task queue and one thats always allowed to run for internal events. The first solution seems cleaner but this approach of effectively coloring the callbacks does not sound so nice in general. EDIT: i think it would make more sense to wrap the user events from virtual threads as suspendable and leave everything event loop as is, reverse of what I said |
No, its always awaiting. Lets say while(contextNotClosed) get every 5000ms. Everything awaited, effectively synchronous code |
have you tried closing the server in undeploy explicitely and complete undeploy when the server has stopped ? |
it is still unclear, can you show some code ? |
A primitive example. I usually await without Thread.sleep but using actual vertx mechanisms.
Our application that uses virtual threads uses the client mostly. If this behavior is also visible on the server as well I am not sure. |
thanks for the code.
can you try closing the client in verticle stop method ?
…On Wed, Oct 9, 2024 at 6:05 PM zekronium ***@***.***> wrote:
do you mean a verticle that performs HTTP client requests and then does
thread sleep without vertx await , that is undeployed ? that being said if
there is no await how can it get a response ? it is not clear
No, its always awaiting. Lets say while(contextNotClosed) get every
5000ms. Everything awaited, effectively synchronous code
it is still unclear, can you show some code ?
A primitive example. I usually await without Thread.sleep but using actual
vertx mechanisms.
private void test() {
while (this.active()) {
var resp = await(client.getAbs("https://google.com").send());
await(Future.future(p -> this.vertx.setTimer(5000, t -> p.complete())));
}
}
have you tried closing the server in undeploy explicitely and complete
undeploy when the server has stopped ?
Our application that uses virtual threads uses the client mostly. If this
behavior is also visible on the server as well I am not sure.
—
Reply to this email directly, view it on GitHub
<#5339 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AABXDCXD6JQCGR4AHJUKC7DZ2VH4FAVCNFSM6AAAAABPH2SWZOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDIMBSG4ZTOOJYHA>
.
You are receiving this because you were mentioned.Message ID:
***@***.***>
|
can you provide a full reproducer ? what you shown will simply interrupt the thread when the verticle is undeployed since there are no pending requests, here is what I did: @Test
public void testRepro) throws Exception {
vertx.createHttpServer().requestHandler(req -> {
vertx.setTimer(100, id -> {
req.response().end();
});
}).listen(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
vertx.deployVerticle(new AbstractVerticle() {
volatile boolean active;
HttpClient client;
@Override
public void stop(Promise<Void> stopPromise) throws Exception {
active = false;
super.stop(stopPromise);
}
@Override
public void start() throws Exception {
active = true;
client = vertx.createHttpClient();
vertx.runOnContext(v -> test());
vertx.setTimer(2000, id -> {
});
}
private void test() {
while (this.active) {
Future<Buffer> fut = client.request(HttpMethod.GET, HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/")
.compose(req -> req
.send()
.compose(resp -> resp.body()));
Future.await(fut);
System.out.println("got resp");
Future.await(Future.future(p -> this.vertx.setTimer(150, t -> p.complete())));
}
}
}, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD))
.onComplete(onSuccess(id -> {
vertx.setTimer(4000, timerID -> {
System.out.println("Undeploying");
vertx.undeploy(id);
});
}));
await();
} |
I wrote this simple test that closes the HTTP client. @Test
public void testDeployHTTPClient() throws Exception {
Assume.assumeTrue(isVirtualThreadAvailable());
AtomicInteger inflight = new AtomicInteger();
vertx.createHttpServer().requestHandler(request -> {
inflight.incrementAndGet();
}).listen(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST);
int numReq = 10;
Set<Thread> threads = Collections.synchronizedSet(new HashSet<>());
Set<Thread> interruptedThreads = Collections.synchronizedSet(new HashSet<>());
String deploymentID = vertx.deployVerticle(new VerticleBase() {
HttpClient client;
@Override
public Future<?> start() throws Exception {
client = vertx.createHttpClient(new PoolOptions().setHttp1MaxSize(numReq));
for (int i = 0;i < numReq;i++) {
vertx.runOnContext(v -> {
threads.add(Thread.currentThread());
try {
HttpClientResponse response = client
.request(HttpMethod.GET, HttpTestBase.DEFAULT_HTTP_PORT, "localhost", "/")
.compose(HttpClientRequest::send)
.await();
} catch (Throwable e) {
if (e instanceof InterruptedException) {
interruptedThreads.add(Thread.currentThread());
}
}
});
}
return super.start();
}
@Override
public Future<?> stop() throws Exception {
return client.close();
}
}, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD))
.await();
assertWaitUntil(() -> inflight.get() == numReq);
vertx.undeploy(deploymentID).await();
assertEquals(threads, interruptedThreads);
} When the stop method is commented, indeed I can see such exceptions
But it makes sense to me, the client is not closed and tries to resume its work which cannot be done. It would be the same with a worker verticle using a named worker pool, we would also get rejected tasks as well. When the verticle closes the client, then the client is closed and the thread are interrupted, no exception is logged. As for the timer lingering there might be a bug there that would be different. |
In my tests I let it loop then close the verticle and I get the same exception. If the client is closed, it normally throws illegalStateException, where client close state is checked before sending the request(somehwhere in send), so why rejected? Also how does it try to resume work if the threads are interrupted/suspended |
As for the timer, I am confused myself what happens |
No description provided.