Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package co.elastic.apm.report;

import co.elastic.apm.impl.error.ErrorCapture;
import co.elastic.apm.impl.transaction.Transaction;
import co.elastic.apm.impl.payload.Process;
import co.elastic.apm.impl.payload.Service;
import co.elastic.apm.impl.payload.SystemInfo;
import co.elastic.apm.impl.transaction.Transaction;
import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.util.ExecutorUtils;
import co.elastic.apm.util.MathUtils;
import com.lmax.disruptor.EventFactory;
Expand Down Expand Up @@ -54,10 +55,10 @@ public void translateTo(ReportingEvent event, long sequence, ErrorCapture error)
};

private final Disruptor<ReportingEvent> disruptor;
private ScheduledThreadPoolExecutor flushScheduler;
private final AtomicInteger dropped = new AtomicInteger();
private final boolean dropTransactionIfQueueFull;
private final ReportingEventHandler reportingEventHandler;
private ScheduledThreadPoolExecutor flushScheduler;

public ApmServerReporter(Service service, Process process, SystemInfo system, PayloadSender payloadSender,
boolean dropTransactionIfQueueFull, ReporterConfiguration reporterConfiguration) {
Expand Down Expand Up @@ -87,14 +88,8 @@ public void run() {

@Override
public void report(Transaction transaction) {
if (dropTransactionIfQueueFull) {
boolean queueFull = !disruptor.getRingBuffer().tryPublishEvent(TRANSACTION_EVENT_TRANSLATOR, transaction);
if (queueFull) {
dropped.incrementAndGet();
transaction.recycle();
}
} else {
disruptor.getRingBuffer().publishEvent(TRANSACTION_EVENT_TRANSLATOR, transaction);
if (!tryAddEventToRingBuffer(transaction, TRANSACTION_EVENT_TRANSLATOR)) {
transaction.recycle();
}
}

Expand All @@ -103,6 +98,14 @@ public int getDropped() {
return dropped.get();
}

/**
* Flushes pending {@link ErrorCapture}s and {@link Transaction}s to the APM server.
* <p>
* This method may block for a while until a slot in the ring buffer becomes available.
* </p>
*
* @return A {@link Future} which resolves when the flush has been executed.
*/
@Override
public Future<Void> flush() {
disruptor.publishEvent(FLUSH_EVENT_TRANSLATOR);
Expand All @@ -125,22 +128,32 @@ public boolean isDone() {

@Override
public Void get() throws InterruptedException, ExecutionException {
while (disruptor.getSequenceValueFor(reportingEventHandler) < cursor) {
while (!isEventProcessed(cursor)) {
Thread.sleep(1);
}
return null;
}

/*
* This might not a very elegant or efficient implementation but it is only intended to be used in tests anyway
*/
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
for (; timeout > 0; timeout--) {
for (; timeout > 0 && !isEventProcessed(cursor); timeout--) {
Thread.sleep(1);
}
if (!isEventProcessed(cursor)) {
throw new TimeoutException();
}
return null;
}
};
}

private boolean isEventProcessed(long sequence) {
return disruptor.getSequenceValueFor(reportingEventHandler) >= sequence;
}

@Override
public void close() {
disruptor.shutdown();
Expand All @@ -149,10 +162,24 @@ public void close() {
}
}

// TODO drop errors when queue is full
@Override
public void report(ErrorCapture error) {
disruptor.publishEvent(ERROR_EVENT_TRANSLATOR, error);
if (!tryAddEventToRingBuffer(error, ERROR_EVENT_TRANSLATOR)) {
error.recycle();
}
}

private <E extends Recyclable> boolean tryAddEventToRingBuffer(E event, EventTranslatorOneArg<ReportingEvent, E> eventTranslator) {
if (dropTransactionIfQueueFull) {
boolean queueFull = !disruptor.getRingBuffer().tryPublishEvent(eventTranslator, event);
if (queueFull) {
dropped.incrementAndGet();
return false;
}
} else {
disruptor.getRingBuffer().publishEvent(eventTranslator, event);
}
return true;
}

static class ReportingEvent {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package co.elastic.apm.report;

import co.elastic.apm.impl.error.ErrorCapture;
import co.elastic.apm.impl.transaction.Transaction;
import co.elastic.apm.impl.payload.Process;
import co.elastic.apm.impl.payload.Service;
import co.elastic.apm.impl.payload.SystemInfo;
import co.elastic.apm.report.serialize.JacksonPayloadSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import io.undertow.Undertow;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

class ApmServerReporterIntegrationTest {

private static Undertow server;
private static int port;
private static AtomicInteger receivedHttpRequests = new AtomicInteger();
private ApmServerHttpPayloadSender payloadSender;
private ReporterConfiguration reporterConfiguration;
private ApmServerReporter reporter;

@BeforeAll
static void startServer() {
server = Undertow.builder()
.addHttpListener(0, "127.0.0.1")
.setHandler(exchange -> {
receivedHttpRequests.incrementAndGet();
exchange.setStatusCode(200).endExchange();
}).build();
server.start();
port = ((InetSocketAddress) server.getListenerInfo().get(0).getAddress()).getPort();
}

@AfterAll
static void stopServer() {
server.stop();
}

@BeforeEach
void setUp() {
receivedHttpRequests.set(0);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new AfterburnerModule());
reporterConfiguration = spy(new ReporterConfiguration());
when(reporterConfiguration.getFlushInterval()).thenReturn(-1);
when(reporterConfiguration.getServerUrl()).thenReturn("http://localhost:" + port);
payloadSender = new ApmServerHttpPayloadSender(new OkHttpClient(), new JacksonPayloadSerializer(objectMapper), reporterConfiguration);
SystemInfo system = new SystemInfo("x64", "localhost", "platform");
reporter = new ApmServerReporter(new Service(), new Process(), system, payloadSender, false, reporterConfiguration);
}

@Test
void testReportTransaction() throws ExecutionException, InterruptedException {
reporter.report(new Transaction());
reporter.flush().get();
assertThat(reporter.getDropped()).isEqualTo(0);
assertThat(receivedHttpRequests.get()).isEqualTo(1);
}

@Test
void testReportErrorCapture() throws ExecutionException, InterruptedException {
reporter.report(new ErrorCapture());
reporter.flush().get();
assertThat(reporter.getDropped()).isEqualTo(0);
assertThat(receivedHttpRequests.get()).isEqualTo(1);
}
}
Original file line number Diff line number Diff line change
@@ -1,80 +1,57 @@
package co.elastic.apm.report;

import co.elastic.apm.impl.error.ErrorCapture;
import co.elastic.apm.impl.transaction.Transaction;
import co.elastic.apm.impl.payload.Process;
import co.elastic.apm.impl.payload.Service;
import co.elastic.apm.impl.payload.SystemInfo;
import co.elastic.apm.report.serialize.JacksonPayloadSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import io.undertow.Undertow;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import co.elastic.apm.impl.transaction.Transaction;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class ApmServerReporterTest {

private static Undertow server;
private static int port;
private static AtomicInteger receivedHttpRequests = new AtomicInteger();
private ApmServerHttpPayloadSender payloadSender;
private ReporterConfiguration reporterConfiguration;
private ApmServerReporter reporter;

@BeforeAll
static void startServer() {
server = Undertow.builder()
.addHttpListener(0, "127.0.0.1")
.setHandler(exchange -> {
receivedHttpRequests.incrementAndGet();
exchange.setStatusCode(200).endExchange();
}).build();
server.start();
port = ((InetSocketAddress) server.getListenerInfo().get(0).getAddress()).getPort();
}

@AfterAll
static void stopServer() {
server.stop();
}
private PayloadSender payloadSender;

@BeforeEach
void setUp() {
receivedHttpRequests.set(0);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new AfterburnerModule());
reporterConfiguration = spy(new ReporterConfiguration());
ReporterConfiguration reporterConfiguration = spy(new ReporterConfiguration());
when(reporterConfiguration.getFlushInterval()).thenReturn(-1);
when(reporterConfiguration.getServerUrl()).thenReturn("http://localhost:" + port);
payloadSender = new ApmServerHttpPayloadSender(new OkHttpClient(), new JacksonPayloadSerializer(objectMapper), reporterConfiguration);
when(reporterConfiguration.getMaxQueueSize()).thenReturn(2);
SystemInfo system = new SystemInfo("x64", "localhost", "platform");
reporter = new ApmServerReporter(new Service(), new Process(), system, payloadSender, false, reporterConfiguration);
payloadSender = mock(PayloadSender.class);
reporter = new ApmServerReporter(new Service(), new Process(), system, payloadSender, true, reporterConfiguration);
}

@Test
void testReportTransaction() throws ExecutionException, InterruptedException {
reporter.report(new Transaction());
reporter.flush().get();
assertThat(reporter.getDropped()).isEqualTo(0);
assertThat(receivedHttpRequests.get()).isEqualTo(1);
void testReport_discardTransactions_ifQueueIsFull() {
Transaction transaction = mock(Transaction.class);
// try to report lots of transactions with a tiny queue should lead to dropped events
for (int i = 0; i < 100; i++) {
reporter.report(transaction);
}
assertThat(reporter.getDropped()).isGreaterThan(0);
verify(payloadSender, atLeastOnce()).sendPayload(any());
verify(transaction, atLeastOnce()).recycle();
}

@Test
void testReportErrorCapture() throws ExecutionException, InterruptedException {
reporter.report(new ErrorCapture());
reporter.flush().get();
assertThat(reporter.getDropped()).isEqualTo(0);
assertThat(receivedHttpRequests.get()).isEqualTo(1);
void testReport_discardErrors_ifQueueIsFull() {
ErrorCapture error = mock(ErrorCapture.class);
// try to report lots of errors with a tiny queue should lead to dropped events
for (int i = 0; i < 100; i++) {
reporter.report(error);
}
assertThat(reporter.getDropped()).isGreaterThan(0);
verify(payloadSender, atLeastOnce()).sendPayload(any());
verify(error, atLeastOnce()).recycle();
}
}