From 0ce8d23a79a855553c79b88ed5ff20d0a9ebbf1a Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 14 Oct 2020 15:30:42 -0700 Subject: [PATCH 1/6] Remove usage of environment variable AZURE_SERVICE_BUS_CONNECTION_STRING --- .../servicebus/ServiceBusClientBuilder.java | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 3b8c59220f34..da91565b3411 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -54,7 +54,6 @@ @ServiceClientBuilder(serviceClients = {ServiceBusReceiverAsyncClient.class, ServiceBusSenderAsyncClient.class, ServiceBusSenderClient.class, ServiceBusReceiverClient.class}) public final class ServiceBusClientBuilder { - private static final String AZURE_SERVICE_BUS_CONNECTION_STRING = "AZURE_SERVICE_BUS_CONNECTION_STRING"; private static final AmqpRetryOptions DEFAULT_RETRY = new AmqpRetryOptions().setTryTimeout(ServiceBusConstants.OPERATION_TIMEOUT); @@ -332,19 +331,11 @@ private ServiceBusConnectionProcessor getOrCreateConnectionProcessor(MessageSeri } private ConnectionOptions getConnectionOptions() { - configuration = configuration == null ? Configuration.getGlobalConfiguration().clone() : configuration; - if (credentials == null) { - final String connectionString = configuration.get(AZURE_SERVICE_BUS_CONNECTION_STRING); - - if (CoreUtils.isNullOrEmpty(connectionString)) { - throw logger.logExceptionAsError(new IllegalArgumentException("Credentials have not been set. " - + "They can be set using: connectionString(String), connectionString(String, String), " - + "credentials(String, String, TokenCredential), or setting the environment variable '" - + AZURE_SERVICE_BUS_CONNECTION_STRING + "' with a connection string")); - } - - connectionString(connectionString); + throw logger.logExceptionAsError(new IllegalArgumentException("Credentials have not been set. " + + "They can be set using: connectionString(String), connectionString(String, String), " + + "or credentials(String, String, TokenCredential)" + )); } // If the proxy has been configured by the user but they have overridden the TransportType with something that From e2077f63f2b23aecedd31fbc2137721bab7549e6 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Mon, 19 Oct 2020 10:12:28 -0700 Subject: [PATCH 2/6] Prepare tracing for MessageProcessor, schedule and other APIs --- .../servicebus/ServiceBusMessageBatch.java | 50 +--------- .../implementation/MessageUtils.java | 93 +++++++++++++++++++ .../servicebus/implementation/Messages.java | 2 + .../azure-messaging-servicebus.properties | 3 + 4 files changed, 103 insertions(+), 45 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java index ec3ce29e08a3..99a906be1bf9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java @@ -8,31 +8,21 @@ import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.TracerProvider; -import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; -import com.azure.core.util.tracing.ProcessKind; -import reactor.core.publisher.Signal; import java.nio.BufferOverflowException; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Objects; -import java.util.Optional; -import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; -import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; -import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; -import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; -import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME; +import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan; /** * A class for aggregating {@link ServiceBusMessage messages} into a single, size-limited, batch. It is treated as a * single AMQP message when sent to the Azure Service Bus service. */ public final class ServiceBusMessageBatch { - private static final String AZ_TRACING_NAMESPACE_VALUE = "Microsoft.ServiceBus"; private final ClientLogger logger = new ClientLogger(ServiceBusMessageBatch.class); private final Object lock = new Object(); private final int maxMessageSize; @@ -102,7 +92,10 @@ public boolean tryAdd(final ServiceBusMessage serviceBusMessage) { throw logger.logExceptionAsWarning(new IllegalArgumentException("message cannot be null")); } ServiceBusMessage serviceBusMessageUpdated = - tracerProvider.isEnabled() ? traceMessageSpan(serviceBusMessage) : serviceBusMessage; + tracerProvider.isEnabled() + ? traceMessageSpan(serviceBusMessage, serviceBusMessage.getContext(), hostname, entityPath, + tracerProvider) + : serviceBusMessage; final int size; try { @@ -135,39 +128,6 @@ List getMessages() { return serviceBusMessageList; } - /** - * Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the - * message. - * - * @param serviceBusMessage The Message to add tracing span for. - * - * @return the updated Message data object. - */ - private ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage) { - Optional eventContextData = serviceBusMessage.getContext().getData(SPAN_CONTEXT_KEY); - if (eventContextData.isPresent()) { - // if message has context (in case of retries), don't start a message span or add a new context - return serviceBusMessage; - } else { - // Starting the span makes the sampling decision (nothing is logged at this time) - Context messageContext = serviceBusMessage.getContext() - .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE) - .addData(ENTITY_PATH_KEY, entityPath) - .addData(HOST_NAME_KEY, hostname); - Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, messageContext, - ProcessKind.MESSAGE); - Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); - if (eventDiagnosticIdOptional.isPresent()) { - serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get() - .toString()); - tracerProvider.endSpan(eventSpanContext, Signal.complete()); - serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext); - } - } - - return serviceBusMessage; - } - private int getSize(final ServiceBusMessage serviceBusMessage, final boolean isFirst) { Objects.requireNonNull(serviceBusMessage, "'serviceBusMessage' cannot be null."); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java index 622e3b89e9c4..613ab6cb7ad8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java @@ -4,7 +4,13 @@ package com.azure.messaging.servicebus.implementation; import com.azure.core.amqp.implementation.AmqpConstants; +import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.util.Context; import com.azure.core.util.CoreUtils; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.tracing.ProcessKind; +import com.azure.messaging.servicebus.ServiceBusMessage; +import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusTransactionContext; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -15,16 +21,32 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import reactor.core.publisher.Signal; +import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; +import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; +import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; +import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; +import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; +import static com.azure.core.util.tracing.Tracer.SCOPE_KEY; +import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; + + /** * Contains helper methods for message conversions, reading status codes, and getting delivery state. */ @@ -253,4 +275,75 @@ private static TransactionalState getTransactionState(ByteBuffer transactionId, transactionalState.setOutcome(outcome); return transactionalState; } + + public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage, + Context messageContext, String hostname, String entityPath, TracerProvider tracerProvider) { + Optional eventContextData = messageContext.getData(SPAN_CONTEXT_KEY); + if (eventContextData.isPresent()) { + // if message has context (in case of retries), don't start a message span or add a new context + return serviceBusMessage; + } else { + // Starting the span makes the sampling decision (nothing is logged at this time) + Context newMessageContext = messageContext + .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE) + .addData(ENTITY_PATH_KEY, entityPath) + .addData(HOST_NAME_KEY, hostname); + Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, newMessageContext, + ProcessKind.MESSAGE); + Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); + if (eventDiagnosticIdOptional.isPresent()) { + serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get() + .toString()); + tracerProvider.endSpan(eventSpanContext, Signal.complete()); + serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext); + } + } + return serviceBusMessage; + } + + /* + * Starts a new process tracing span and attaches the returned context to the EventData object for users. + */ + public static Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, + String hostname, String entityPath, TracerProvider tracerProvider, ProcessKind processKind) { + Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY); + if (diagnosticId == null || !tracerProvider.isEnabled()) { + return Context.NONE; + } + + Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE) + .addData(ENTITY_PATH_KEY, entityPath) + .addData(HOST_NAME_KEY, hostname) + .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE); + spanContext = receivedMessage.getEnqueuedTime() == null + ? spanContext + : spanContext.addData(MESSAGE_ENQUEUED_TIME, receivedMessage.getEnqueuedTime().toEpochSecond()); + return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, processKind); + } + + /* + * Ends the process tracing span and the scope of that span. + */ + public static void endProcessTracingSpan(Context processSpanContext, Signal signal, + TracerProvider tracerProvider, ClientLogger logger) { + if (processSpanContext != null) { + Optional spanScope = processSpanContext.getData(SCOPE_KEY); + // Disposes of the scope when the trace span closes. + if (tracerProvider.isEnabled() && spanScope.isPresent()) { + if (spanScope.get() instanceof Closeable) { + Closeable close = (Closeable) spanScope.get(); + try { + close.close(); + tracerProvider.endSpan(processSpanContext, signal); + } catch (IOException ioException) { + logger.error(Messages.MESSAGE_PROCESSOR_RUN_END, ioException); + } + + } else { + logger.warning(String.format(Locale.US, + Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR, spanScope.getClass())); + } + } + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java index 49671a2fdb02..c6b70756564f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java @@ -23,6 +23,8 @@ public class Messages { public static final String INVALID_LOCK_TOKEN_STRING = getMessage("INVALID_LOCK_TOKEN_STRING"); public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE"); public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID"); + public static final String PROCESS_SPAN_SCOPE_TYPE_ERROR = getMessage("PROCESS_SPAN_SCOPE_TYPE_ERROR"); + public static final String MESSAGE_PROCESSOR_RUN_END = getMessage("MESSAGE_PROCESSOR_RUN_END"); private static synchronized Properties getProperties() { if (properties != null) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties index 0600b337ca0e..3876503c3bc5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties @@ -5,3 +5,6 @@ MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not set REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}. INVALID_OPERATION_DISPOSED_RECEIVER=Cannot perform operation '%s' on a disposed receiver. INVALID_LOCK_TOKEN_STRING=Invalid lock token '%s'. +PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type Closeable, but type: %s. Not closing the scope\ + and span +MESSAGE_PROCESSOR_RUN_END=MessageProcessor.run() endTracingSpan().close() failed with an error %s From cc988128405870a301960f02d2ee3d5aee11e7c1 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Mon, 19 Oct 2020 10:28:53 -0700 Subject: [PATCH 3/6] Revert "Remove usage of environment variable AZURE_SERVICE_BUS_CONNECTION_STRING" This reverts commit 0ce8d23a --- .../servicebus/ServiceBusClientBuilder.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index da91565b3411..3b8c59220f34 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -54,6 +54,7 @@ @ServiceClientBuilder(serviceClients = {ServiceBusReceiverAsyncClient.class, ServiceBusSenderAsyncClient.class, ServiceBusSenderClient.class, ServiceBusReceiverClient.class}) public final class ServiceBusClientBuilder { + private static final String AZURE_SERVICE_BUS_CONNECTION_STRING = "AZURE_SERVICE_BUS_CONNECTION_STRING"; private static final AmqpRetryOptions DEFAULT_RETRY = new AmqpRetryOptions().setTryTimeout(ServiceBusConstants.OPERATION_TIMEOUT); @@ -331,11 +332,19 @@ private ServiceBusConnectionProcessor getOrCreateConnectionProcessor(MessageSeri } private ConnectionOptions getConnectionOptions() { + configuration = configuration == null ? Configuration.getGlobalConfiguration().clone() : configuration; + if (credentials == null) { - throw logger.logExceptionAsError(new IllegalArgumentException("Credentials have not been set. " - + "They can be set using: connectionString(String), connectionString(String, String), " - + "or credentials(String, String, TokenCredential)" - )); + final String connectionString = configuration.get(AZURE_SERVICE_BUS_CONNECTION_STRING); + + if (CoreUtils.isNullOrEmpty(connectionString)) { + throw logger.logExceptionAsError(new IllegalArgumentException("Credentials have not been set. " + + "They can be set using: connectionString(String), connectionString(String, String), " + + "credentials(String, String, TokenCredential), or setting the environment variable '" + + AZURE_SERVICE_BUS_CONNECTION_STRING + "' with a connection string")); + } + + connectionString(connectionString); } // If the proxy has been configured by the user but they have overridden the TransportType with something that From ab3908c55535858faa12348778637f9998c5ef4d Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 27 Oct 2020 16:43:58 -0700 Subject: [PATCH 4/6] Update sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java Co-authored-by: Srikanta <51379715+srnagar@users.noreply.github.com> --- .../azure/messaging/servicebus/implementation/MessageUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java index 613ab6cb7ad8..b26d1b7fee4f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java @@ -302,7 +302,7 @@ public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMes } /* - * Starts a new process tracing span and attaches the returned context to the EventData object for users. + * Starts a new process tracing span and attaches the returned context to the ServiceBusReceivedMessage object for users. */ public static Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String hostname, String entityPath, TracerProvider tracerProvider, ProcessKind processKind) { From ea7cecf2ecc383fd57a8a0c1df2b1d98630e6f69 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 27 Oct 2020 16:56:57 -0700 Subject: [PATCH 5/6] Fix checkstyle format error --- .../messaging/servicebus/implementation/MessageUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java index b26d1b7fee4f..309b04cc134a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java @@ -302,7 +302,8 @@ public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMes } /* - * Starts a new process tracing span and attaches the returned context to the ServiceBusReceivedMessage object for users. + * Starts a new process tracing span and attaches the returned context to the ServiceBusReceivedMessage object for + * users. */ public static Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String hostname, String entityPath, TracerProvider tracerProvider, ProcessKind processKind) { From da71bea3492a883861a16f80170fcda61b5c460a Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 27 Oct 2020 18:49:20 -0700 Subject: [PATCH 6/6] Add javadoc for traceMessageSpan --- .../messaging/servicebus/implementation/MessageUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java index 309b04cc134a..f628e07d7d9c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java @@ -276,6 +276,9 @@ private static TransactionalState getTransactionState(ByteBuffer transactionId, return transactionalState; } + /** + * Used in ServiceBusMessageBatch.tryAddMessage() to start tracing for to-be-sent out messages. + */ public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage, Context messageContext, String hostname, String entityPath, TracerProvider tracerProvider) { Optional eventContextData = messageContext.getData(SPAN_CONTEXT_KEY);