Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SSE limit issue #225

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions deployment/src/main/resources/web-bundler/live-reload.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ function connectToChanges() {
if (retry > 0) {
retry = 0;
// server is back-on, let's reload to get the latest
eventSource.close();
location.reload();
}

console.debug("connected to web-bundler live-reload");
};
eventSource.addEventListener('bundling-error', e => {
eventSource.close();
location.reload();
});
eventSource.addEventListener('change', e => {
Expand All @@ -40,10 +42,11 @@ function connectToChanges() {
}
}
}
eventSource.close();
location.reload();
});

eventSource.onerror = () => {
eventSource.onerror = (e) => {
// Reconnect on error
eventSource.close();
retry++;
Expand All @@ -59,4 +62,14 @@ function connectToChanges() {
};
}

connectToChanges();
fetch(process.env.LIVE_RELOAD_PATH)
.then(response => {
if (response.status === 429) {
return Promise.reject(new Error("There are too many live-reload open connections."));
}
return response;
})
.then(connectToChanges)
.catch(error => {
console.error('Error:', error.message);
});
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import org.jboss.logging.Logger;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.quarkus.runtime.ShutdownContext;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
Expand All @@ -25,6 +27,7 @@ public class ChangeEventHandler implements Handler<RoutingContext> {
private static final Logger LOGGER = Logger.getLogger(ChangeEventHandler.class);

private static final List<String> IGNORED_SUFFIX = List.of(".map");
public static final String MEDIA_TYPE_TEXT_EVENT_STREAM = "text/event-stream";
private final Map<String, Long> lastModifiedMap;
private final List<Connection> connections = new CopyOnWriteArrayList<>();
private final ClassLoader cl;
Expand All @@ -41,11 +44,18 @@ public ChangeEventHandler(Function<Consumer<Set<String>>, Runnable> registerHand
}

private void onShutdown() {
final ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
unRegisterChangeListener.run();
for (Connection connection : connections) {
closeConnection(connection);
try {
unRegisterChangeListener.run();
for (Connection connection : connections) {
closeConnection(connection);
}
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
}


}

private Map<String, Long> initLastModifiedMap(Set<String> webResources) {
Expand All @@ -72,27 +82,33 @@ private void onChange(Set<String> srcChanges) {
if (!srcChanges.contains("web-bundler/build-success") && !isBundlingError) {
return;
}
final ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
final Changes changes = computeChanges();
try {
final Changes changes = computeChanges();

LOGGER.info(changes);
for (Connection connection : connections) {
if (!connection.closed().get() && !connection.ctx().response().closed()) {
if (isBundlingError) {
connection.ctx.response().write("event: bundling-error\ndata:\n\n");
continue;
}
if (!changes.added.isEmpty() || !changes.removed.isEmpty() || !changes.updated.isEmpty()) {
// Send an initial SSE event to establish the connection
JsonObject eventData = new JsonObject();
eventData.put("added", new JsonArray(changes.added));
eventData.put("removed", new JsonArray(changes.removed));
eventData.put("updated", new JsonArray(changes.updated));
connection.ctx.response().write("event: change\ndata: " + eventData.encode() + "\n\n");
}

LOGGER.info(changes);
for (Connection connection : connections) {
if (!connection.closed().get() && !connection.ctx().response().closed()) {
if (isBundlingError) {
connection.ctx.response().write("event: bundling-error\ndata:\n\n");
continue;
}
if (!changes.added.isEmpty() || !changes.removed.isEmpty() || !changes.updated.isEmpty()) {
// Send an initial SSE event to establish the connection
JsonObject eventData = new JsonObject();
eventData.put("added", new JsonArray(changes.added));
eventData.put("removed", new JsonArray(changes.removed));
eventData.put("updated", new JsonArray(changes.updated));
connection.ctx.response().write("event: change\ndata: " + eventData.encode() + "\n\n");
}

}
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
}

}

private Changes computeChanges() {
Expand Down Expand Up @@ -130,8 +146,22 @@ private Changes computeChanges() {

@Override
public void handle(RoutingContext routingContext) {

if (connections.size() > 2) {
routingContext.response().setStatusCode(HttpResponseStatus.TOO_MANY_REQUESTS.code());
routingContext.response().send();
return;
}

final String header = routingContext.request().getHeader(HttpHeaders.ACCEPT);
if (header == null || !header.equalsIgnoreCase(MEDIA_TYPE_TEXT_EVENT_STREAM)) {
routingContext.response().setStatusCode(HttpResponseStatus.OK.code());
routingContext.response().send();
return;
}

HttpServerResponse response = routingContext.response();
response.putHeader("Content-Type", "text/event-stream");
response.putHeader("Content-Type", MEDIA_TYPE_TEXT_EVENT_STREAM);
response.putHeader("Cache-Control", "no-cache");
response.putHeader("Connection", "keep-alive");
response.setChunked(true);
Expand All @@ -151,7 +181,6 @@ public void handle(RoutingContext routingContext) {
routingContext.request().connection().closeHandler(v -> {
closeConnection(connection);
});

}

private void closeConnection(Connection connection) {
Expand Down
Loading