Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions sdk/eventgrid/azure-messaging-eventgrid/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,29 @@
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>java-lts</id>
<activation>
<jdk>[11,)</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-surefire-plugin;external_dependency} -->
<configuration>
<argLine>
--add-opens com.azure.core.amqp/com.azure.core.amqp.implementation=ALL-UNNAMED
--add-opens com.azure.core.amqp/com.azure.core.amqp.implementation.handler=ALL-UNNAMED
--add-reads com.azure.core.amqp=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -336,6 +337,9 @@ public String getSpecVersion() {
* @return the extension attributes as an unmodifiable map.
*/
public Map<String, Object> getExtensionAttributes() {
if (this.cloudEvent.getAdditionalProperties() == null) {
return null;
}
return Collections.unmodifiableMap(this.cloudEvent.getAdditionalProperties());
}

Expand All @@ -348,6 +352,9 @@ public Map<String, Object> getExtensionAttributes() {
* @return the cloud event itself.
*/
public CloudEvent addExtensionAttribute(String name, Object value) {
if (this.cloudEvent.getAdditionalProperties() == null) {
this.cloudEvent.setAdditionalProperties(new HashMap<>());
}
this.cloudEvent.getAdditionalProperties().put(name.toLowerCase(Locale.ENGLISH), value);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.Constants;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImpl;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImplBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;

/**
* A service client that publishes events to an EventGrid topic or domain. Use {@link EventGridPublisherClientBuilder}
Expand All @@ -33,6 +38,9 @@ public final class EventGridPublisherAsyncClient {

private final EventGridServiceVersion serviceVersion;

private final ClientLogger logger = new ClientLogger(EventGridPublisherAsyncClient.class);


EventGridPublisherAsyncClient(HttpPipeline pipeline, String hostname, SerializerAdapter serializerAdapter,
EventGridServiceVersion serviceVersion) {
this.impl = new EventGridPublisherClientImplBuilder()
Expand Down Expand Up @@ -63,14 +71,18 @@ public EventGridServiceVersion getServiceVersion() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendEvents(Iterable<EventGridEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendEvents(events, context));
}

Mono<Void> sendEvents(Iterable<EventGridEvent> events, Context context) {
return Flux.fromIterable(events)
.map(EventGridEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishEventsAsync(this.hostname,
list, context.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have a null check for context before using it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This internal api is called by a public API, which calls FluxUtil.withContext to create a Context. So I assume the context won't be null.
In debugging, I see it's an empty Context instance instead of null.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YijunXieMS - this method is also called from sync client and the user can pass a null context.

User can call sendEventsWithResponse(events, null)

    public Response<Void> sendEventsWithResponse(Iterable<EventGridEvent> events, Context context) {
        return asyncClient.sendEventsWithResponse(events, context).block();
    }

}

/**
Expand All @@ -81,14 +93,19 @@ Mono<Void> sendEvents(Iterable<EventGridEvent> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendCloudEvents(Iterable<CloudEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCloudEvents(events, context));
}

Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
this.addCloudEventTracePlaceHolder(events);
return Flux.fromIterable(events)
.map(CloudEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list,
context.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to check context for null on the calling function to avoid NPE when doing addData

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does withContext guarantee context to be not null?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, we end up calling this from the sync client and hence the context could be null. Need a check there when passing.

}

/**
Expand All @@ -99,13 +116,17 @@ Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendCustomEvents(Iterable<Object> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCustomEvents(events, context));
}

Mono<Void> sendCustomEvents(Iterable<Object> events, Context context) {
return Flux.fromIterable(events)
.collectList()
.flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list,
context.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -116,14 +137,18 @@ Mono<Void> sendCustomEvents(Iterable<Object> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendEventsWithResponse(events, context));
}

Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events, Context context) {
return Flux.fromIterable(events)
.map(EventGridEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list,
context.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my comment about user passing null for context applies to this method.

}

/**
Expand All @@ -134,14 +159,19 @@ Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events, Con
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCloudEventsWithResponse(events, context));
}

Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Context context) {
this.addCloudEventTracePlaceHolder(events);
return Flux.fromIterable(events)
.map(CloudEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list,
context.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -152,12 +182,30 @@ Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCustomEventsWithResponse(events, context));
}

Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events, Context context) {
return Flux.fromIterable(events)
.collectList()
.flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list,
context.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

private void addCloudEventTracePlaceHolder(Iterable<CloudEvent> events) {
if (TracerProxy.isTracingEnabled()) {
for (CloudEvent event : events) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

events can be null since the public APIs don't seem to check. It might be better to have the null check and include an error message to indicate that events cannot be null.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added null check like in EventHubs

if (event.getExtensionAttributes() == null ||
(event.getExtensionAttributes().get(Constants.TRACE_PARENT) == null &&
event.getExtensionAttributes().get(Constants.TRACE_STATE) == null)) {

event.addExtensionAttribute(Constants.TRACE_PARENT, Constants.TRACE_PARENT_PLACEHOLDER_UUID);
event.addExtensionAttribute(Constants.TRACE_STATE, Constants.TRACE_STATE_PLACEHOLDER_UUID);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JacksonAdapter;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.CloudEventTracingPipelinePolicy;

import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -144,6 +146,9 @@ public EventGridPublisherAsyncClient buildAsyncClient() {

HttpPolicyProviders.addAfterRetryPolicies(httpPipelinePolicies);

if (TracerProxy.isTracingEnabled()) {
httpPipelinePolicies.add(new CloudEventTracingPipelinePolicy());
}
httpPipelinePolicies.add(new HttpLoggingPolicy(httpLogOptions));

HttpPipeline buildPipeline = new HttpPipelineBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.azure.messaging.eventgrid.implementation;

import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.tracing.TracerProxy;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

import com.azure.messaging.eventgrid.CloudEvent;
/**
* This pipeline policy should be added after OpenTelemetryPolicy in the http pipeline.
*
* It checks whether the {@link HttpRequest} headers have "traceparent" or "tracestate" and whether the serialized
* http body json string for a list of {@link CloudEvent} instances has place holders
* {@link Constants#TRACE_PARENT_PLACEHOLDER} or {@link Constants#TRACE_STATE_PLACEHOLDER}.
* The place holders will be replaced by the value from headers if the headers have "traceparent" or "tracestate",
* or be removed if the headers don't have.
*
* The place holders won't exist in the json string if the {@link TracerProxy#isTracingEnabled()} returns false.
*/
public class CloudEventTracingPipelinePolicy implements HttpPipelinePolicy {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add javadoc

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
final HttpRequest request = context.getHttpRequest();
final HttpHeader contentType = request.getHeaders().get(Constants.CONTENT_TYPE);
StringBuilder bodyStringBuilder = new StringBuilder();
if (TracerProxy.isTracingEnabled() && contentType != null &&
Constants.CLOUD_EVENT_CONTENT_TYPE.equals(contentType.getValue())) {
return request.getBody().map(byteBuffer -> bodyStringBuilder.append(new String(byteBuffer.array(),
StandardCharsets.UTF_8)))
.then(Mono.fromCallable(() -> replaceTracingPlaceHolder(request, bodyStringBuilder)))
.then(next.process());
}
else {
return next.process();
}
}

/**
*
* @param request The {@link HttpRequest}, whose body will be mutated by replacing traceparent and tracestate
* placeholders.
* @param bodyStringBuilder The {@link StringBuilder} that contains the full HttpRequest body string.
* @return The new body string with the place holders replaced (if header has tracing)
* or removed (if header no tracing).
*/
static String replaceTracingPlaceHolder(HttpRequest request, StringBuilder bodyStringBuilder) {
final int traceParentPlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_PARENT_PLACEHOLDER);
if (traceParentPlaceHolderIndex >= 0) { // There is "traceparent" placeholder in body, replace it.
final HttpHeader traceparentHeader = request.getHeaders().get(Constants.TRACE_PARENT);
bodyStringBuilder.replace(traceParentPlaceHolderIndex,
Constants.TRACE_PARENT_PLACEHOLDER.length() + traceParentPlaceHolderIndex,
traceparentHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_PARENT, traceparentHeader.getValue())
: "");
Comment thread
samvaity marked this conversation as resolved.
}
final int traceStatePlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_STATE_PLACEHOLDER);
if (traceStatePlaceHolderIndex >= 0) { // There is "tracestate" placeholder in body, replace it.
final HttpHeader tracestateHeader = request.getHeaders().get(Constants.TRACE_STATE);
bodyStringBuilder.replace(traceStatePlaceHolderIndex,
Constants.TRACE_STATE_PLACEHOLDER.length() + traceStatePlaceHolderIndex,
tracestateHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_STATE, tracestateHeader.getValue())
: "");
}
String newBodyString = bodyStringBuilder.toString();
request.setHeader(Constants.CONTENT_LENGTH, String.valueOf(newBodyString.length()));
request.setBody(newBodyString);
return newBodyString;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.azure.messaging.eventgrid.implementation;

public class Constants {
public static final String CONTENT_TYPE = "Content-Type";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String CLOUD_EVENT_CONTENT_TYPE = "application/cloudevents-batch+json; charset=utf-8";
public static final String TRACE_PARENT = "traceparent";
public static final String TRACE_STATE = "tracestate";
public static final String TRACE_PARENT_PLACEHOLDER_UUID = "TP-14b6b15b-74b6-4178-847e-d142aa2727b2";
public static final String TRACE_STATE_PLACEHOLDER_UUID = "TS-14b6b15b-74b6-4178-847e-d142aa2727b2";
public static final String TRACE_PARENT_PLACEHOLDER = ",\"" + TRACE_PARENT + "\":\"TP-14b6b15b-74b6-4178-847e-d142aa2727b2\"";
public static final String TRACE_STATE_PLACEHOLDER = ",\"" + TRACE_STATE + "\":\"TS-14b6b15b-74b6-4178-847e-d142aa2727b2\"";

// Please see <a href=https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/azure-services-resource-providers>here</a>
// for more information on Azure resource provider namespaces.
public static final String EVENT_GRID_TRACING_NAMESPACE_VALUE = "Microsoft.EventGrid";
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class AcsChatMessageEventBaseProperties extends AcsChatEventBasePropertie
* The version of the message
*/
@JsonProperty(value = "version")
private Integer version;
private Long version;
Comment thread
samvaity marked this conversation as resolved.

/**
* Get the messageId property: The chat message id.
Expand Down Expand Up @@ -152,7 +152,7 @@ public AcsChatMessageEventBaseProperties setType(String type) {
*
* @return the version value.
*/
public Integer getVersion() {
public Long getVersion() {
return this.version;
}

Expand All @@ -162,7 +162,7 @@ public Integer getVersion() {
* @param version the version value to set.
* @return the AcsChatMessageEventBaseProperties object itself.
*/
public AcsChatMessageEventBaseProperties setVersion(Integer version) {
public AcsChatMessageEventBaseProperties setVersion(Long version) {
this.version = version;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class AcsChatThreadEventBaseProperties extends AcsChatEventBaseProperties
* The version of the thread
*/
@JsonProperty(value = "version")
private Integer version;
private Long version;

/**
* Get the createTime property: The original creation time of the thread.
Expand All @@ -48,7 +48,7 @@ public AcsChatThreadEventBaseProperties setCreateTime(OffsetDateTime createTime)
*
* @return the version value.
*/
public Integer getVersion() {
public Long getVersion() {
return this.version;
}

Expand All @@ -58,7 +58,7 @@ public Integer getVersion() {
* @param version the version value to set.
* @return the AcsChatThreadEventBaseProperties object itself.
*/
public AcsChatThreadEventBaseProperties setVersion(Integer version) {
public AcsChatThreadEventBaseProperties setVersion(Long version) {
this.version = version;
return this;
}
Expand Down
Loading