Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -336,6 +337,9 @@ public String getSpecVersion() {
* @return the extension attributes as an unmodifiable map.
*/
public Map<String, Object> getExtensionAttributes() {
if (this.cloudEvent.getAdditionalProperties() == null) {
return null;
}
return Collections.unmodifiableMap(this.cloudEvent.getAdditionalProperties());
}

Expand All @@ -348,6 +352,9 @@ public Map<String, Object> getExtensionAttributes() {
* @return the cloud event itself.
*/
public CloudEvent addExtensionAttribute(String name, Object value) {
if (this.cloudEvent.getAdditionalProperties() == null) {
this.cloudEvent.setAdditionalProperties(new HashMap<>());
}
this.cloudEvent.getAdditionalProperties().put(name.toLowerCase(Locale.ENGLISH), value);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.Constants;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImpl;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImplBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;

/**
* A service client that publishes events to an EventGrid topic or domain. Use {@link EventGridPublisherClientBuilder}
Expand All @@ -33,6 +38,9 @@ public final class EventGridPublisherAsyncClient {

private final EventGridServiceVersion serviceVersion;

private final ClientLogger logger = new ClientLogger(EventGridPublisherAsyncClient.class);


EventGridPublisherAsyncClient(HttpPipeline pipeline, String hostname, SerializerAdapter serializerAdapter,
EventGridServiceVersion serviceVersion) {
this.impl = new EventGridPublisherClientImplBuilder()
Expand Down Expand Up @@ -63,14 +71,19 @@ public EventGridServiceVersion getServiceVersion() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendEvents(Iterable<EventGridEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendEvents(events, context));
}

Mono<Void> sendEvents(Iterable<EventGridEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.map(EventGridEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishEventsAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -81,14 +94,20 @@ Mono<Void> sendEvents(Iterable<EventGridEvent> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendCloudEvents(Iterable<CloudEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCloudEvents(events, context));
}

Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
this.addCloudEventTracePlaceHolder(events);
return Flux.fromIterable(events)
.map(CloudEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -99,13 +118,18 @@ Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendCustomEvents(Iterable<Object> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCustomEvents(events, context));
}

Mono<Void> sendCustomEvents(Iterable<Object> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.collectList()
.flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -116,14 +140,19 @@ Mono<Void> sendCustomEvents(Iterable<Object> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendEventsWithResponse(events, context));
}

Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.map(EventGridEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -134,14 +163,20 @@ Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events, Con
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCloudEventsWithResponse(events, context));
}

Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
this.addCloudEventTracePlaceHolder(events);
return Flux.fromIterable(events)
.map(CloudEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -152,12 +187,31 @@ Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCustomEventsWithResponse(events, context));
}

Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.collectList()
.flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

private void addCloudEventTracePlaceHolder(Iterable<CloudEvent> events) {
if (TracerProxy.isTracingEnabled()) {
for (CloudEvent event : events) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

events can be null since the public APIs don't seem to check. It might be better to have the null check and include an error message to indicate that events cannot be null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added null check like in EventHubs

if (event.getExtensionAttributes() == null ||
(event.getExtensionAttributes().get(Constants.TRACE_PARENT) == null &&
event.getExtensionAttributes().get(Constants.TRACE_STATE) == null)) {

event.addExtensionAttribute(Constants.TRACE_PARENT, Constants.TRACE_PARENT_PLACEHOLDER_UUID);
event.addExtensionAttribute(Constants.TRACE_STATE, Constants.TRACE_STATE_PLACEHOLDER_UUID);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JacksonAdapter;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.CloudEventTracingPipelinePolicy;

import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -144,6 +146,9 @@ public EventGridPublisherAsyncClient buildAsyncClient() {

HttpPolicyProviders.addAfterRetryPolicies(httpPipelinePolicies);

if (TracerProxy.isTracingEnabled()) {
httpPipelinePolicies.add(new CloudEventTracingPipelinePolicy());
}
httpPipelinePolicies.add(new HttpLoggingPolicy(httpLogOptions));

HttpPipeline buildPipeline = new HttpPipelineBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.azure.messaging.eventgrid.implementation;

import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.tracing.TracerProxy;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

import com.azure.messaging.eventgrid.CloudEvent;
/**
* This pipeline policy should be added after OpenTelemetryPolicy in the http pipeline.
*
* It checks whether the {@link HttpRequest} headers have "traceparent" or "tracestate" and whether the serialized
* http body json string for a list of {@link CloudEvent} instances has place holders
* {@link Constants#TRACE_PARENT_PLACEHOLDER} or {@link Constants#TRACE_STATE_PLACEHOLDER}.
* The place holders will be replaced by the value from headers if the headers have "traceparent" or "tracestate",
* or be removed if the headers don't have.
*
* The place holders won't exist in the json string if the {@link TracerProxy#isTracingEnabled()} returns false.
*/
public class CloudEventTracingPipelinePolicy implements HttpPipelinePolicy {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
final HttpRequest request = context.getHttpRequest();
final HttpHeader contentType = request.getHeaders().get(Constants.CONTENT_TYPE);
StringBuilder bodyStringBuilder = new StringBuilder();
if (TracerProxy.isTracingEnabled() && contentType != null &&
Constants.CLOUD_EVENT_CONTENT_TYPE.equals(contentType.getValue())) {
return request.getBody().map(byteBuffer -> bodyStringBuilder.append(new String(byteBuffer.array(),
StandardCharsets.UTF_8)))
.then(Mono.fromCallable(() -> replaceTracingPlaceHolder(request, bodyStringBuilder)))
.then(next.process());
}
else {
return next.process();
}
}

/**
*
* @param request The {@link HttpRequest}, whose body will be mutated by replacing traceparent and tracestate
* placeholders.
* @param bodyStringBuilder The {@link StringBuilder} that contains the full HttpRequest body string.
* @return The new body string with the place holders replaced (if header has tracing)
* or removed (if header no tracing).
*/
static String replaceTracingPlaceHolder(HttpRequest request, StringBuilder bodyStringBuilder) {
final int traceParentPlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_PARENT_PLACEHOLDER);
if (traceParentPlaceHolderIndex >= 0) { // There is "traceparent" placeholder in body, replace it.
final HttpHeader traceparentHeader = request.getHeaders().get(Constants.TRACE_PARENT);
bodyStringBuilder.replace(traceParentPlaceHolderIndex,
Constants.TRACE_PARENT_PLACEHOLDER.length() + traceParentPlaceHolderIndex,
traceparentHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_PARENT, traceparentHeader.getValue())
: "");
}
final int traceStatePlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_STATE_PLACEHOLDER);
if (traceStatePlaceHolderIndex >= 0) { // There is "tracestate" placeholder in body, replace it.
final HttpHeader tracestateHeader = request.getHeaders().get(Constants.TRACE_STATE);
bodyStringBuilder.replace(traceStatePlaceHolderIndex,
Constants.TRACE_STATE_PLACEHOLDER.length() + traceStatePlaceHolderIndex,
tracestateHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_STATE, tracestateHeader.getValue())
: "");
}
String newBodyString = bodyStringBuilder.toString();
request.setHeader(Constants.CONTENT_LENGTH, String.valueOf(newBodyString.length()));
request.setBody(newBodyString);
return newBodyString;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.azure.messaging.eventgrid.implementation;

public class Constants {
public static final String CONTENT_TYPE = "Content-Type";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String CLOUD_EVENT_CONTENT_TYPE = "application/cloudevents-batch+json; charset=utf-8";
public static final String TRACE_PARENT = "traceparent";
public static final String TRACE_STATE = "tracestate";
public static final String TRACE_PARENT_PLACEHOLDER_UUID = "TP-14b6b15b-74b6-4178-847e-d142aa2727b2";
public static final String TRACE_STATE_PLACEHOLDER_UUID = "TS-14b6b15b-74b6-4178-847e-d142aa2727b2";
public static final String TRACE_PARENT_PLACEHOLDER = ",\"" + TRACE_PARENT + "\":\"TP-14b6b15b-74b6-4178-847e-d142aa2727b2\"";
public static final String TRACE_STATE_PLACEHOLDER = ",\"" + TRACE_STATE + "\":\"TS-14b6b15b-74b6-4178-847e-d142aa2727b2\"";

// Please see <a href=https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/azure-services-resource-providers>here</a>
// for more information on Azure resource provider namespaces.
public static final String EVENT_GRID_TRACING_NAMESPACE_VALUE = "Microsoft.EventGrid";
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class AcsChatMessageEventBaseProperties extends AcsChatEventBasePropertie
* The version of the message
*/
@JsonProperty(value = "version")
private Integer version;
private Long version;

/**
* Get the messageId property: The chat message id.
Expand Down Expand Up @@ -152,7 +152,7 @@ public AcsChatMessageEventBaseProperties setType(String type) {
*
* @return the version value.
*/
public Integer getVersion() {
public Long getVersion() {
return this.version;
}

Expand All @@ -162,7 +162,7 @@ public Integer getVersion() {
* @param version the version value to set.
* @return the AcsChatMessageEventBaseProperties object itself.
*/
public AcsChatMessageEventBaseProperties setVersion(Integer version) {
public AcsChatMessageEventBaseProperties setVersion(Long version) {
this.version = version;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class AcsChatThreadEventBaseProperties extends AcsChatEventBaseProperties
* The version of the thread
*/
@JsonProperty(value = "version")
private Integer version;
private Long version;

/**
* Get the createTime property: The original creation time of the thread.
Expand All @@ -48,7 +48,7 @@ public AcsChatThreadEventBaseProperties setCreateTime(OffsetDateTime createTime)
*
* @return the version value.
*/
public Integer getVersion() {
public Long getVersion() {
return this.version;
}

Expand All @@ -58,7 +58,7 @@ public Integer getVersion() {
* @param version the version value to set.
* @return the AcsChatThreadEventBaseProperties object itself.
*/
public AcsChatThreadEventBaseProperties setVersion(Integer version) {
public AcsChatThreadEventBaseProperties setVersion(Long version) {
this.version = version;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
exports com.azure.messaging.eventgrid;
exports com.azure.messaging.eventgrid.systemevents;

opens com.azure.messaging.eventgrid.implementation;
opens com.azure.messaging.eventgrid.implementation.models to com.fasterxml.jackson.databind;
opens com.azure.messaging.eventgrid.systemevents to com.fasterxml.jackson.databind;
}
Loading