diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java index ec3ce29e08a3..99a906be1bf9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java @@ -8,31 +8,21 @@ import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.TracerProvider; -import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; -import com.azure.core.util.tracing.ProcessKind; -import reactor.core.publisher.Signal; import java.nio.BufferOverflowException; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Objects; -import java.util.Optional; -import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; -import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; -import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; -import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; -import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME; +import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan; /** * A class for aggregating {@link ServiceBusMessage messages} into a single, size-limited, batch. It is treated as a * single AMQP message when sent to the Azure Service Bus service. */ public final class ServiceBusMessageBatch { - private static final String AZ_TRACING_NAMESPACE_VALUE = "Microsoft.ServiceBus"; private final ClientLogger logger = new ClientLogger(ServiceBusMessageBatch.class); private final Object lock = new Object(); private final int maxMessageSize; @@ -102,7 +92,10 @@ public boolean tryAdd(final ServiceBusMessage serviceBusMessage) { throw logger.logExceptionAsWarning(new IllegalArgumentException("message cannot be null")); } ServiceBusMessage serviceBusMessageUpdated = - tracerProvider.isEnabled() ? traceMessageSpan(serviceBusMessage) : serviceBusMessage; + tracerProvider.isEnabled() + ? traceMessageSpan(serviceBusMessage, serviceBusMessage.getContext(), hostname, entityPath, + tracerProvider) + : serviceBusMessage; final int size; try { @@ -135,39 +128,6 @@ List getMessages() { return serviceBusMessageList; } - /** - * Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the - * message. - * - * @param serviceBusMessage The Message to add tracing span for. - * - * @return the updated Message data object. - */ - private ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage) { - Optional eventContextData = serviceBusMessage.getContext().getData(SPAN_CONTEXT_KEY); - if (eventContextData.isPresent()) { - // if message has context (in case of retries), don't start a message span or add a new context - return serviceBusMessage; - } else { - // Starting the span makes the sampling decision (nothing is logged at this time) - Context messageContext = serviceBusMessage.getContext() - .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE) - .addData(ENTITY_PATH_KEY, entityPath) - .addData(HOST_NAME_KEY, hostname); - Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, messageContext, - ProcessKind.MESSAGE); - Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); - if (eventDiagnosticIdOptional.isPresent()) { - serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get() - .toString()); - tracerProvider.endSpan(eventSpanContext, Signal.complete()); - serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext); - } - } - - return serviceBusMessage; - } - private int getSize(final ServiceBusMessage serviceBusMessage, final boolean isFirst) { Objects.requireNonNull(serviceBusMessage, "'serviceBusMessage' cannot be null."); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java index 622e3b89e9c4..f628e07d7d9c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java @@ -4,7 +4,13 @@ package com.azure.messaging.servicebus.implementation; import com.azure.core.amqp.implementation.AmqpConstants; +import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.util.Context; import com.azure.core.util.CoreUtils; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.tracing.ProcessKind; +import com.azure.messaging.servicebus.ServiceBusMessage; +import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusTransactionContext; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -15,16 +21,32 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import reactor.core.publisher.Signal; +import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; +import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; +import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; +import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; +import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; +import static com.azure.core.util.tracing.Tracer.SCOPE_KEY; +import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; + + /** * Contains helper methods for message conversions, reading status codes, and getting delivery state. */ @@ -253,4 +275,79 @@ private static TransactionalState getTransactionState(ByteBuffer transactionId, transactionalState.setOutcome(outcome); return transactionalState; } + + /** + * Used in ServiceBusMessageBatch.tryAddMessage() to start tracing for to-be-sent out messages. + */ + public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage, + Context messageContext, String hostname, String entityPath, TracerProvider tracerProvider) { + Optional eventContextData = messageContext.getData(SPAN_CONTEXT_KEY); + if (eventContextData.isPresent()) { + // if message has context (in case of retries), don't start a message span or add a new context + return serviceBusMessage; + } else { + // Starting the span makes the sampling decision (nothing is logged at this time) + Context newMessageContext = messageContext + .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE) + .addData(ENTITY_PATH_KEY, entityPath) + .addData(HOST_NAME_KEY, hostname); + Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, newMessageContext, + ProcessKind.MESSAGE); + Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); + if (eventDiagnosticIdOptional.isPresent()) { + serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get() + .toString()); + tracerProvider.endSpan(eventSpanContext, Signal.complete()); + serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext); + } + } + return serviceBusMessage; + } + + /* + * Starts a new process tracing span and attaches the returned context to the ServiceBusReceivedMessage object for + * users. + */ + public static Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, + String hostname, String entityPath, TracerProvider tracerProvider, ProcessKind processKind) { + Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY); + if (diagnosticId == null || !tracerProvider.isEnabled()) { + return Context.NONE; + } + + Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE) + .addData(ENTITY_PATH_KEY, entityPath) + .addData(HOST_NAME_KEY, hostname) + .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE); + spanContext = receivedMessage.getEnqueuedTime() == null + ? spanContext + : spanContext.addData(MESSAGE_ENQUEUED_TIME, receivedMessage.getEnqueuedTime().toEpochSecond()); + return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, processKind); + } + + /* + * Ends the process tracing span and the scope of that span. + */ + public static void endProcessTracingSpan(Context processSpanContext, Signal signal, + TracerProvider tracerProvider, ClientLogger logger) { + if (processSpanContext != null) { + Optional spanScope = processSpanContext.getData(SCOPE_KEY); + // Disposes of the scope when the trace span closes. + if (tracerProvider.isEnabled() && spanScope.isPresent()) { + if (spanScope.get() instanceof Closeable) { + Closeable close = (Closeable) spanScope.get(); + try { + close.close(); + tracerProvider.endSpan(processSpanContext, signal); + } catch (IOException ioException) { + logger.error(Messages.MESSAGE_PROCESSOR_RUN_END, ioException); + } + + } else { + logger.warning(String.format(Locale.US, + Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR, spanScope.getClass())); + } + } + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java index 49671a2fdb02..c6b70756564f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java @@ -23,6 +23,8 @@ public class Messages { public static final String INVALID_LOCK_TOKEN_STRING = getMessage("INVALID_LOCK_TOKEN_STRING"); public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE"); public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID"); + public static final String PROCESS_SPAN_SCOPE_TYPE_ERROR = getMessage("PROCESS_SPAN_SCOPE_TYPE_ERROR"); + public static final String MESSAGE_PROCESSOR_RUN_END = getMessage("MESSAGE_PROCESSOR_RUN_END"); private static synchronized Properties getProperties() { if (properties != null) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties index 0600b337ca0e..3876503c3bc5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties @@ -5,3 +5,6 @@ MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not set REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}. INVALID_OPERATION_DISPOSED_RECEIVER=Cannot perform operation '%s' on a disposed receiver. INVALID_LOCK_TOKEN_STRING=Invalid lock token '%s'. +PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type Closeable, but type: %s. Not closing the scope\ + and span +MESSAGE_PROCESSOR_RUN_END=MessageProcessor.run() endTracingSpan().close() failed with an error %s