Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,21 @@
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import reactor.core.publisher.Signal;

import java.nio.BufferOverflowException;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;

/**
* A class for aggregating {@link ServiceBusMessage messages} into a single, size-limited, batch. It is treated as a
* single AMQP message when sent to the Azure Service Bus service.
*/
public final class ServiceBusMessageBatch {
private static final String AZ_TRACING_NAMESPACE_VALUE = "Microsoft.ServiceBus";
private final ClientLogger logger = new ClientLogger(ServiceBusMessageBatch.class);
private final Object lock = new Object();
private final int maxMessageSize;
Expand Down Expand Up @@ -102,7 +92,10 @@ public boolean tryAdd(final ServiceBusMessage serviceBusMessage) {
throw logger.logExceptionAsWarning(new IllegalArgumentException("message cannot be null"));
}
ServiceBusMessage serviceBusMessageUpdated =
tracerProvider.isEnabled() ? traceMessageSpan(serviceBusMessage) : serviceBusMessage;
tracerProvider.isEnabled()
? traceMessageSpan(serviceBusMessage, serviceBusMessage.getContext(), hostname, entityPath,
tracerProvider)
: serviceBusMessage;

final int size;
try {
Expand Down Expand Up @@ -135,39 +128,6 @@ List<ServiceBusMessage> getMessages() {
return serviceBusMessageList;
}

/**
* Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the
* message.
*
* @param serviceBusMessage The Message to add tracing span for.
*
* @return the updated Message data object.
*/
private ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage) {
Optional<Object> eventContextData = serviceBusMessage.getContext().getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return serviceBusMessage;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context messageContext = serviceBusMessage.getContext()
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname);
Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, messageContext,
ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get()
.toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}

return serviceBusMessage;
}

private int getSize(final ServiceBusMessage serviceBusMessage, final boolean isFirst) {
Objects.requireNonNull(serviceBusMessage, "'serviceBusMessage' cannot be null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
Expand All @@ -15,16 +21,32 @@
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import reactor.core.publisher.Signal;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;


/**
* Contains helper methods for message conversions, reading status codes, and getting delivery state.
*/
Expand Down Expand Up @@ -253,4 +275,79 @@ private static TransactionalState getTransactionState(ByteBuffer transactionId,
transactionalState.setOutcome(outcome);
return transactionalState;
}

/**
* Used in ServiceBusMessageBatch.tryAddMessage() to start tracing for to-be-sent out messages.
*/
public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage,
Context messageContext, String hostname, String entityPath, TracerProvider tracerProvider) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@YijunXieMS YijunXieMS Oct 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessKind.MESSAGE should be used to create a message. Just like in EventHubs.

Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, eventContext,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ProcessKind helps the tracer apply specific attributes on the span. For example, in the case of MESSAGE, the tracer will attribute the spanType=Producer, here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samvaity is this javadoc misleading? It looks like MESSAGE is used for receiving.

    /**
     * Amqp message process call to receive data.
     */
    MESSAGE,

Copy link
Member

@samvaity samvaity Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we initially had this as RECEIVE and later updated to MESSAGE so the left over javadoc. But yes it should be updated to suit process kind "message". AMQP process kind for message spans.

Optional<Object> eventContextData = messageContext.getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return serviceBusMessage;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context newMessageContext = messageContext
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname);
Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, newMessageContext,
ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get()
.toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}
return serviceBusMessage;
}

/*
* Starts a new process tracing span and attaches the returned context to the ServiceBusReceivedMessage object for
* users.
*/
public static Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage,
String hostname, String entityPath, TracerProvider tracerProvider, ProcessKind processKind) {
Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
if (diagnosticId == null || !tracerProvider.isEnabled()) {
return Context.NONE;
}

Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
spanContext = receivedMessage.getEnqueuedTime() == null
? spanContext
: spanContext.addData(MESSAGE_ENQUEUED_TIME, receivedMessage.getEnqueuedTime().toEpochSecond());
return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, processKind);
}

/*
* Ends the process tracing span and the scope of that span.
*/
public static void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal,
TracerProvider tracerProvider, ClientLogger logger) {
if (processSpanContext != null) {
Optional<Object> spanScope = processSpanContext.getData(SCOPE_KEY);
// Disposes of the scope when the trace span closes.
if (tracerProvider.isEnabled() && spanScope.isPresent()) {
if (spanScope.get() instanceof Closeable) {
Closeable close = (Closeable) spanScope.get();
try {
close.close();
tracerProvider.endSpan(processSpanContext, signal);
} catch (IOException ioException) {
logger.error(Messages.MESSAGE_PROCESSOR_RUN_END, ioException);
}

} else {
logger.warning(String.format(Locale.US,
Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR, spanScope.getClass()));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class Messages {
public static final String INVALID_LOCK_TOKEN_STRING = getMessage("INVALID_LOCK_TOKEN_STRING");
public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE");
public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID");
public static final String PROCESS_SPAN_SCOPE_TYPE_ERROR = getMessage("PROCESS_SPAN_SCOPE_TYPE_ERROR");
public static final String MESSAGE_PROCESSOR_RUN_END = getMessage("MESSAGE_PROCESSOR_RUN_END");

private static synchronized Properties getProperties() {
if (properties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not set
REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}.
INVALID_OPERATION_DISPOSED_RECEIVER=Cannot perform operation '%s' on a disposed receiver.
INVALID_LOCK_TOKEN_STRING=Invalid lock token '%s'.
PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type Closeable, but type: %s. Not closing the scope\
and span
MESSAGE_PROCESSOR_RUN_END=MessageProcessor.run() endTracingSpan().close() failed with an error %s