diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/CloudEvent.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/CloudEvent.java index 2a8915dc60ce..dc5c01ec074d 100644 --- a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/CloudEvent.java +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/CloudEvent.java @@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -336,6 +337,9 @@ public String getSpecVersion() { * @return the extension attributes as an unmodifiable map. */ public Map getExtensionAttributes() { + if (this.cloudEvent.getAdditionalProperties() == null) { + return null; + } return Collections.unmodifiableMap(this.cloudEvent.getAdditionalProperties()); } @@ -348,6 +352,9 @@ public Map getExtensionAttributes() { * @return the cloud event itself. */ public CloudEvent addExtensionAttribute(String name, Object value) { + if (this.cloudEvent.getAdditionalProperties() == null) { + this.cloudEvent.setAdditionalProperties(new HashMap<>()); + } this.cloudEvent.getAdditionalProperties().put(name.toLowerCase(Locale.ENGLISH), value); return this; } diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherAsyncClient.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherAsyncClient.java index 8d2254218393..c1c212d340d6 100644 --- a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherAsyncClient.java +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherAsyncClient.java @@ -9,13 +9,18 @@ import com.azure.core.http.HttpPipeline; import com.azure.core.http.rest.Response; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.serializer.SerializerAdapter; +import com.azure.core.util.tracing.TracerProxy; +import com.azure.messaging.eventgrid.implementation.Constants; import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImpl; import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImplBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.monoError; import static com.azure.core.util.FluxUtil.withContext; +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; /** * A service client that publishes events to an EventGrid topic or domain. Use {@link EventGridPublisherClientBuilder} @@ -33,6 +38,9 @@ public final class EventGridPublisherAsyncClient { private final EventGridServiceVersion serviceVersion; + private final ClientLogger logger = new ClientLogger(EventGridPublisherAsyncClient.class); + + EventGridPublisherAsyncClient(HttpPipeline pipeline, String hostname, SerializerAdapter serializerAdapter, EventGridServiceVersion serviceVersion) { this.impl = new EventGridPublisherClientImplBuilder() @@ -63,14 +71,19 @@ public EventGridServiceVersion getServiceVersion() { */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono sendEvents(Iterable events) { + if (events == null) { + return monoError(logger, new NullPointerException("'events' cannot be null.")); + } return withContext(context -> sendEvents(events, context)); } Mono sendEvents(Iterable events, Context context) { + final Context finalContext = context != null ? context : Context.NONE; return Flux.fromIterable(events) .map(EventGridEvent::toImpl) .collectList() - .flatMap(list -> this.impl.publishEventsAsync(this.hostname, list, context)); + .flatMap(list -> this.impl.publishEventsAsync(this.hostname, list, + finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE))); } /** @@ -81,14 +94,20 @@ Mono sendEvents(Iterable events, Context context) { */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono sendCloudEvents(Iterable events) { + if (events == null) { + return monoError(logger, new NullPointerException("'events' cannot be null.")); + } return withContext(context -> sendCloudEvents(events, context)); } Mono sendCloudEvents(Iterable events, Context context) { + final Context finalContext = context != null ? context : Context.NONE; + this.addCloudEventTracePlaceHolder(events); return Flux.fromIterable(events) .map(CloudEvent::toImpl) .collectList() - .flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list, context)); + .flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list, + finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE))); } /** @@ -99,13 +118,18 @@ Mono sendCloudEvents(Iterable events, Context context) { */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono sendCustomEvents(Iterable events) { + if (events == null) { + return monoError(logger, new NullPointerException("'events' cannot be null.")); + } return withContext(context -> sendCustomEvents(events, context)); } Mono sendCustomEvents(Iterable events, Context context) { + final Context finalContext = context != null ? context : Context.NONE; return Flux.fromIterable(events) .collectList() - .flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list, context)); + .flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list, + finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE))); } /** @@ -116,14 +140,19 @@ Mono sendCustomEvents(Iterable events, Context context) { */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono> sendEventsWithResponse(Iterable events) { + if (events == null) { + return monoError(logger, new NullPointerException("'events' cannot be null.")); + } return withContext(context -> sendEventsWithResponse(events, context)); } Mono> sendEventsWithResponse(Iterable events, Context context) { + final Context finalContext = context != null ? context : Context.NONE; return Flux.fromIterable(events) .map(EventGridEvent::toImpl) .collectList() - .flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list, context)); + .flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list, + finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE))); } /** @@ -134,14 +163,20 @@ Mono> sendEventsWithResponse(Iterable events, Con */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono> sendCloudEventsWithResponse(Iterable events) { + if (events == null) { + return monoError(logger, new NullPointerException("'events' cannot be null.")); + } return withContext(context -> sendCloudEventsWithResponse(events, context)); } Mono> sendCloudEventsWithResponse(Iterable events, Context context) { + final Context finalContext = context != null ? context : Context.NONE; + this.addCloudEventTracePlaceHolder(events); return Flux.fromIterable(events) .map(CloudEvent::toImpl) .collectList() - .flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list, context)); + .flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list, + finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE))); } /** @@ -152,12 +187,31 @@ Mono> sendCloudEventsWithResponse(Iterable events, Co */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono> sendCustomEventsWithResponse(Iterable events) { + if (events == null) { + return monoError(logger, new NullPointerException("'events' cannot be null.")); + } return withContext(context -> sendCustomEventsWithResponse(events, context)); } Mono> sendCustomEventsWithResponse(Iterable events, Context context) { + final Context finalContext = context != null ? context : Context.NONE; return Flux.fromIterable(events) .collectList() - .flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list, context)); + .flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list, + finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE))); + } + + private void addCloudEventTracePlaceHolder(Iterable events) { + if (TracerProxy.isTracingEnabled()) { + for (CloudEvent event : events) { + if (event.getExtensionAttributes() == null || + (event.getExtensionAttributes().get(Constants.TRACE_PARENT) == null && + event.getExtensionAttributes().get(Constants.TRACE_STATE) == null)) { + + event.addExtensionAttribute(Constants.TRACE_PARENT, Constants.TRACE_PARENT_PLACEHOLDER_UUID); + event.addExtensionAttribute(Constants.TRACE_STATE, Constants.TRACE_STATE_PLACEHOLDER_UUID); + } + } + } } } diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherClientBuilder.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherClientBuilder.java index eda03f156985..90118359a7be 100644 --- a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherClientBuilder.java +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/EventGridPublisherClientBuilder.java @@ -22,6 +22,8 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.serializer.JacksonAdapter; import com.azure.core.util.serializer.SerializerAdapter; +import com.azure.core.util.tracing.TracerProxy; +import com.azure.messaging.eventgrid.implementation.CloudEventTracingPipelinePolicy; import java.net.MalformedURLException; import java.net.URL; @@ -144,6 +146,9 @@ public EventGridPublisherAsyncClient buildAsyncClient() { HttpPolicyProviders.addAfterRetryPolicies(httpPipelinePolicies); + if (TracerProxy.isTracingEnabled()) { + httpPipelinePolicies.add(new CloudEventTracingPipelinePolicy()); + } httpPipelinePolicies.add(new HttpLoggingPolicy(httpLogOptions)); HttpPipeline buildPipeline = new HttpPipelineBuilder() diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/implementation/CloudEventTracingPipelinePolicy.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/implementation/CloudEventTracingPipelinePolicy.java new file mode 100644 index 000000000000..a16e44ce53e8 --- /dev/null +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/implementation/CloudEventTracingPipelinePolicy.java @@ -0,0 +1,76 @@ +package com.azure.messaging.eventgrid.implementation; + +import com.azure.core.http.HttpHeader; +import com.azure.core.http.HttpPipelineCallContext; +import com.azure.core.http.HttpPipelineNextPolicy; +import com.azure.core.http.HttpRequest; +import com.azure.core.http.HttpResponse; +import com.azure.core.http.policy.HttpPipelinePolicy; +import com.azure.core.util.tracing.TracerProxy; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; + +import com.azure.messaging.eventgrid.CloudEvent; +/** + * This pipeline policy should be added after OpenTelemetryPolicy in the http pipeline. + * + * It checks whether the {@link HttpRequest} headers have "traceparent" or "tracestate" and whether the serialized + * http body json string for a list of {@link CloudEvent} instances has place holders + * {@link Constants#TRACE_PARENT_PLACEHOLDER} or {@link Constants#TRACE_STATE_PLACEHOLDER}. + * The place holders will be replaced by the value from headers if the headers have "traceparent" or "tracestate", + * or be removed if the headers don't have. + * + * The place holders won't exist in the json string if the {@link TracerProxy#isTracingEnabled()} returns false. + */ +public class CloudEventTracingPipelinePolicy implements HttpPipelinePolicy { + @Override + public Mono process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) { + final HttpRequest request = context.getHttpRequest(); + final HttpHeader contentType = request.getHeaders().get(Constants.CONTENT_TYPE); + StringBuilder bodyStringBuilder = new StringBuilder(); + if (TracerProxy.isTracingEnabled() && contentType != null && + Constants.CLOUD_EVENT_CONTENT_TYPE.equals(contentType.getValue())) { + return request.getBody().map(byteBuffer -> bodyStringBuilder.append(new String(byteBuffer.array(), + StandardCharsets.UTF_8))) + .then(Mono.fromCallable(() -> replaceTracingPlaceHolder(request, bodyStringBuilder))) + .then(next.process()); + } + else { + return next.process(); + } + } + + /** + * + * @param request The {@link HttpRequest}, whose body will be mutated by replacing traceparent and tracestate + * placeholders. + * @param bodyStringBuilder The {@link StringBuilder} that contains the full HttpRequest body string. + * @return The new body string with the place holders replaced (if header has tracing) + * or removed (if header no tracing). + */ + static String replaceTracingPlaceHolder(HttpRequest request, StringBuilder bodyStringBuilder) { + final int traceParentPlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_PARENT_PLACEHOLDER); + if (traceParentPlaceHolderIndex >= 0) { // There is "traceparent" placeholder in body, replace it. + final HttpHeader traceparentHeader = request.getHeaders().get(Constants.TRACE_PARENT); + bodyStringBuilder.replace(traceParentPlaceHolderIndex, + Constants.TRACE_PARENT_PLACEHOLDER.length() + traceParentPlaceHolderIndex, + traceparentHeader != null + ? String.format(",\"%s\":\"%s\"", Constants.TRACE_PARENT, traceparentHeader.getValue()) + : ""); + } + final int traceStatePlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_STATE_PLACEHOLDER); + if (traceStatePlaceHolderIndex >= 0) { // There is "tracestate" placeholder in body, replace it. + final HttpHeader tracestateHeader = request.getHeaders().get(Constants.TRACE_STATE); + bodyStringBuilder.replace(traceStatePlaceHolderIndex, + Constants.TRACE_STATE_PLACEHOLDER.length() + traceStatePlaceHolderIndex, + tracestateHeader != null + ? String.format(",\"%s\":\"%s\"", Constants.TRACE_STATE, tracestateHeader.getValue()) + : ""); + } + String newBodyString = bodyStringBuilder.toString(); + request.setHeader(Constants.CONTENT_LENGTH, String.valueOf(newBodyString.length())); + request.setBody(newBodyString); + return newBodyString; + } +} diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/implementation/Constants.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/implementation/Constants.java new file mode 100644 index 000000000000..bb4b3b3b5f11 --- /dev/null +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/implementation/Constants.java @@ -0,0 +1,17 @@ +package com.azure.messaging.eventgrid.implementation; + +public class Constants { + public static final String CONTENT_TYPE = "Content-Type"; + public static final String CONTENT_LENGTH = "Content-Length"; + public static final String CLOUD_EVENT_CONTENT_TYPE = "application/cloudevents-batch+json; charset=utf-8"; + public static final String TRACE_PARENT = "traceparent"; + public static final String TRACE_STATE = "tracestate"; + public static final String TRACE_PARENT_PLACEHOLDER_UUID = "TP-14b6b15b-74b6-4178-847e-d142aa2727b2"; + public static final String TRACE_STATE_PLACEHOLDER_UUID = "TS-14b6b15b-74b6-4178-847e-d142aa2727b2"; + public static final String TRACE_PARENT_PLACEHOLDER = ",\"" + TRACE_PARENT + "\":\"TP-14b6b15b-74b6-4178-847e-d142aa2727b2\""; + public static final String TRACE_STATE_PLACEHOLDER = ",\"" + TRACE_STATE + "\":\"TS-14b6b15b-74b6-4178-847e-d142aa2727b2\""; + + // Please see here + // for more information on Azure resource provider namespaces. + public static final String EVENT_GRID_TRACING_NAMESPACE_VALUE = "Microsoft.EventGrid"; +} diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatMessageEventBaseProperties.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatMessageEventBaseProperties.java index b6548af50f58..69844fa110c7 100644 --- a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatMessageEventBaseProperties.java +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatMessageEventBaseProperties.java @@ -45,7 +45,7 @@ public class AcsChatMessageEventBaseProperties extends AcsChatEventBasePropertie * The version of the message */ @JsonProperty(value = "version") - private Integer version; + private Long version; /** * Get the messageId property: The chat message id. @@ -152,7 +152,7 @@ public AcsChatMessageEventBaseProperties setType(String type) { * * @return the version value. */ - public Integer getVersion() { + public Long getVersion() { return this.version; } @@ -162,7 +162,7 @@ public Integer getVersion() { * @param version the version value to set. * @return the AcsChatMessageEventBaseProperties object itself. */ - public AcsChatMessageEventBaseProperties setVersion(Integer version) { + public AcsChatMessageEventBaseProperties setVersion(Long version) { this.version = version; return this; } diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatThreadEventBaseProperties.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatThreadEventBaseProperties.java index b3f18b0b3d80..32eb71f2c9f4 100644 --- a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatThreadEventBaseProperties.java +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/com/azure/messaging/eventgrid/systemevents/AcsChatThreadEventBaseProperties.java @@ -21,7 +21,7 @@ public class AcsChatThreadEventBaseProperties extends AcsChatEventBaseProperties * The version of the thread */ @JsonProperty(value = "version") - private Integer version; + private Long version; /** * Get the createTime property: The original creation time of the thread. @@ -48,7 +48,7 @@ public AcsChatThreadEventBaseProperties setCreateTime(OffsetDateTime createTime) * * @return the version value. */ - public Integer getVersion() { + public Long getVersion() { return this.version; } @@ -58,7 +58,7 @@ public Integer getVersion() { * @param version the version value to set. * @return the AcsChatThreadEventBaseProperties object itself. */ - public AcsChatThreadEventBaseProperties setVersion(Integer version) { + public AcsChatThreadEventBaseProperties setVersion(Long version) { this.version = version; return this; } diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/module-info.java b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/module-info.java index 4d700ac5fc4c..de698998b4b4 100644 --- a/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/module-info.java +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/main/java/module-info.java @@ -7,6 +7,7 @@ exports com.azure.messaging.eventgrid; exports com.azure.messaging.eventgrid.systemevents; + opens com.azure.messaging.eventgrid.implementation; opens com.azure.messaging.eventgrid.implementation.models to com.fasterxml.jackson.databind; opens com.azure.messaging.eventgrid.systemevents to com.fasterxml.jackson.databind; } diff --git a/sdk/eventgrid/azure-messaging-eventgrid/src/test/java/com/azure/messaging/eventgrid/implementation/CloudEventTracingPipelinePolicyTests.java b/sdk/eventgrid/azure-messaging-eventgrid/src/test/java/com/azure/messaging/eventgrid/implementation/CloudEventTracingPipelinePolicyTests.java new file mode 100644 index 000000000000..255f42b4424d --- /dev/null +++ b/sdk/eventgrid/azure-messaging-eventgrid/src/test/java/com/azure/messaging/eventgrid/implementation/CloudEventTracingPipelinePolicyTests.java @@ -0,0 +1,73 @@ +package com.azure.messaging.eventgrid.implementation; + +import com.azure.core.http.HttpMethod; +import com.azure.core.http.HttpRequest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.MalformedURLException; +import java.net.URL; + +public class CloudEventTracingPipelinePolicyTests { + private HttpRequest httpRequest; + + @BeforeEach + public void setup() throws MalformedURLException { + httpRequest = new HttpRequest(HttpMethod.POST, new URL("https://something.com")); + } + + @Test + void processBodyWithNoHeader() { + String testBodyString = "[{\"id\":\"313ac785-2dca-467e-a6a7-623f1baa2890\",\"source\":\"source\"," + + "\"type\":\"json\",\"specversion\":\"1.0\",\"tracestate\":\"TS-14b6b15b-74b6-4178-847e-d142aa2727b2\"," + + "\"traceparent\":\"TP-14b6b15b-74b6-4178-847e-d142aa2727b2\"}]"; + String expectedNewBody = "[{\"id\":\"313ac785-2dca-467e-a6a7-623f1baa2890\",\"source\":\"source\"," + + "\"type\":\"json\",\"specversion\":\"1.0\"}]"; + + httpRequest.setBody(testBodyString); + httpRequest.setHeader(Constants.CONTENT_LENGTH, testBodyString); + String newBody = CloudEventTracingPipelinePolicy.replaceTracingPlaceHolder( + httpRequest, new StringBuilder(testBodyString)); + Assertions.assertEquals(expectedNewBody, newBody); + Assertions.assertEquals(httpRequest.getHeaders().get(Constants.CONTENT_LENGTH).getValue(), + String.valueOf(newBody.length())); + } + + @Test + void processBodyWithTraceParentHeader() { + httpRequest.setHeader(Constants.TRACE_PARENT, "aTraceParent"); + String testBodyString = "[{\"id\":\"313ac785-2dca-467e-a6a7-623f1baa2890\",\"source\":\"source\"," + + "\"type\":\"json\",\"specversion\":\"1.0\",\"tracestate\":\"TS-14b6b15b-74b6-4178-847e-d142aa2727b2\"," + + "\"traceparent\":\"TP-14b6b15b-74b6-4178-847e-d142aa2727b2\"}]"; + String expectedNewBody = "[{\"id\":\"313ac785-2dca-467e-a6a7-623f1baa2890\",\"source\":\"source\"," + + "\"type\":\"json\",\"specversion\":\"1.0\"," + + "\"traceparent\":\"aTraceParent\"}]"; + httpRequest.setBody(testBodyString); + httpRequest.setHeader(Constants.CONTENT_LENGTH, testBodyString); + String newBody = CloudEventTracingPipelinePolicy.replaceTracingPlaceHolder( + httpRequest, new StringBuilder(testBodyString)); + Assertions.assertEquals(expectedNewBody, newBody); + Assertions.assertEquals(httpRequest.getHeaders().get(Constants.CONTENT_LENGTH).getValue(), + String.valueOf(newBody.length())); + } + + @Test + void processBodyWithTraceStateHeader() { + httpRequest.setHeader(Constants.TRACE_STATE, "aTraceState"); + String testBodyString = "[{\"id\":\"313ac785-2dca-467e-a6a7-623f1baa2890\",\"source\":\"source\"," + + "\"type\":\"json\",\"specversion\":\"1.0\",\"tracestate\":\"TS-14b6b15b-74b6-4178-847e-d142aa2727b2\"," + + "\"traceparent\":\"TP-14b6b15b-74b6-4178-847e-d142aa2727b2\"}]"; + String expectedNewBody = "[{\"id\":\"313ac785-2dca-467e-a6a7-623f1baa2890\",\"source\":\"source\"," + + "\"type\":\"json\",\"specversion\":\"1.0\"," + + "\"tracestate\":\"aTraceState\"}]"; + + httpRequest.setBody(testBodyString); + httpRequest.setHeader(Constants.CONTENT_LENGTH, testBodyString); + String newBody = CloudEventTracingPipelinePolicy.replaceTracingPlaceHolder( + httpRequest, new StringBuilder(testBodyString)); + Assertions.assertEquals(expectedNewBody, newBody); + Assertions.assertEquals(httpRequest.getHeaders().get(Constants.CONTENT_LENGTH).getValue(), + String.valueOf(newBody.length())); + } +}