From 0fd365f38d2d5f91f85806b6e650da9302705afa Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 8 Mar 2018 16:43:58 -0800 Subject: [PATCH] Dropp errors if queue is full --- .../elastic/apm/report/ApmServerReporter.java | 55 +++++++++---- .../ApmServerReporterIntegrationTest.java | 80 ++++++++++++++++++ .../apm/report/ApmServerReporterTest.java | 81 +++++++------------ 3 files changed, 150 insertions(+), 66 deletions(-) create mode 100644 apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterIntegrationTest.java diff --git a/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java index e948cdbf79..e892a8526f 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java @@ -1,10 +1,11 @@ package co.elastic.apm.report; import co.elastic.apm.impl.error.ErrorCapture; -import co.elastic.apm.impl.transaction.Transaction; import co.elastic.apm.impl.payload.Process; import co.elastic.apm.impl.payload.Service; import co.elastic.apm.impl.payload.SystemInfo; +import co.elastic.apm.impl.transaction.Transaction; +import co.elastic.apm.objectpool.Recyclable; import co.elastic.apm.util.ExecutorUtils; import co.elastic.apm.util.MathUtils; import com.lmax.disruptor.EventFactory; @@ -54,10 +55,10 @@ public void translateTo(ReportingEvent event, long sequence, ErrorCapture error) }; private final Disruptor disruptor; - private ScheduledThreadPoolExecutor flushScheduler; private final AtomicInteger dropped = new AtomicInteger(); private final boolean dropTransactionIfQueueFull; private final ReportingEventHandler reportingEventHandler; + private ScheduledThreadPoolExecutor flushScheduler; public ApmServerReporter(Service service, Process process, SystemInfo system, PayloadSender payloadSender, boolean dropTransactionIfQueueFull, ReporterConfiguration reporterConfiguration) { @@ -87,14 +88,8 @@ public void run() { @Override public void report(Transaction transaction) { - if (dropTransactionIfQueueFull) { - boolean queueFull = !disruptor.getRingBuffer().tryPublishEvent(TRANSACTION_EVENT_TRANSLATOR, transaction); - if (queueFull) { - dropped.incrementAndGet(); - transaction.recycle(); - } - } else { - disruptor.getRingBuffer().publishEvent(TRANSACTION_EVENT_TRANSLATOR, transaction); + if (!tryAddEventToRingBuffer(transaction, TRANSACTION_EVENT_TRANSLATOR)) { + transaction.recycle(); } } @@ -103,6 +98,14 @@ public int getDropped() { return dropped.get(); } + /** + * Flushes pending {@link ErrorCapture}s and {@link Transaction}s to the APM server. + *

+ * This method may block for a while until a slot in the ring buffer becomes available. + *

+ * + * @return A {@link Future} which resolves when the flush has been executed. + */ @Override public Future flush() { disruptor.publishEvent(FLUSH_EVENT_TRANSLATOR); @@ -125,22 +128,32 @@ public boolean isDone() { @Override public Void get() throws InterruptedException, ExecutionException { - while (disruptor.getSequenceValueFor(reportingEventHandler) < cursor) { + while (!isEventProcessed(cursor)) { Thread.sleep(1); } return null; } + /* + * This might not a very elegant or efficient implementation but it is only intended to be used in tests anyway + */ @Override public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - for (; timeout > 0; timeout--) { + for (; timeout > 0 && !isEventProcessed(cursor); timeout--) { Thread.sleep(1); } + if (!isEventProcessed(cursor)) { + throw new TimeoutException(); + } return null; } }; } + private boolean isEventProcessed(long sequence) { + return disruptor.getSequenceValueFor(reportingEventHandler) >= sequence; + } + @Override public void close() { disruptor.shutdown(); @@ -149,10 +162,24 @@ public void close() { } } - // TODO drop errors when queue is full @Override public void report(ErrorCapture error) { - disruptor.publishEvent(ERROR_EVENT_TRANSLATOR, error); + if (!tryAddEventToRingBuffer(error, ERROR_EVENT_TRANSLATOR)) { + error.recycle(); + } + } + + private boolean tryAddEventToRingBuffer(E event, EventTranslatorOneArg eventTranslator) { + if (dropTransactionIfQueueFull) { + boolean queueFull = !disruptor.getRingBuffer().tryPublishEvent(eventTranslator, event); + if (queueFull) { + dropped.incrementAndGet(); + return false; + } + } else { + disruptor.getRingBuffer().publishEvent(eventTranslator, event); + } + return true; } static class ReportingEvent { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterIntegrationTest.java b/apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterIntegrationTest.java new file mode 100644 index 0000000000..b51ef81f4d --- /dev/null +++ b/apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterIntegrationTest.java @@ -0,0 +1,80 @@ +package co.elastic.apm.report; + +import co.elastic.apm.impl.error.ErrorCapture; +import co.elastic.apm.impl.transaction.Transaction; +import co.elastic.apm.impl.payload.Process; +import co.elastic.apm.impl.payload.Service; +import co.elastic.apm.impl.payload.SystemInfo; +import co.elastic.apm.report.serialize.JacksonPayloadSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import io.undertow.Undertow; +import okhttp3.OkHttpClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +class ApmServerReporterIntegrationTest { + + private static Undertow server; + private static int port; + private static AtomicInteger receivedHttpRequests = new AtomicInteger(); + private ApmServerHttpPayloadSender payloadSender; + private ReporterConfiguration reporterConfiguration; + private ApmServerReporter reporter; + + @BeforeAll + static void startServer() { + server = Undertow.builder() + .addHttpListener(0, "127.0.0.1") + .setHandler(exchange -> { + receivedHttpRequests.incrementAndGet(); + exchange.setStatusCode(200).endExchange(); + }).build(); + server.start(); + port = ((InetSocketAddress) server.getListenerInfo().get(0).getAddress()).getPort(); + } + + @AfterAll + static void stopServer() { + server.stop(); + } + + @BeforeEach + void setUp() { + receivedHttpRequests.set(0); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new AfterburnerModule()); + reporterConfiguration = spy(new ReporterConfiguration()); + when(reporterConfiguration.getFlushInterval()).thenReturn(-1); + when(reporterConfiguration.getServerUrl()).thenReturn("http://localhost:" + port); + payloadSender = new ApmServerHttpPayloadSender(new OkHttpClient(), new JacksonPayloadSerializer(objectMapper), reporterConfiguration); + SystemInfo system = new SystemInfo("x64", "localhost", "platform"); + reporter = new ApmServerReporter(new Service(), new Process(), system, payloadSender, false, reporterConfiguration); + } + + @Test + void testReportTransaction() throws ExecutionException, InterruptedException { + reporter.report(new Transaction()); + reporter.flush().get(); + assertThat(reporter.getDropped()).isEqualTo(0); + assertThat(receivedHttpRequests.get()).isEqualTo(1); + } + + @Test + void testReportErrorCapture() throws ExecutionException, InterruptedException { + reporter.report(new ErrorCapture()); + reporter.flush().get(); + assertThat(reporter.getDropped()).isEqualTo(0); + assertThat(receivedHttpRequests.get()).isEqualTo(1); + } +} diff --git a/apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterTest.java b/apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterTest.java index f4d0d45969..829dd4cbc1 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/report/ApmServerReporterTest.java @@ -1,80 +1,57 @@ package co.elastic.apm.report; import co.elastic.apm.impl.error.ErrorCapture; -import co.elastic.apm.impl.transaction.Transaction; import co.elastic.apm.impl.payload.Process; import co.elastic.apm.impl.payload.Service; import co.elastic.apm.impl.payload.SystemInfo; -import co.elastic.apm.report.serialize.JacksonPayloadSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.afterburner.AfterburnerModule; -import io.undertow.Undertow; -import okhttp3.OkHttpClient; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import co.elastic.apm.impl.transaction.Transaction; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class ApmServerReporterTest { - private static Undertow server; - private static int port; - private static AtomicInteger receivedHttpRequests = new AtomicInteger(); - private ApmServerHttpPayloadSender payloadSender; - private ReporterConfiguration reporterConfiguration; private ApmServerReporter reporter; - - @BeforeAll - static void startServer() { - server = Undertow.builder() - .addHttpListener(0, "127.0.0.1") - .setHandler(exchange -> { - receivedHttpRequests.incrementAndGet(); - exchange.setStatusCode(200).endExchange(); - }).build(); - server.start(); - port = ((InetSocketAddress) server.getListenerInfo().get(0).getAddress()).getPort(); - } - - @AfterAll - static void stopServer() { - server.stop(); - } + private PayloadSender payloadSender; @BeforeEach void setUp() { - receivedHttpRequests.set(0); - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(new AfterburnerModule()); - reporterConfiguration = spy(new ReporterConfiguration()); + ReporterConfiguration reporterConfiguration = spy(new ReporterConfiguration()); when(reporterConfiguration.getFlushInterval()).thenReturn(-1); - when(reporterConfiguration.getServerUrl()).thenReturn("http://localhost:" + port); - payloadSender = new ApmServerHttpPayloadSender(new OkHttpClient(), new JacksonPayloadSerializer(objectMapper), reporterConfiguration); + when(reporterConfiguration.getMaxQueueSize()).thenReturn(2); SystemInfo system = new SystemInfo("x64", "localhost", "platform"); - reporter = new ApmServerReporter(new Service(), new Process(), system, payloadSender, false, reporterConfiguration); + payloadSender = mock(PayloadSender.class); + reporter = new ApmServerReporter(new Service(), new Process(), system, payloadSender, true, reporterConfiguration); } @Test - void testReportTransaction() throws ExecutionException, InterruptedException { - reporter.report(new Transaction()); - reporter.flush().get(); - assertThat(reporter.getDropped()).isEqualTo(0); - assertThat(receivedHttpRequests.get()).isEqualTo(1); + void testReport_discardTransactions_ifQueueIsFull() { + Transaction transaction = mock(Transaction.class); + // try to report lots of transactions with a tiny queue should lead to dropped events + for (int i = 0; i < 100; i++) { + reporter.report(transaction); + } + assertThat(reporter.getDropped()).isGreaterThan(0); + verify(payloadSender, atLeastOnce()).sendPayload(any()); + verify(transaction, atLeastOnce()).recycle(); } @Test - void testReportErrorCapture() throws ExecutionException, InterruptedException { - reporter.report(new ErrorCapture()); - reporter.flush().get(); - assertThat(reporter.getDropped()).isEqualTo(0); - assertThat(receivedHttpRequests.get()).isEqualTo(1); + void testReport_discardErrors_ifQueueIsFull() { + ErrorCapture error = mock(ErrorCapture.class); + // try to report lots of errors with a tiny queue should lead to dropped events + for (int i = 0; i < 100; i++) { + reporter.report(error); + } + assertThat(reporter.getDropped()).isGreaterThan(0); + verify(payloadSender, atLeastOnce()).sendPayload(any()); + verify(error, atLeastOnce()).recycle(); } }