Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
0efe342
Fix issue#21717 Extend EventData with AmqpAnnotatedMessage by lihong …
v-hongli1 Jun 16, 2021
8d33358
Fixed issue#21717 Extend EventData with AmqpAnnotatedMessage by lihon…
v-hongli1 Jun 16, 2021
837ed79
Fixed ci exception (build) for issue#21717 Extend EventData with Amqp…
v-hongli1 Jun 16, 2021
070e4a7
Fixed ci exception (Test case) for issue#21717 Extend EventData with …
v-hongli1 Jun 16, 2021
56a6f9b
Fixed ci exception (Run SpotBugs, Checkstyle, RevApi, and Javadoc) fo…
v-hongli1 Jun 16, 2021
b5bf879
Fixed ci exception (Run SpotBugs, Checkstyle, RevApi, and Javadoc) fo…
v-hongli1 Jun 16, 2021
a77eae0
Fixed ci exception (Run SpotBugs, Checkstyle, RevApi, and Javadoc) fo…
v-hongli1 Jun 16, 2021
fe37756
Fixed comment exception (Run SpotBugs, Checkstyle, RevApi, and Javado…
v-hongli1 Jun 30, 2021
d96cf39
Fixed comment exception (Run SpotBugs, Checkstyle, RevApi, and Javado…
v-hongli1 Jun 30, 2021
a3f0137
Revert change in migration-guide.
conniey Jul 1, 2021
609a4dd
Revert EventData
conniey Jul 2, 2021
16c6e1c
Revert EventHubMessageSerializer
conniey Jul 2, 2021
6d7e080
Adds MessageUtils class.
conniey Jul 2, 2021
4eb1cfd
Adding overload to take AmqpAnnotatedMessage.
conniey Jul 2, 2021
1493566
Update EventData to take AmqpAnnotatedMessage as a body.
conniey Jul 3, 2021
61a92c0
Adds contentType, correlationId, messageId to EventData
conniey Jul 3, 2021
aa2c29a
Fixing build break in EventHubMessageSerializer
conniey Jul 3, 2021
757f168
Fixing test breaks.
conniey Jul 3, 2021
3c14bf6
Update MessageSerializer to add in systemProperties.
conniey Jul 4, 2021
9cb5dd8
Make EventData more permissive when adding items to the map.
conniey Jul 4, 2021
4e2c819
(WIP)
conniey Jul 5, 2021
7587e91
Unifying logic for AMQP message conversion.
conniey Jul 6, 2021
e28aa67
Revert changes in tests.
conniey Jul 6, 2021
847562b
Only add logic to serialize when properties set.
conniey Jul 6, 2021
7f180bf
Aggregate data for checking if objects are the same.
conniey Jul 7, 2021
b71441c
Update serializer test to also check AmqpRawMessage
conniey Jul 7, 2021
456d0ba
Fix assertion in SystemProperties test.
conniey Jul 7, 2021
a3ddaa9
Revert changes to InteropAmqpPropertiesTest.
conniey Jul 7, 2021
ff884b0
Adding comment to EventData.
conniey Jul 7, 2021
660ce3c
Add extra assertions for RawAmqpMessage.
conniey Jul 7, 2021
65dfd58
Add additional tests for message conversions.
conniey Jul 7, 2021
31a1e45
Fixing spacing.
conniey Jul 7, 2021
dd8b114
Update changelog.
conniey Jul 7, 2021
f914cea
Fixing checkstyle issues.
conniey Jul 7, 2021
ff78517
Fixing issues where we poll the convenience method rather than the pr…
conniey Jul 8, 2021
02e4f01
Fixing bugs in system properties.
conniey Jul 8, 2021
0c2bdb9
Fixing interop tests.
conniey Jul 9, 2021
66239d5
Fix test error.
conniey Jul 9, 2021
def4eb0
Adding extra assertions for properties.
conniey Jul 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.AmqpMessageBody;
import com.azure.core.amqp.models.AmqpMessageId;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;

import java.nio.ByteBuffer;
import java.time.Instant;
Expand Down Expand Up @@ -54,8 +58,8 @@ public class EventData {
static final Set<String> RESERVED_SYSTEM_PROPERTIES;

private final Map<String, Object> properties;
private final BinaryData body;
private final SystemProperties systemProperties;
private final AmqpAnnotatedMessage annotatedMessage;
private Context context;

static {
Expand All @@ -73,16 +77,24 @@ public class EventData {
* Creates an event containing the {@code body}.
*
* @param body The data to set for this event.
*
* @throws NullPointerException if {@code body} is {@code null}.
*/
public EventData(byte[] body) {
this(BinaryData.fromBytes(Objects.requireNonNull(body, "'body' cannot be null.")));
this.systemProperties = new SystemProperties();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since AmqpAnnotatedMessage exposes byte[] as the body, I felt like it was best to store it as such rather than BinaryData that we were using before to stop a lot of the conversions to and from BinaryData.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@conniey, @yijun: This looks like SystemProperties is a copy of data that also appears on the AmqpAnnotatedMessage instance. Since the AmqpAnnotatedMessage instance is fully mutable, if callers change the data there, then the EventData projection is no longer accurate and you've got drift....

Am I overlooking something?

this.context = Context.NONE;

final AmqpMessageBody messageBody = AmqpMessageBody.fromData(
Objects.requireNonNull(body, "'body' cannot be null."));
this.annotatedMessage = new AmqpAnnotatedMessage(messageBody);
this.properties = annotatedMessage.getApplicationProperties();
}

/**
* Creates an event containing the {@code body}.
*
* @param body The data to set for this event.
*
* @throws NullPointerException if {@code body} is {@code null}.
*/
public EventData(ByteBuffer body) {
Expand All @@ -93,6 +105,7 @@ public EventData(ByteBuffer body) {
* Creates an event by encoding the {@code body} using UTF-8 charset.
*
* @param body The string that will be UTF-8 encoded to create an event.
*
* @throws NullPointerException if {@code body} is {@code null}.
*/
public EventData(String body) {
Expand All @@ -105,22 +118,38 @@ public EventData(String body) {
* @param body The {@link BinaryData} payload for this event.
*/
public EventData(BinaryData body) {
this(body, new SystemProperties(), Context.NONE);
this(Objects.requireNonNull(body, "'body' cannot be null.").toBytes());
}

/**
* Creates an event with the given {@code body}, system properties and context.
*
* @param body The data to set for this event.
* @param systemProperties System properties set by message broker for this event.
* @param context A specified key-value pair of type {@link Context}.
* @throws NullPointerException if {@code body}, {@code systemProperties}, or {@code context} is {@code null}.
* @param amqpAnnotatedMessage Backing annotated message.
*
* @throws NullPointerException if {@code amqpAnnotatedMessage}, {@code systemProperties}, or {@code context} is
* {@code null}.
* @throws IllegalArgumentException if {@code amqpAnnotatedMessage}'s body type is unknown.
*/
EventData(BinaryData body, SystemProperties systemProperties, Context context) {
this.body = Objects.requireNonNull(body, "'body' cannot be null.");
EventData(AmqpAnnotatedMessage amqpAnnotatedMessage, SystemProperties systemProperties, Context context) {
this.context = Objects.requireNonNull(context, "'context' cannot be null.");
this.systemProperties = Objects.requireNonNull(systemProperties, "'systemProperties' cannot be null.");
this.properties = new HashMap<>();
this.systemProperties = Objects.requireNonNull(systemProperties, "'systemProperties' cannot be null.");
Comment thread
v-xuto marked this conversation as resolved.
Outdated
this.properties = Collections.unmodifiableMap(amqpAnnotatedMessage.getApplicationProperties());
this.annotatedMessage = Objects.requireNonNull(amqpAnnotatedMessage, "'amqpAnnotatedMessage' cannot be null.");

switch (annotatedMessage.getBody().getBodyType()) {
case DATA:
break;
case SEQUENCE:
case VALUE:
new ClientLogger(EventData.class).warning("Message body type '{}' is not supported in EH. "
+ " Getting contents of body may throw.", annotatedMessage.getBody().getBodyType());
break;
default:
throw new ClientLogger(EventData.class).logExceptionAsError(new IllegalArgumentException(
"Body type not valid " + annotatedMessage.getBody().getBodyType()));
}
}

/**
Expand All @@ -133,7 +162,8 @@ public EventData(BinaryData body) {
*
* {@codesnippet com.azure.messaging.eventhubs.eventdata.getProperties}
*
* @return Application properties associated with this {@link EventData}.
* @return Application properties associated with this {@link EventData}. For received {@link EventData}, the map is
* a read-only view.
*/
public Map<String, Object> getProperties() {
return properties;
Expand All @@ -143,8 +173,8 @@ public Map<String, Object> getProperties() {
* Properties that are populated by Event Hubs service. As these are populated by the Event Hubs service, they are
* only present on a <b>received</b> {@link EventData}.
*
* @return An encapsulation of all system properties appended by EventHubs service into {@link EventData}.
* {@code null} if the {@link EventData} is not received from the Event Hubs service.
* @return An encapsulation of all system properties appended by EventHubs service into {@link EventData}. {@code
* null} if the {@link EventData} is not received from the Event Hubs service.
*/
public Map<String, Object> getSystemProperties() {
Comment thread
v-xuto marked this conversation as resolved.
Outdated
return systemProperties;
Expand All @@ -162,7 +192,7 @@ public Map<String, Object> getSystemProperties() {
* @return A byte array representing the data.
*/
public byte[] getBody() {
return body.toBytes();
return annotatedMessage.getBody().getFirstData();
}

/**
Expand All @@ -171,7 +201,7 @@ public byte[] getBody() {
* @return UTF-8 decoded string representation of the event data.
*/
public String getBodyAsString() {
return new String(body.toBytes(), UTF_8);
return new String(annotatedMessage.getBody().getFirstData(), UTF_8);
}

/**
Expand All @@ -180,7 +210,7 @@ public String getBodyAsString() {
* @return the {@link BinaryData} payload associated with this event.
*/
public BinaryData getBodyAsBinaryData() {
return body;
return BinaryData.fromBytes(annotatedMessage.getBody().getFirstData());
}

/**
Expand Down Expand Up @@ -219,8 +249,8 @@ public Instant getEnqueuedTime() {

/**
* Gets the sequence number assigned to the event when it was enqueued in the associated Event Hub partition. This
* is unique for every message received in the Event Hub partition. This is only present on a <b>received</b>
* {@link EventData}.
* is unique for every message received in the Event Hub partition. This is only present on a <b>received</b> {@link
* EventData}.
*
* @return The sequence number for this event. {@code null} if the {@link EventData} was not received from Event
* Hubs service.
Expand All @@ -229,6 +259,84 @@ public Long getSequenceNumber() {
return systemProperties.getSequenceNumber();
}

/**
* Gets the underlying AMQP message.
*
* @return The underlying AMQP message.
*/
public AmqpAnnotatedMessage getRawAmqpMessage() {
return annotatedMessage;
}

/**
* Gets the content type.
*
* @return The content type.
*/
public String getContentType() {
return annotatedMessage.getProperties().getContentType();
}

/**
* Sets the content type.
*
* @param contentType The content type.
*
* @return The updated {@link EventData}.
*/
public EventData setContentType(String contentType) {
annotatedMessage.getProperties().setContentType(contentType);
return this;
}

/**
* Gets the correlation id.
*
* @return The correlation id. {@code null} if there is none set.
*/
public String getCorrelationId() {
final AmqpMessageId messageId = annotatedMessage.getProperties().getCorrelationId();
return messageId != null ? messageId.toString() : null;
}

/**
* Sets the correlation id.
*
* @param correlationId The correlation id.
*
* @return The updated {@link EventData}.
*/
public EventData setCorrelationId(String correlationId) {
final AmqpMessageId id = correlationId != null ? new AmqpMessageId(correlationId) : null;

annotatedMessage.getProperties().setCorrelationId(id);
return this;
}

/**
* Gets the message id.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message id or user id?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the message id. We're not exposing user id as a top-level property.

*
* @return The message id. {@code null} if there is none set.
*/
public String getMessageId() {
final AmqpMessageId messageId = annotatedMessage.getProperties().getMessageId();
return messageId != null ? messageId.toString() : null;
}

/**
* Sets the message id.
*
* @param messageId The message id.
*
* @return The updated {@link EventData}.
*/
public EventData setMessageId(String messageId) {
final AmqpMessageId id = messageId != null ? new AmqpMessageId(messageId) : null;

annotatedMessage.getProperties().setMessageId(id);
return this;
}

/**
* {@inheritDoc}
*/
Expand All @@ -242,15 +350,16 @@ public boolean equals(Object o) {
}

EventData eventData = (EventData) o;
return Arrays.equals(body.toBytes(), eventData.body.toBytes());
return Arrays.equals(annotatedMessage.getBody().getFirstData(),
eventData.annotatedMessage.getBody().getFirstData());
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return Arrays.hashCode(body.toBytes());
return Arrays.hashCode(annotatedMessage.getBody().getFirstData());
}

/**
Expand All @@ -267,8 +376,10 @@ Context getContext() {
*
* @param key The key for this context object
* @param value The value for this context object.
* @throws NullPointerException if {@code key} or {@code value} is null.
*
* @return The updated {@link EventData}.
*
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
public EventData addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Expand Down Expand Up @@ -300,27 +411,29 @@ static class SystemProperties extends HashMap<String, Object> {
super(map);
this.partitionKey = removeSystemProperty(PARTITION_KEY_ANNOTATION_NAME.getValue());

final String offset = removeSystemProperty(OFFSET_ANNOTATION_NAME.getValue());
final Long offset = removeSystemPropertyAsLong(OFFSET_ANNOTATION_NAME.getValue());
if (offset == null) {
throw new IllegalStateException(String.format(Locale.US,
"offset: %s should always be in map.", OFFSET_ANNOTATION_NAME.getValue()));
}
this.offset = Long.valueOf(offset);
Comment thread
conniey marked this conversation as resolved.

this.offset = offset;
put(OFFSET_ANNOTATION_NAME.getValue(), this.offset);

final Date enqueuedTimeValue = removeSystemProperty(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());
final Instant enqueuedTimeValue = removeSystemPropertyAsInstant(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());
if (enqueuedTimeValue == null) {
throw new IllegalStateException(String.format(Locale.US,
"enqueuedTime: %s should always be in map.", ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()));
}
this.enqueuedTime = enqueuedTimeValue.toInstant();
this.enqueuedTime = enqueuedTimeValue;
put(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), this.enqueuedTime);

final Long sequenceNumber = removeSystemProperty(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue());
final Long sequenceNumber = removeSystemPropertyAsLong(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue());
if (sequenceNumber == null) {
throw new IllegalStateException(String.format(Locale.US,
"sequenceNumber: %s should always be in map.", SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
}

this.sequenceNumber = sequenceNumber;
put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), this.sequenceNumber);
}
Expand Down Expand Up @@ -358,8 +471,9 @@ private Instant getEnqueuedTime() {
* Event Hub.
*
* @return Sequence number for this event.
* @throws IllegalStateException if {@link SystemProperties} does not contain the sequence number in a retrieved
* event.
*
* @throws IllegalStateException if {@link SystemProperties} does not contain the sequence number in a
* retrieved event.
*/
private Long getSequenceNumber() {
return sequenceNumber;
Expand All @@ -373,5 +487,37 @@ private <T> T removeSystemProperty(final String key) {

return null;
}

private Long removeSystemPropertyAsLong(final String key) {
if (!this.containsKey(key)) {
return null;
}

final Object value = this.remove(key);
if (value instanceof String) {
return Long.valueOf((String) value);
} else if (value instanceof Long) {
return (Long) value;
} else {
throw new ClientLogger(EventData.class).logExceptionAsError(new IllegalStateException(
Comment thread
conniey marked this conversation as resolved.
Outdated
String.format(Locale.US, "Key: %s Value %s is not of type String or Long.", key, value)));
}
}

private Instant removeSystemPropertyAsInstant(final String key) {
if (!this.containsKey(key)) {
return null;
}

final Object value = this.remove(key);
if (value instanceof Date) {
return ((Date) value).toInstant();
} else if (value instanceof Instant) {
return (Instant) value;
} else {
throw new ClientLogger(EventData.class).logExceptionAsError(new IllegalStateException(
String.format(Locale.US, "Key: %s Value %s is not of type Date or Instant.", key, value)));
}
}
}
}
}
Loading