diff --git a/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java b/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java index 7dd099990819..fb6a05ace45a 100644 --- a/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java +++ b/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java @@ -82,7 +82,7 @@ public HttpEventListener(HttpEventListenerConfig config, @ForHttpEventListener H public void queryCreated(QueryCreatedEvent queryCreatedEvent) { if (config.getLogCreated()) { - sendLog(queryCreatedEvent); + sendLog(queryCreatedEvent, queryCreatedEvent.getMetadata().getQueryId()); } } @@ -90,7 +90,7 @@ public void queryCreated(QueryCreatedEvent queryCreatedEvent) public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { if (config.getLogCompleted()) { - sendLog(queryCompletedEvent); + sendLog(queryCompletedEvent, queryCompletedEvent.getMetadata().getQueryId()); } } @@ -98,11 +98,11 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent) public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { if (config.getLogSplit()) { - sendLog(splitCompletedEvent); + sendLog(splitCompletedEvent, splitCompletedEvent.getQueryId()); } } - private void sendLog(T event) + private void sendLog(T event, String queryId) { Request request = preparePost() .addHeaders(Multimaps.forMap(config.getHttpHeaders())) @@ -111,10 +111,10 @@ private void sendLog(T event) .setBodyGenerator(out -> objectWriter.writeValue(out, event)) .build(); - attemptToSend(request, 0, Duration.valueOf("0s")); + attemptToSend(request, 0, Duration.valueOf("0s"), queryId); } - private void attemptToSend(Request request, int attempt, Duration delay) + private void attemptToSend(Request request, int attempt, Duration delay, String queryId) { this.executor.schedule( () -> Futures.addCallback(client.executeAsync(request, createStatusResponseHandler()), @@ -124,20 +124,39 @@ public void onSuccess(StatusResponse result) { verify(result != null); - if (result.getStatusCode() >= 500 && attempt < config.getRetryCount()) { - attemptToSend(request, attempt + 1, nextDelay(delay)); + if (!(result.getStatusCode() >= 200 && result.getStatusCode() < 300) && attempt < config.getRetryCount()) { + Duration nextDelay = nextDelay(delay); + int nextAttempt = attempt + 1; + + log.warn("QueryId = \"%s\", attempt = %d/%d, URL = %s | Ingest server responded with code %d, will retry after approximately %d seconds", + queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(), + result.getStatusCode(), nextDelay.roundTo(TimeUnit.SECONDS)); + + attemptToSend(request, nextAttempt, nextDelay, queryId); return; } - if (!(result.getStatusCode() >= 200 && result.getStatusCode() < 300)) { - log.error("Received status code %d from ingest server URI %s; expecting status 200", result.getStatusCode(), request.getUri()); - } + log.debug("QueryId = \"%s\", attempt = %d/%d, URL = %s | Query event delivered successfully", + queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString()); } @Override public void onFailure(Throwable t) { - log.error("Error sending HTTP request to ingest server with URL %s: %s", request.getUri(), t); + if (attempt < config.getRetryCount()) { + Duration nextDelay = nextDelay(delay); + int nextAttempt = attempt + 1; + + log.warn(t, "QueryId = \"%s\", attempt = %d/%d, URL = %s | Sending event caused an exception, will retry after %d seconds", + queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(), + nextDelay.roundTo(TimeUnit.SECONDS)); + + attemptToSend(request, nextAttempt, nextDelay, queryId); + return; + } + + log.error(t, "QueryId = \"%s\", attempt = %d/%d, URL = %s | Error sending HTTP request", + queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString()); } }, executor), (long) delay.getValue(), delay.getUnit()); @@ -145,6 +164,10 @@ public void onFailure(Throwable t) private Duration nextDelay(Duration delay) { + if (delay.compareTo(Duration.valueOf("0s")) == 0) { + return config.getRetryDelay(); + } + Duration newDuration = Duration.succinctDuration(delay.getValue(TimeUnit.SECONDS) * this.config.getBackoffBase(), TimeUnit.SECONDS); if (newDuration.compareTo(config.getMaxDelay()) > 0) { return config.getMaxDelay(); diff --git a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpQueryListener.java b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpQueryListener.java index 78b8187a652b..8a6262b91426 100644 --- a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpQueryListener.java +++ b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpQueryListener.java @@ -33,6 +33,7 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import okhttp3.mockwebserver.SocketPolicy; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -226,7 +227,7 @@ public void testAllLoggingDisabledShouldTimeout() querylogListener.queryCompleted(null); querylogListener.splitCompleted(null); - assertNull(server.takeRequest(1, TimeUnit.SECONDS)); + assertNull(server.takeRequest(5, TimeUnit.SECONDS)); } @Test @@ -245,13 +246,13 @@ public void testAllLoggingEnabledShouldSendCorrectEvent() server.enqueue(new MockResponse().setResponseCode(200)); querylogListener.queryCreated(queryCreatedEvent); - checkRequest(server.takeRequest(1, TimeUnit.SECONDS), queryCreatedEvent); + checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCreatedEvent); querylogListener.queryCompleted(queryCompleteEvent); - checkRequest(server.takeRequest(1, TimeUnit.SECONDS), queryCompleteEvent); + checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent); querylogListener.splitCompleted(splitCompleteEvent); - checkRequest(server.takeRequest(1, TimeUnit.SECONDS), splitCompleteEvent); + checkRequest(server.takeRequest(5, TimeUnit.SECONDS), splitCompleteEvent); } @Test @@ -267,7 +268,7 @@ public void testContentTypeDefaultHeaderShouldAlwaysBeSet() querylogListener.queryCompleted(queryCompleteEvent); - assertEquals(server.takeRequest(1, TimeUnit.SECONDS).getHeader("Content-Type"), "application/json; charset=utf-8"); + assertEquals(server.takeRequest(5, TimeUnit.SECONDS).getHeader("Content-Type"), "application/json; charset=utf-8"); } public void testHttpHeadersShouldBePresent() @@ -283,7 +284,7 @@ public void testHttpHeadersShouldBePresent() querylogListener.queryCompleted(queryCompleteEvent); - checkRequest(server.takeRequest(1, TimeUnit.SECONDS), new HashMap<>() {{ + checkRequest(server.takeRequest(5, TimeUnit.SECONDS), new HashMap<>() {{ put("Authorization", "Trust Me!"); put("Cache-Control", "no-cache"); }}, queryCompleteEvent); @@ -304,7 +305,7 @@ public void testHttpsEnabledShouldUseTLSv13() }}); querylogListener.queryCompleted(queryCompleteEvent); - RecordedRequest recordedRequest = server.takeRequest(1, TimeUnit.SECONDS); + RecordedRequest recordedRequest = server.takeRequest(5, TimeUnit.SECONDS); assertNotNull(recordedRequest, "Handshake probably failed"); assertEquals(recordedRequest.getTlsVersion().javaName(), "TLSv1.3"); @@ -327,7 +328,7 @@ public void testDifferentCertificatesShouldNotSendRequest() }}); querylogListener.queryCompleted(queryCompleteEvent); - RecordedRequest recordedRequest = server.takeRequest(1, TimeUnit.SECONDS); + RecordedRequest recordedRequest = server.takeRequest(5, TimeUnit.SECONDS); assertNull(recordedRequest, "Handshake should have failed"); } @@ -346,7 +347,7 @@ public void testNoServerCertificateShouldNotSendRequest() }}); querylogListener.queryCompleted(queryCompleteEvent); - RecordedRequest recordedRequest = server.takeRequest(1, TimeUnit.SECONDS); + RecordedRequest recordedRequest = server.takeRequest(5, TimeUnit.SECONDS); assertNull(recordedRequest, "Handshake should have failed"); } @@ -366,12 +367,12 @@ public void testServer500ShouldRetry() querylogListener.queryCompleted(queryCompleteEvent); - assertNotNull(server.takeRequest(1, TimeUnit.SECONDS)); // First request that responds with 500 - checkRequest(server.takeRequest(1, TimeUnit.SECONDS), queryCompleteEvent); // The retry that responds with 200 + assertNotNull(server.takeRequest(5, TimeUnit.SECONDS)); // First request that responds with 500 + checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent); // The retry that responds with 200 } @Test - public void testServer400ShouldNotRetry() + public void testServer400ShouldRetry() throws Exception { EventListener querylogListener = factory.create(new HashMap<>(){{ @@ -385,8 +386,27 @@ public void testServer400ShouldNotRetry() querylogListener.queryCompleted(queryCompleteEvent); - assertNotNull(server.takeRequest(1, TimeUnit.SECONDS)); // First request, send back 400 - assertNull(server.takeRequest(1, TimeUnit.SECONDS)); // No more retries + assertNotNull(server.takeRequest(5, TimeUnit.SECONDS)); // First request, send back 400 + checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent); // The retry that responds with 200 + } + + @Test + public void testServerDisconnectShouldRetry() + throws Exception + { + EventListener querylogListener = factory.create(new HashMap<>(){{ + put("http-event-listener.connect-ingest-uri", server.url("/").toString()); + put("http-event-listener.log-completed", "true"); + put("http-event-listener.connect-retry-count", "1"); + }}); + + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + server.enqueue(new MockResponse().setResponseCode(200)); + + querylogListener.queryCompleted(queryCompleteEvent); + + assertNotNull(server.takeRequest(5, TimeUnit.SECONDS)); // First request, causes exception + checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent); } @Test