Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package datadog.trace.common.writer;

import static datadog.trace.api.sampling.PrioritySampling.UNSET;
import static java.util.concurrent.TimeUnit.MINUTES;

import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.relocate.api.RatelimitedLogger;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,6 +30,8 @@ public abstract class RemoteWriter implements Writer {

private static final Logger log = LoggerFactory.getLogger(RemoteWriter.class);

private final RatelimitedLogger rlLog = new RatelimitedLogger(log, 1, MINUTES);

protected final TraceProcessingWorker traceProcessingWorker;
private final PayloadDispatcher dispatcher;
private final boolean alwaysFlush;
Expand Down Expand Up @@ -63,10 +67,14 @@ protected RemoteWriter(

@Override
public void write(final List<DDSpan> trace) {
// We can't add events after shutdown otherwise it will never complete shutting down.
if (!closed) {
if (closed) {
// We can't add events after shutdown otherwise it will never complete shutting down.
log.debug("Dropped due to shutdown: {}", trace);
handleDroppedTrace(trace);
} else {
if (trace.isEmpty()) {
handleDroppedTrace("Trace was empty", trace, UNSET);
log.debug("Dropped an empty trace.");
handleDroppedTrace(trace);
} else {
final DDSpan root = trace.get(0);
final int samplingPriority = root.samplingPriority();
Expand All @@ -79,26 +87,28 @@ public void write(final List<DDSpan> trace) {
log.debug("Enqueued for single span sampling: {}", trace);
break;
case DROPPED_BY_POLICY:
handleDroppedTrace("Dropping policy is active", trace, samplingPriority);
log.debug("Dropped by the policy: {}", trace);
handleDroppedTrace(trace);
break;
case DROPPED_BUFFER_OVERFLOW:
handleDroppedTrace("Trace written to overfilled buffer", trace, samplingPriority);
if (log.isDebugEnabled()) {
log.debug("Dropped due to a buffer overflow: {}", trace);
} else {
rlLog.warn("Dropped due to a buffer overflow: [{} spans]", trace.size());
}
handleDroppedTrace(trace);
break;
}
}
} else {
handleDroppedTrace("Trace written after shutdown.", trace, UNSET);
}
if (alwaysFlush) {
flush();
}
}

private void handleDroppedTrace(
final String reason, final List<DDSpan> trace, final int samplingPriority) {
log.debug("{}. Counted but dropping trace: {}", reason, trace);
healthMetrics.onFailedPublish(
trace.isEmpty() ? 0 : trace.get(0).samplingPriority(), trace.size());
private void handleDroppedTrace(final List<DDSpan> trace) {
int samplingPriority = trace.isEmpty() ? UNSET : trace.get(0).samplingPriority();
healthMetrics.onFailedPublish(samplingPriority, trace.size());
incrementDropCounts(trace.size());
}

Expand Down
Loading