Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,27 @@ public HttpEventListener(HttpEventListenerConfig config, @ForHttpEventListener H
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
if (config.getLogCreated()) {
sendLog(queryCreatedEvent);
sendLog(queryCreatedEvent, queryCreatedEvent.getMetadata().getQueryId());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit messge Add more logging the event listener -> Add more logging in the HTTP event listener

}
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
if (config.getLogCompleted()) {
sendLog(queryCompletedEvent);
sendLog(queryCompletedEvent, queryCompletedEvent.getMetadata().getQueryId());
}
}

@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
if (config.getLogSplit()) {
sendLog(splitCompletedEvent);
sendLog(splitCompletedEvent, splitCompletedEvent.getQueryId());
}
}

private <T> void sendLog(T event)
private <T> void sendLog(T event, String queryId)
{
Request request = preparePost()
.addHeaders(Multimaps.forMap(config.getHttpHeaders()))
Expand All @@ -111,10 +111,10 @@ private <T> void sendLog(T event)
.setBodyGenerator(out -> objectWriter.writeValue(out, event))
.build();

attemptToSend(request, 0, Duration.valueOf("0s"));
attemptToSend(request, 0, Duration.valueOf("0s"), queryId);
}

private void attemptToSend(Request request, int attempt, Duration delay)
private void attemptToSend(Request request, int attempt, Duration delay, String queryId)
{
this.executor.schedule(
() -> Futures.addCallback(client.executeAsync(request, createStatusResponseHandler()),
Expand All @@ -124,27 +124,50 @@ public void onSuccess(StatusResponse result)
{
verify(result != null);

if (result.getStatusCode() >= 500 && attempt < config.getRetryCount()) {
attemptToSend(request, attempt + 1, nextDelay(delay));
if (!(result.getStatusCode() >= 200 && result.getStatusCode() < 300) && attempt < config.getRetryCount()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how would retrying any 4xx error ever succeed? Wouldn't we end up retrying until retry attempts are exhausted?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we would retry until attempts are exhausted.

Copy link
Copy Markdown
Member

@hashhar hashhar Jan 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems wasteful - specially since the event listener is synchronous and retrying a 4xx error seems guaranteed to fail unless for maybe HTTP 429.

Thanks for clarifying. I believe the intent is to use this mechanism to identify the scenarios we don't handle well currently and then fix them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it has its use-cases (which admittedly are edge-cases) and it's better to be safe than drop data. This doesn't run on the query execution threads so no direct slowdown.

Duration nextDelay = nextDelay(delay);
int nextAttempt = attempt + 1;

log.warn("QueryId = \"%s\", attempt = %d/%d, URL = %s | Ingest server responded with code %d, will retry after approximately %d seconds",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(),
result.getStatusCode(), nextDelay.roundTo(TimeUnit.SECONDS));

attemptToSend(request, nextAttempt, nextDelay, queryId);
return;
}

if (!(result.getStatusCode() >= 200 && result.getStatusCode() < 300)) {
log.error("Received status code %d from ingest server URI %s; expecting status 200", result.getStatusCode(), request.getUri());
}
log.debug("QueryId = \"%s\", attempt = %d/%d, URL = %s | Query event delivered successfully",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString());
}

@Override
public void onFailure(Throwable t)
{
log.error("Error sending HTTP request to ingest server with URL %s: %s", request.getUri(), t);
if (attempt < config.getRetryCount()) {
Duration nextDelay = nextDelay(delay);
int nextAttempt = attempt + 1;

log.warn(t, "QueryId = \"%s\", attempt = %d/%d, URL = %s | Sending event caused an exception, will retry after %d seconds",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(),
nextDelay.roundTo(TimeUnit.SECONDS));

attemptToSend(request, nextAttempt, nextDelay, queryId);
return;
}

log.error(t, "QueryId = \"%s\", attempt = %d/%d, URL = %s | Error sending HTTP request",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString());
}
}, executor),
(long) delay.getValue(), delay.getUnit());
}

private Duration nextDelay(Duration delay)
{
if (delay.compareTo(Duration.valueOf("0s")) == 0) {
return config.getRetryDelay();
}

Duration newDuration = Duration.succinctDuration(delay.getValue(TimeUnit.SECONDS) * this.config.getBackoffBase(), TimeUnit.SECONDS);
if (newDuration.compareTo(config.getMaxDelay()) > 0) {
return config.getMaxDelay();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okhttp3.mockwebserver.SocketPolicy;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -226,7 +227,7 @@ public void testAllLoggingDisabledShouldTimeout()
querylogListener.queryCompleted(null);
querylogListener.splitCompleted(null);

assertNull(server.takeRequest(1, TimeUnit.SECONDS));
assertNull(server.takeRequest(5, TimeUnit.SECONDS));
}

@Test
Expand All @@ -245,13 +246,13 @@ public void testAllLoggingEnabledShouldSendCorrectEvent()
server.enqueue(new MockResponse().setResponseCode(200));

querylogListener.queryCreated(queryCreatedEvent);
checkRequest(server.takeRequest(1, TimeUnit.SECONDS), queryCreatedEvent);
checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCreatedEvent);

querylogListener.queryCompleted(queryCompleteEvent);
checkRequest(server.takeRequest(1, TimeUnit.SECONDS), queryCompleteEvent);
checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent);

querylogListener.splitCompleted(splitCompleteEvent);
checkRequest(server.takeRequest(1, TimeUnit.SECONDS), splitCompleteEvent);
checkRequest(server.takeRequest(5, TimeUnit.SECONDS), splitCompleteEvent);
}

@Test
Expand All @@ -267,7 +268,7 @@ public void testContentTypeDefaultHeaderShouldAlwaysBeSet()

querylogListener.queryCompleted(queryCompleteEvent);

assertEquals(server.takeRequest(1, TimeUnit.SECONDS).getHeader("Content-Type"), "application/json; charset=utf-8");
assertEquals(server.takeRequest(5, TimeUnit.SECONDS).getHeader("Content-Type"), "application/json; charset=utf-8");
}

public void testHttpHeadersShouldBePresent()
Expand All @@ -283,7 +284,7 @@ public void testHttpHeadersShouldBePresent()

querylogListener.queryCompleted(queryCompleteEvent);

checkRequest(server.takeRequest(1, TimeUnit.SECONDS), new HashMap<>() {{
checkRequest(server.takeRequest(5, TimeUnit.SECONDS), new HashMap<>() {{
put("Authorization", "Trust Me!");
put("Cache-Control", "no-cache");
}}, queryCompleteEvent);
Expand All @@ -304,7 +305,7 @@ public void testHttpsEnabledShouldUseTLSv13()
}});
querylogListener.queryCompleted(queryCompleteEvent);

RecordedRequest recordedRequest = server.takeRequest(1, TimeUnit.SECONDS);
RecordedRequest recordedRequest = server.takeRequest(5, TimeUnit.SECONDS);

assertNotNull(recordedRequest, "Handshake probably failed");
assertEquals(recordedRequest.getTlsVersion().javaName(), "TLSv1.3");
Expand All @@ -327,7 +328,7 @@ public void testDifferentCertificatesShouldNotSendRequest()
}});
querylogListener.queryCompleted(queryCompleteEvent);

RecordedRequest recordedRequest = server.takeRequest(1, TimeUnit.SECONDS);
RecordedRequest recordedRequest = server.takeRequest(5, TimeUnit.SECONDS);

assertNull(recordedRequest, "Handshake should have failed");
}
Expand All @@ -346,7 +347,7 @@ public void testNoServerCertificateShouldNotSendRequest()
}});
querylogListener.queryCompleted(queryCompleteEvent);

RecordedRequest recordedRequest = server.takeRequest(1, TimeUnit.SECONDS);
RecordedRequest recordedRequest = server.takeRequest(5, TimeUnit.SECONDS);

assertNull(recordedRequest, "Handshake should have failed");
}
Expand All @@ -366,12 +367,12 @@ public void testServer500ShouldRetry()

querylogListener.queryCompleted(queryCompleteEvent);

assertNotNull(server.takeRequest(1, TimeUnit.SECONDS)); // First request that responds with 500
checkRequest(server.takeRequest(1, TimeUnit.SECONDS), queryCompleteEvent); // The retry that responds with 200
assertNotNull(server.takeRequest(5, TimeUnit.SECONDS)); // First request that responds with 500
checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent); // The retry that responds with 200
}

@Test
public void testServer400ShouldNotRetry()
public void testServer400ShouldRetry()
throws Exception
{
EventListener querylogListener = factory.create(new HashMap<>(){{
Expand All @@ -385,8 +386,27 @@ public void testServer400ShouldNotRetry()

querylogListener.queryCompleted(queryCompleteEvent);

assertNotNull(server.takeRequest(1, TimeUnit.SECONDS)); // First request, send back 400
assertNull(server.takeRequest(1, TimeUnit.SECONDS)); // No more retries
assertNotNull(server.takeRequest(5, TimeUnit.SECONDS)); // First request, send back 400
checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent); // The retry that responds with 200
}

@Test
public void testServerDisconnectShouldRetry()
throws Exception
{
EventListener querylogListener = factory.create(new HashMap<>(){{
put("http-event-listener.connect-ingest-uri", server.url("/").toString());
put("http-event-listener.log-completed", "true");
put("http-event-listener.connect-retry-count", "1");
}});

server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START));
server.enqueue(new MockResponse().setResponseCode(200));

querylogListener.queryCompleted(queryCompleteEvent);

assertNotNull(server.takeRequest(5, TimeUnit.SECONDS)); // First request, causes exception
checkRequest(server.takeRequest(5, TimeUnit.SECONDS), queryCompleteEvent);
}

@Test
Expand Down