Skip to content

Commit 487e069

Browse files
authored
Merge pull request #6859 from DataDog/kr-igor/dsm-s3-checkpoints
[DSM] - Set DSM checkpoints for S3 put / get operations
2 parents 23a819a + 12e83df commit 487e069

File tree

16 files changed

+302
-41
lines changed

16 files changed

+302
-41
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java

+37
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,41 @@ public String operationName() {
131131
.client()
132132
.operationForComponent(component().toString());
133133
}
134+
135+
public String getSpanTagAsString(AgentSpan span, String tag) {
136+
Object value = span.getTag(tag);
137+
return value == null ? null : value.toString();
138+
}
139+
140+
public long getRequestContentLength(final REQUEST request) {
141+
if (request == null) {
142+
return 0;
143+
}
144+
145+
String contentLengthStr = getRequestHeader(request, "Content-Length");
146+
if (contentLengthStr != null) {
147+
try {
148+
return Long.parseLong(contentLengthStr);
149+
} catch (NumberFormatException ignored) {
150+
}
151+
}
152+
153+
return 0;
154+
}
155+
156+
public long getResponseContentLength(final RESPONSE response) {
157+
if (response == null) {
158+
return 0;
159+
}
160+
161+
String contentLengthStr = getResponseHeader(response, "Content-Length");
162+
if (contentLengthStr != null) {
163+
try {
164+
return Long.parseLong(contentLengthStr);
165+
} catch (NumberFormatException ignored) {
166+
}
167+
}
168+
169+
return 0;
170+
}
134171
}

dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java

+67-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package datadog.trace.instrumentation.aws.v0;
22

33
import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME;
4+
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
45

56
import com.amazonaws.AmazonWebServiceRequest;
67
import com.amazonaws.AmazonWebServiceResponse;
78
import com.amazonaws.Request;
89
import com.amazonaws.Response;
10+
import com.amazonaws.http.HttpMethodName;
911
import datadog.trace.api.Config;
1012
import datadog.trace.api.DDTags;
1113
import datadog.trace.api.cache.DDCache;
@@ -20,7 +22,9 @@
2022
import datadog.trace.bootstrap.instrumentation.api.Tags;
2123
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
2224
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
25+
import datadog.trace.core.datastreams.TagsProcessor;
2326
import java.net.URI;
27+
import java.util.LinkedHashMap;
2428
import java.util.List;
2529
import java.util.Locale;
2630
import java.util.regex.Matcher;
@@ -75,20 +79,25 @@ public AgentSpan onRequest(final AgentSpan span, final Request request) {
7579
super.onRequest(span, request);
7680

7781
final String awsServiceName = request.getServiceName();
82+
final String awsSimplifiedServiceName = simplifyServiceName(awsServiceName);
7883
final AmazonWebServiceRequest originalRequest = request.getOriginalRequest();
7984
final Class<?> awsOperation = originalRequest.getClass();
8085
final GetterAccess access = GetterAccess.of(originalRequest);
8186

8287
span.setTag(InstrumentationTags.AWS_AGENT, COMPONENT_NAME);
8388
span.setTag(InstrumentationTags.AWS_SERVICE, awsServiceName);
84-
span.setTag(InstrumentationTags.TOP_LEVEL_AWS_SERVICE, simplifyServiceName(awsServiceName));
89+
span.setTag(InstrumentationTags.TOP_LEVEL_AWS_SERVICE, awsSimplifiedServiceName);
8590
span.setTag(InstrumentationTags.AWS_OPERATION, awsOperation.getSimpleName());
8691
span.setTag(InstrumentationTags.AWS_ENDPOINT, request.getEndpoint().toString());
8792

8893
CharSequence awsRequestName = AwsNameCache.getQualifiedName(request);
89-
9094
span.setResourceName(awsRequestName, RPC_COMMAND_NAME);
9195

96+
if ("s3".equalsIgnoreCase(awsSimplifiedServiceName)
97+
&& span.traceConfig().isDataStreamsEnabled()) {
98+
span.setTag(Tags.HTTP_REQUEST_CONTENT_LENGTH, getRequestContentLength(request));
99+
}
100+
92101
switch (awsRequestName.toString()) {
93102
case "SQS.SendMessage":
94103
case "SQS.SendMessageBatch":
@@ -233,12 +242,68 @@ public AgentSpan onRequest(final AgentSpan span, final Request request) {
233242
return span;
234243
}
235244

245+
public AgentSpan onServiceResponse(
246+
final AgentSpan span, final String awsService, final Response response) {
247+
if ("s3".equalsIgnoreCase(simplifyServiceName(awsService))
248+
&& span.traceConfig().isDataStreamsEnabled()) {
249+
long responseSize = getResponseContentLength(response);
250+
span.setTag(Tags.HTTP_RESPONSE_CONTENT_LENGTH, responseSize);
251+
252+
String key = getSpanTagAsString(span, InstrumentationTags.AWS_OBJECT_KEY);
253+
String bucket = getSpanTagAsString(span, InstrumentationTags.AWS_BUCKET_NAME);
254+
String awsOperation = getSpanTagAsString(span, InstrumentationTags.AWS_OPERATION);
255+
256+
if (key != null && bucket != null && awsOperation != null) {
257+
// GetObjectMetadataRequest may return the object if it's not "HEAD"
258+
if (HttpMethodName.GET.name().equals(span.getTag(Tags.HTTP_METHOD))
259+
&& ("GetObjectMetadataRequest".equalsIgnoreCase(awsOperation)
260+
|| "GetObjectRequest".equalsIgnoreCase(awsOperation))) {
261+
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
262+
263+
sortedTags.put(TagsProcessor.DIRECTION_TAG, TagsProcessor.DIRECTION_IN);
264+
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
265+
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
266+
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
267+
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
268+
269+
AgentTracer.get()
270+
.getDataStreamsMonitoring()
271+
.setCheckpoint(span, sortedTags, 0, responseSize);
272+
}
273+
274+
if ("PutObjectRequest".equalsIgnoreCase(awsOperation)
275+
|| "UploadPartRequest".equalsIgnoreCase(awsOperation)) {
276+
Object requestSize = span.getTag(Tags.HTTP_REQUEST_CONTENT_LENGTH);
277+
long payloadSize = 0;
278+
if (requestSize != null) {
279+
payloadSize = (long) requestSize;
280+
}
281+
282+
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
283+
284+
sortedTags.put(TagsProcessor.DIRECTION_TAG, DIRECTION_OUT);
285+
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
286+
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
287+
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
288+
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
289+
290+
AgentTracer.get()
291+
.getDataStreamsMonitoring()
292+
.setCheckpoint(span, sortedTags, 0, payloadSize);
293+
}
294+
}
295+
}
296+
297+
return span;
298+
}
299+
236300
@Override
237301
public AgentSpan onResponse(final AgentSpan span, final Response response) {
238302
if (response.getAwsResponse() instanceof AmazonWebServiceResponse) {
239303
final AmazonWebServiceResponse awsResp = (AmazonWebServiceResponse) response.getAwsResponse();
240304
span.setTag(InstrumentationTags.AWS_REQUEST_ID, awsResp.getRequestId());
241305
}
306+
242307
return super.onResponse(span, response);
243308
}
244309

dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public void afterResponse(final Request<?> request, final Response<?> response)
8989
if (span != null) {
9090
request.addHandlerContext(SPAN_CONTEXT_KEY, null);
9191
DECORATE.onResponse(span, response);
92+
DECORATE.onServiceResponse(span, request.getServiceName(), response);
9293
DECORATE.beforeFinish(span);
9394
span.finish();
9495
}

dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java

+62-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datadog.trace.instrumentation.aws.v2;
22

33
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
4+
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
45
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
56
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
67
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
@@ -22,6 +23,7 @@
2223
import datadog.trace.bootstrap.instrumentation.api.Tags;
2324
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
2425
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
26+
import datadog.trace.core.datastreams.TagsProcessor;
2527
import java.net.URI;
2628
import java.time.Instant;
2729
import java.util.Collections;
@@ -106,13 +108,23 @@ public CharSequence spanName(final ExecutionAttributes attributes) {
106108
}
107109

108110
public AgentSpan onSdkRequest(
109-
final AgentSpan span, final SdkRequest request, final ExecutionAttributes attributes) {
111+
final AgentSpan span,
112+
final SdkRequest request,
113+
final SdkHttpRequest httpRequest,
114+
final ExecutionAttributes attributes) {
110115
final String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
111116
final String awsOperationName = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
112117
onOperation(span, awsServiceName, awsOperationName);
113118

114119
// S3
115120
request.getValueForField("Bucket", String.class).ifPresent(name -> setBucketName(span, name));
121+
if ("s3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) {
122+
request
123+
.getValueForField("Key", String.class)
124+
.ifPresent(key -> span.setTag(InstrumentationTags.AWS_OBJECT_KEY, key));
125+
span.setTag(Tags.HTTP_REQUEST_CONTENT_LENGTH, getRequestContentLength(httpRequest));
126+
}
127+
116128
getRequestKey(request).ifPresent(key -> setObjectKey(span, key));
117129
request
118130
.getValueForField("StorageClass", String.class)
@@ -280,7 +292,10 @@ private static void setTableName(AgentSpan span, String name) {
280292
}
281293

282294
public AgentSpan onSdkResponse(
283-
final AgentSpan span, final SdkResponse response, final ExecutionAttributes attributes) {
295+
final AgentSpan span,
296+
final SdkResponse response,
297+
final SdkHttpResponse httpResponse,
298+
final ExecutionAttributes attributes) {
284299
if (response instanceof AwsResponse) {
285300
span.setTag(
286301
InstrumentationTags.AWS_REQUEST_ID,
@@ -335,6 +350,51 @@ public AgentSpan onSdkResponse(
335350
});
336351
}
337352
}
353+
354+
if ("s3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) {
355+
long responseSize = getResponseContentLength(httpResponse);
356+
span.setTag(Tags.HTTP_RESPONSE_CONTENT_LENGTH, responseSize);
357+
358+
String key = getSpanTagAsString(span, InstrumentationTags.AWS_OBJECT_KEY);
359+
String bucket = getSpanTagAsString(span, InstrumentationTags.AWS_BUCKET_NAME);
360+
String awsOperation = getSpanTagAsString(span, InstrumentationTags.AWS_OPERATION);
361+
362+
if (key != null && bucket != null && awsOperation != null) {
363+
if ("GetObject".equalsIgnoreCase(awsOperation)) {
364+
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
365+
366+
sortedTags.put(TagsProcessor.DIRECTION_TAG, TagsProcessor.DIRECTION_IN);
367+
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
368+
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
369+
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
370+
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
371+
372+
AgentTracer.get()
373+
.getDataStreamsMonitoring()
374+
.setCheckpoint(span, sortedTags, 0, responseSize);
375+
}
376+
377+
if ("PutObject".equalsIgnoreCase(awsOperation)) {
378+
Object requestSize = span.getTag(Tags.HTTP_REQUEST_CONTENT_LENGTH);
379+
long payloadSize = 0;
380+
if (requestSize != null) {
381+
payloadSize = (long) requestSize;
382+
}
383+
384+
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
385+
386+
sortedTags.put(TagsProcessor.DIRECTION_TAG, DIRECTION_OUT);
387+
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
388+
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
389+
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
390+
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
391+
392+
AgentTracer.get()
393+
.getDataStreamsMonitoring()
394+
.setCheckpoint(span, sortedTags, 0, payloadSize);
395+
}
396+
}
397+
}
338398
}
339399
return span;
340400
}

dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void afterMarshalling(
5858
if (span != null) {
5959
try (AgentScope ignored = activateSpan(span)) {
6060
DECORATE.onRequest(span, context.httpRequest());
61-
DECORATE.onSdkRequest(span, context.request(), executionAttributes);
61+
DECORATE.onSdkRequest(span, context.request(), context.httpRequest(), executionAttributes);
6262
}
6363
}
6464
}
@@ -104,7 +104,7 @@ public void afterExecution(
104104
if (span != null) {
105105
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
106106
// Call onResponse on both types of responses:
107-
DECORATE.onSdkResponse(span, context.response(), executionAttributes);
107+
DECORATE.onSdkResponse(span, context.response(), context.httpResponse(), executionAttributes);
108108
DECORATE.onResponse(span, context.httpResponse());
109109
DECORATE.beforeFinish(span);
110110
span.finish();
@@ -127,7 +127,7 @@ public void onExecutionFailure(
127127
Optional<SdkResponse> responseOpt = context.response();
128128
if (responseOpt.isPresent()) {
129129
SdkResponse response = responseOpt.get();
130-
DECORATE.onSdkResponse(span, response, executionAttributes);
130+
DECORATE.onSdkResponse(span, response, response.sdkHttpResponse(), executionAttributes);
131131
DECORATE.onResponse(span, response.sdkHttpResponse());
132132
if (span.isError()) {
133133
DECORATE.onError(span, context.exception());

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

+1
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize)
196196
saved.getEdgeTags(),
197197
saved.getHash(),
198198
saved.getParentHash(),
199+
saved.getDataSetHash(),
199200
saved.getTimestampNanos(),
200201
saved.getPathwayLatencyNano(),
201202
saved.getEdgeLatencyNano(),

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
5353
static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);
5454

5555
private static final StatsPoint REPORT =
56-
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
56+
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
5757
private static final StatsPoint POISON_PILL =
58-
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
58+
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
5959

6060
private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
6161
private final BlockingQueue<InboxItem> inbox = new MpscBlockingConsumerArrayQueue<>(1024);

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java

+19
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ public class DefaultPathwayContext implements PathwayContext {
6161
TagsProcessor.TOPIC_TAG,
6262
TagsProcessor.EXCHANGE_TAG));
6363

64+
private static final Set<String> dataSetTagKeys =
65+
new HashSet<String>(
66+
Arrays.asList(TagsProcessor.DATASET_NAME_TAG, TagsProcessor.DATASET_NAMESPACE_TAG));
67+
6468
public DefaultPathwayContext(TimeSource timeSource, WellKnownTags wellKnownTags) {
6569
this.timeSource = timeSource;
6670
this.wellKnownTags = wellKnownTags;
@@ -121,6 +125,7 @@ public void setCheckpoint(
121125
// the number of tag keys for now. We should revisit this later if it's no longer the case.
122126
List<String> allTags = new ArrayList<>(sortedTags.size());
123127
PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(wellKnownTags);
128+
DataSetHashBuilder dataSetHashBuilder = new DataSetHashBuilder();
124129

125130
if (!started) {
126131
if (defaultTimestamp == 0) {
@@ -148,6 +153,9 @@ public void setCheckpoint(
148153
if (hashableTagKeys.contains(entry.getKey())) {
149154
pathwayHashBuilder.addTag(tag);
150155
}
156+
if (dataSetTagKeys.contains(entry.getKey())) {
157+
dataSetHashBuilder.addValue(tag);
158+
}
151159
allTags.add(tag);
152160
}
153161

@@ -166,6 +174,7 @@ public void setCheckpoint(
166174
}
167175

168176
long newHash = generatePathwayHash(nodeHash, hash);
177+
long dataSetHash = dataSetHashBuilder.addValue(String.valueOf(newHash));
169178

170179
long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks;
171180
long edgeLatencyNano = nanoTicks - edgeStartNanoTicks;
@@ -175,6 +184,7 @@ public void setCheckpoint(
175184
allTags,
176185
newHash,
177186
hash,
187+
dataSetHash,
178188
startNanos,
179189
pathwayLatencyNano,
180190
edgeLatencyNano,
@@ -388,6 +398,15 @@ private static DefaultPathwayContext decode(
388398
hash);
389399
}
390400

401+
static class DataSetHashBuilder {
402+
private long currentHash = 0L;
403+
404+
public long addValue(String val) {
405+
currentHash = FNV64Hash.generateHash(currentHash + val, FNV64Hash.Version.v1);
406+
return currentHash;
407+
}
408+
}
409+
391410
private static class PathwayHashBuilder {
392411
private final StringBuilder builder;
393412

0 commit comments

Comments
 (0)