Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dd-trace-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ dependencies {

implementation group: 'com.google.re2j', name: 're2j', version: '1.7'

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

compileOnly group: 'com.github.spotbugs', name: 'spotbugs-annotations', version: '4.2.0'

// We have autoservices defined in test subtree, looks like we need this to be able to properly rebuild this
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package datadog.trace.common.writer;

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import datadog.communication.monitor.Monitoring;
import datadog.communication.monitor.Recording;
import datadog.communication.serialization.ByteBufferConsumer;
Expand Down Expand Up @@ -57,6 +60,17 @@ public Collection<RemoteApi> getApis() {

@Override
public void onDroppedTrace(int spanCount) {
// Antithesis property-testing hook: dropping a trace before it is even
// sent should never happen, so record the drop context and flag the path
// as unreachable before updating the running counters.
final long projectedDroppedTraces = droppedTraceCount.sum() + 1;
final long projectedDroppedSpans = droppedSpanCount.sum() + spanCount;

final ObjectNode details = JsonNodeFactory.instance.objectNode();
details.put("span_count", spanCount);
details.put("total_dropped_traces", projectedDroppedTraces);
details.put("total_dropped_spans", projectedDroppedSpans);

log.debug("ANTITHESIS_ASSERT: Traces dropped before sending (unreachable) - span_count: {}, total_dropped: {}", spanCount, projectedDroppedTraces);
Assert.unreachable(
    "Traces should not be dropped before attempting to send - indicates buffer overflow or backpressure",
    details);

// Counters are updated last so the assertion payload reflects the
// totals as they will be once this drop is accounted for.
droppedSpanCount.add(spanCount);
droppedTraceCount.increment();
}
Expand Down Expand Up @@ -103,18 +117,60 @@ public void accept(int messageCount, ByteBuffer buffer) {
// the packer calls this when the buffer is full,
// or when the packer is flushed at a heartbeat
if (messageCount > 0) {
// Antithesis: Verify that we're attempting to send traces
log.debug("ANTITHESIS_ASSERT: Trace sending code path exercised (reachable) - message_count: {}", messageCount);
Assert.reachable("Trace sending code path is exercised", null);
log.debug("ANTITHESIS_ASSERT: Checking if traces are being sent to API (sometimes) - message_count: {}", messageCount);
Assert.sometimes(
messageCount > 0,
"Traces are being sent to the API",
null);

batchTimer.reset();
Payload payload = newPayload(messageCount, buffer);
final int sizeInBytes = payload.sizeInBytes();
healthMetrics.onSerialize(sizeInBytes);
RemoteApi.Response response = api.sendSerializedTraces(payload);
mapper.reset();

// Antithesis: Assert that trace sending should always succeed
ObjectNode sendDetails = JsonNodeFactory.instance.objectNode();
sendDetails.put("trace_count", messageCount);
sendDetails.put("payload_size_bytes", sizeInBytes);
sendDetails.put("success", response.success());
response.exception().ifPresent(ex -> {
sendDetails.put("exception", ex.getClass().getName());
sendDetails.put("exception_message", ex.getMessage());
});
response.status().ifPresent(status -> sendDetails.put("http_status", status));

log.debug("ANTITHESIS_ASSERT: Checking trace sending success (always) - success: {}, trace_count: {}", response.success(), messageCount);
Assert.always(
response.success(),
"Trace sending to API should always succeed - no traces should be lost",
sendDetails);

if (response.success()) {
if (log.isDebugEnabled()) {
log.debug("Successfully sent {} traces to the API", messageCount);
}
healthMetrics.onSend(messageCount, sizeInBytes, response);
} else {
// Antithesis: This code path should be unreachable if traces are never lost
ObjectNode failureDetails = JsonNodeFactory.instance.objectNode();
failureDetails.put("trace_count", messageCount);
failureDetails.put("payload_size_bytes", sizeInBytes);
response.exception().ifPresent(ex -> {
failureDetails.put("exception", ex.getClass().getName());
failureDetails.put("exception_message", ex.getMessage());
});
response.status().ifPresent(status -> failureDetails.put("http_status", status));

log.debug("ANTITHESIS_ASSERT: Trace sending failed (unreachable) - trace_count: {}, size: {} bytes", messageCount, sizeInBytes);
Assert.unreachable(
"Trace sending failure path should never be reached - indicates traces are being lost",
failureDetails);

if (log.isDebugEnabled()) {
log.debug(
"Failed to send {} traces of size {} bytes to the API", messageCount, sizeInBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import static datadog.trace.api.sampling.PrioritySampling.UNSET;
import static java.util.concurrent.TimeUnit.MINUTES;

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.relocate.api.RatelimitedLogger;
Expand Down Expand Up @@ -67,9 +70,33 @@ protected RemoteWriter(

@Override
public void write(final List<DDSpan> trace) {
// Antithesis: Assert that we should never attempt to write when writer is closed
ObjectNode writeAttemptDetails = JsonNodeFactory.instance.objectNode();
writeAttemptDetails.put("writer_closed", closed);
writeAttemptDetails.put("trace_size", trace.size());
writeAttemptDetails.put("has_traces", !trace.isEmpty());

log.debug("ANTITHESIS_ASSERT: Checking writer not closed when writing (always) - closed: {}, trace_size: {}", closed, trace.size());
Assert.always(
!closed,
"Writer should never be closed when attempting to write traces",
writeAttemptDetails);

if (closed) {
// We can't add events after shutdown otherwise it will never complete shutting down.
log.debug("Dropped due to shutdown: {}", trace);

// Antithesis: Track when traces are dropped due to writer being closed
ObjectNode shutdownDetails = JsonNodeFactory.instance.objectNode();
shutdownDetails.put("trace_size", trace.size());
shutdownDetails.put("reason", "writer_closed_during_shutdown");

log.debug("ANTITHESIS_ASSERT: Traces dropped due to shutdown (sometimes) - closed: {}, trace_size: {}", closed, trace.size());
Assert.sometimes(
closed && !trace.isEmpty(),
"Traces are dropped due to writer shutdown - tracking shutdown behavior",
shutdownDetails);

handleDroppedTrace(trace);
} else {
if (trace.isEmpty()) {
Expand All @@ -91,6 +118,18 @@ public void write(final List<DDSpan> trace) {
handleDroppedTrace(trace);
break;
case DROPPED_BUFFER_OVERFLOW:
// Antithesis: Buffer overflow should NEVER happen - this indicates a serious problem
ObjectNode overflowDetails = JsonNodeFactory.instance.objectNode();
overflowDetails.put("trace_size", trace.size());
overflowDetails.put("sampling_priority", samplingPriority);
overflowDetails.put("buffer_capacity", traceProcessingWorker.getCapacity());
overflowDetails.put("reason", "buffer_overflow_backpressure");

log.debug("ANTITHESIS_ASSERT: Buffer overflow occurred (unreachable) - trace_size: {}, capacity: {}", trace.size(), traceProcessingWorker.getCapacity());
Assert.unreachable(
"Buffer overflow should never occur - traces are being dropped due to backpressure",
overflowDetails);

if (log.isDebugEnabled()) {
log.debug("Dropped due to a buffer overflow: {}", trace);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static datadog.communication.http.OkHttpUtils.prepareRequest;

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
Expand Down Expand Up @@ -89,11 +92,33 @@ public void addResponseListener(final RemoteResponseListener listener) {

public Response sendSerializedTraces(final Payload payload) {
final int sizeInBytes = payload.sizeInBytes();

// Antithesis: Track that agent API send is being exercised
log.debug("ANTITHESIS_ASSERT: Verifying DDAgentApi trace sending is exercised (reachable) with {} traces", payload.traceCount());
Assert.reachable("DDAgentApi trace sending is exercised", null);
log.debug("ANTITHESIS_ASSERT: Checking if traces are being sent through DDAgentApi (sometimes) - count: {}", payload.traceCount());
Assert.sometimes(
payload.traceCount() > 0,
"Traces are being sent through DDAgentApi",
null);

String tracesEndpoint = featuresDiscovery.getTraceEndpoint();
if (null == tracesEndpoint) {
featuresDiscovery.discoverIfOutdated();
tracesEndpoint = featuresDiscovery.getTraceEndpoint();
if (null == tracesEndpoint) {
// Antithesis: Agent should always be detectable
ObjectNode agentDetectionDetails = JsonNodeFactory.instance.objectNode();
agentDetectionDetails.put("trace_count", payload.traceCount());
agentDetectionDetails.put("payload_size_bytes", sizeInBytes);
agentDetectionDetails.put("agent_url", agentUrl.toString());
agentDetectionDetails.put("failure_reason", "agent_not_detected");

log.debug("ANTITHESIS_ASSERT: Agent not detected (unreachable) - url: {}, traces: {}", agentUrl, payload.traceCount());
Assert.unreachable(
"Datadog agent should always be detected - agent communication failure",
agentDetectionDetails);

log.error("No datadog agent detected");
countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, null);
return Response.failed(404);
Expand Down Expand Up @@ -122,7 +147,36 @@ public Response sendSerializedTraces(final Payload payload) {
try (final Recording recording = sendPayloadTimer.start();
final okhttp3.Response response = httpClient.newCall(request).execute()) {
handleAgentChange(response.header(DATADOG_AGENT_STATE));

// Antithesis: Track HTTP response status and assert success
ObjectNode httpResponseDetails = JsonNodeFactory.instance.objectNode();
httpResponseDetails.put("trace_count", payload.traceCount());
httpResponseDetails.put("payload_size_bytes", sizeInBytes);
httpResponseDetails.put("http_status", response.code());
httpResponseDetails.put("http_message", response.message());
httpResponseDetails.put("success", response.code() == 200);
httpResponseDetails.put("agent_url", tracesUrl.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I'm very uncomfortable with using the Jackson API in these parts of the tracer, and the Assert methods depend on it.


log.debug("ANTITHESIS_ASSERT: Checking HTTP response status (always) - code: {}, traces: {}", response.code(), payload.traceCount());
Assert.always(
response.code() == 200,
"HTTP response from Datadog agent should always be 200 - API communication failure",
httpResponseDetails);

if (response.code() != 200) {
// Antithesis: Mark non-200 path as unreachable
ObjectNode errorDetails = JsonNodeFactory.instance.objectNode();
errorDetails.put("trace_count", payload.traceCount());
errorDetails.put("payload_size_bytes", sizeInBytes);
errorDetails.put("http_status", response.code());
errorDetails.put("http_message", response.message());
errorDetails.put("failure_reason", "http_error_response");

log.debug("ANTITHESIS_ASSERT: Non-200 HTTP response (unreachable) - code: {}, message: {}, traces: {}", response.code(), response.message(), payload.traceCount());
Assert.unreachable(
"Non-200 HTTP response from agent indicates API failure - traces may be lost",
errorDetails);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Another concern is the human cost of maintaining these assertions over time.


agentErrorCounter.incrementErrorCount(response.message(), payload.traceCount());
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
return Response.failed(response.code());
Expand All @@ -146,6 +200,20 @@ public Response sendSerializedTraces(final Payload payload) {
}
}
} catch (final IOException e) {
// Antithesis: Network failures should not occur
ObjectNode networkErrorDetails = JsonNodeFactory.instance.objectNode();
networkErrorDetails.put("trace_count", payload.traceCount());
networkErrorDetails.put("payload_size_bytes", sizeInBytes);
networkErrorDetails.put("exception_type", e.getClass().getName());
networkErrorDetails.put("exception_message", e.getMessage());
networkErrorDetails.put("agent_url", agentUrl.toString());
networkErrorDetails.put("failure_reason", "network_io_exception");

log.debug("ANTITHESIS_ASSERT: Network/IO exception (unreachable) - type: {}, message: {}, traces: {}", e.getClass().getName(), e.getMessage(), payload.traceCount());
Assert.unreachable(
"Network/IO exceptions should not occur when sending to agent - indicates connectivity issues",
networkErrorDetails);

countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, e);
return Response.failed(e);
}
Expand Down
3 changes: 3 additions & 0 deletions telemetry/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ dependencies {
implementation(libs.slf4j)

implementation(project(":internal-api"))

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation(group = "com.antithesis", name = "sdk", version = "1.4.5")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Another concern is how much extra weight this adds to the jar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this PR adds ~20mb to the final jar... but a lot of that is because it embeds antithesis, its native FFI wrapper, and various jackson dependencies multiple times in each of the product directories in the agent jar.

If we move it to the shared section in dd-java-agent/build.gradle and add the necessary excludes to gradle/dependencies.gradle like we do for other shared dependencies then the overhead is 3mb compressed and 8mb uncompressed.

That still feels too big to have in the general deliverable for something only used for testing purposes.

One option might be to include only the direct dependency in the release (i.e., without Jackson or the FFI wrapper) — in other words, just enough to allow the classes to load. We would then have to look at how to combine the other parts for testing, whether that's via -Xbootclasspath/a: or something similar.


compileOnly(project(":dd-java-agent:agent-tooling"))
testImplementation(project(":dd-java-agent:agent-tooling"))
Expand Down
63 changes: 63 additions & 0 deletions telemetry/src/main/java/datadog/telemetry/TelemetryClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package datadog.telemetry;

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import datadog.communication.http.HttpRetryPolicy;
import datadog.communication.http.OkHttpUtils;
import datadog.trace.api.Config;
Expand Down Expand Up @@ -94,26 +97,86 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {

Request httpRequest = httpRequestBuilder.build();
String requestType = httpRequest.header(DD_TELEMETRY_REQUEST_TYPE);

// Antithesis: Track telemetry sending attempts
log.debug("ANTITHESIS_ASSERT: Telemetry sending exercised (reachable) - request_type: {}", requestType);
Assert.reachable("Telemetry sending is exercised", null);

try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(okHttpClient, httpRetryPolicy, httpRequest)) {

// Antithesis: Assert that all telemetry requests should succeed
ObjectNode telemetryResponseDetails = JsonNodeFactory.instance.objectNode();
telemetryResponseDetails.put("request_type", requestType != null ? requestType : "unknown");
telemetryResponseDetails.put("http_status", response.code());
telemetryResponseDetails.put("http_message", response.message());
telemetryResponseDetails.put("url", url.toString());
telemetryResponseDetails.put("success", response.isSuccessful());

if (response.code() == 404) {
// Antithesis: Track 404 - endpoint disabled scenario
ObjectNode notFoundDetails = JsonNodeFactory.instance.objectNode();
notFoundDetails.put("request_type", requestType != null ? requestType : "unknown");
notFoundDetails.put("url", url.toString());
notFoundDetails.put("reason", "endpoint_disabled_404");

log.debug("ANTITHESIS_ASSERT: Telemetry endpoint 404 (sometimes) - request_type: {}, url: {}", requestType, url);
Assert.sometimes(
response.code() == 404,
"Telemetry endpoint returns 404 - endpoint may be disabled",
notFoundDetails);

log.debug("Telemetry endpoint is disabled, dropping {} message.", requestType);
return Result.NOT_FOUND;
}

if (!response.isSuccessful()) {
// Antithesis: Telemetry should not fail - data should be retried/buffered
ObjectNode failureDetails = JsonNodeFactory.instance.objectNode();
failureDetails.put("request_type", requestType != null ? requestType : "unknown");
failureDetails.put("http_status", response.code());
failureDetails.put("http_message", response.message());
failureDetails.put("url", url.toString());
failureDetails.put("reason", "http_error_response");

log.debug("ANTITHESIS_ASSERT: Telemetry HTTP request failed (unreachable) - request_type: {}, status: {}", requestType, response.code());
Assert.unreachable(
"Telemetry HTTP request failed - telemetry data should not be dropped, should retry",
failureDetails);

log.debug(
"Telemetry message {} failed with: {} {}.",
requestType,
response.code(),
response.message());
return Result.FAILURE;
}

// Antithesis: Assert success
log.debug("ANTITHESIS_ASSERT: Checking telemetry request success (always) - successful: {}, request_type: {}", response.isSuccessful(), requestType);
Assert.always(
response.isSuccessful(),
"Telemetry requests should always succeed - no telemetry data should be lost",
telemetryResponseDetails);

} catch (InterruptedIOException e) {
log.debug("Telemetry message {} sending interrupted: {}.", requestType, e.toString());
return Result.INTERRUPTED;

} catch (IOException e) {
// Antithesis: Network failures should not cause telemetry loss
ObjectNode ioErrorDetails = JsonNodeFactory.instance.objectNode();
ioErrorDetails.put("request_type", requestType != null ? requestType : "unknown");
ioErrorDetails.put("exception_type", e.getClass().getName());
ioErrorDetails.put("exception_message", e.getMessage());
ioErrorDetails.put("url", url.toString());
ioErrorDetails.put("reason", "network_io_exception");

log.debug("ANTITHESIS_ASSERT: Telemetry network/IO exception (unreachable) - request_type: {}, exception: {}", requestType, e.getClass().getName());
Assert.unreachable(
"Telemetry network/IO failure - telemetry data should not be dropped, should retry",
ioErrorDetails);

log.debug("Telemetry message {} failed with exception: {}.", requestType, e.toString());
return Result.FAILURE;
}
Expand Down
Loading
Loading