diff --git a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml index af0bc325cb4a..fa5d11072a52 100755 --- a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml +++ b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml @@ -47,6 +47,9 @@ + + + @@ -128,6 +131,9 @@ + + + @@ -188,6 +194,8 @@ + + diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java index a652a6ace801..e6a8a056f7ff 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java @@ -24,7 +24,6 @@ import java.time.Instant; import java.util.Date; import java.util.HashMap; -import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -155,8 +154,8 @@ private T deserializeManagementResponse(Message message, Class deserializ } else if (deserializedType == EventHubProperties.class) { return (T) toEventHubProperties(amqpBody); } else { - throw logger.logExceptionAsError(new IllegalArgumentException(String.format(Locale.US, - "Class '%s' is not a supported deserializable type.", deserializedType))); + throw logger.logExceptionAsError(new IllegalArgumentException(String.format( + Messages.CLASS_NOT_A_SUPPORTED_TYPE, deserializedType))); } } @@ -214,8 +213,7 @@ private EventData deserializeEventData(Message message) { Data bodyData = (Data) bodySection; body = bodyData.getValue().getArray(); } else { - logger.warning(String.format(Locale.US, - "Message body type is not of type Data, but type: %s. Not setting body contents.", + logger.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE, bodySection != null ? bodySection.getType() : "null")); body = new byte[0]; @@ -347,7 +345,6 @@ private static void setSystemProperties(EventData eventData, Message message) { default: throw new IllegalArgumentException( String.format( - Locale.US, "Property is not a recognized reserved property name: %s", key)); } @@ -426,7 +423,7 @@ private static int sizeof(Object obj) { return Double.BYTES; } - throw new IllegalArgumentException(String.format(Locale.US, "Encoding Type: %s is not supported", + throw new IllegalArgumentException(String.format(Messages.ENCODING_TYPE_NOT_SUPPORTED, obj.getClass())); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java index 074ec73e2dfc..f997cb5004bf 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java @@ -111,7 +111,7 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable { }) .doOnRequest(request -> { if (request < MINIMUM_REQUEST) { - logger.warning("Back pressure request value not valid. It must be between {} and {}.", + logger.warning(Messages.REQUEST_VALUE_NOT_VALID, MINIMUM_REQUEST, MAXIMUM_REQUEST); return; } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index f8559e44c997..d1857d6e19a6 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -392,7 +392,7 @@ public Mono send(EventDataBatch batch) { if (batch == null) { return monoError(logger, new NullPointerException("'batch' cannot be null.")); } else if (batch.getEvents().isEmpty()) { - logger.warning("Cannot send an EventBatch that is empty."); + logger.warning(Messages.CANNOT_SEND_EVENT_BATCH_EMPTY); return Mono.empty(); } @@ -478,7 +478,7 @@ private Mono sendInternal(Flux eventBatches) { .flatMap(this::send) .then() .doOnError(error -> { - logger.error("Error sending batch.", error); + logger.error(Messages.ERROR_SENDING_BATCH, error); }); } @@ -570,7 +570,7 @@ public BiConsumer, EventData> accumulator() { if (maxNumberOfBatches != null && list.size() == maxNumberOfBatches) { final String message = String.format(Locale.US, - "EventData does not fit into maximum number of batches. '%s'", maxNumberOfBatches); + Messages.EVENT_DATA_DOES_NOT_FIT, maxNumberOfBatches); throw new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, message, contextProvider.getErrorContext()); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/Messages.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/Messages.java new file mode 100644 index 000000000000..0580ea3468bc --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/Messages.java @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.core.util.logging.ClientLogger; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +/** + * I18n messages loaded from the messages.properties file located within the same package. + */ +public enum Messages { + ; + private static final ClientLogger LOGGER = new ClientLogger(Messages.class); + private static Properties properties; + private static final String MESSAGES_PROPERTIES_PATH = "com/azure/messaging/eventhubs/messages.properties"; + public static final String CLASS_NOT_A_SUPPORTED_TYPE = getMessage("CLASS_NOT_A_SUPPORTED_TYPE"); + public static final String ENCODING_TYPE_NOT_SUPPORTED = getMessage("ENCODING_TYPE_NOT_SUPPORTED"); + public static final String PROCESS_SPAN_SCOPE_TYPE_ERROR = getMessage("PROCESS_SPAN_SCOPE_TYPE_ERROR"); + public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE"); + public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID"); + public static final String EVENT_DATA_DOES_NOT_FIT = getMessage("EVENT_DATA_DOES_NOT_FIT"); + public static final String CANNOT_SEND_EVENT_BATCH_EMPTY = getMessage("CANNOT_SEND_EVENT_BATCH_EMPTY"); + public static final String ERROR_SENDING_BATCH = getMessage("ERROR_SENDING_BATCH"); + public static final String FAILED_TO_CLAIM_OWNERSHIP = getMessage("FAILED_TO_CLAIM_OWNERSHIP"); + public static final String LOAD_BALANCING_FAILED = getMessage("LOAD_BALANCING_FAILED"); + public static final String EVENT_PROCESSOR_RUN_END = getMessage("EVENT_PROCESSOR_RUN_END"); + public static final String FAILED_PROCESSING_ERROR_RECEIVE = getMessage("FAILED_PROCESSING_ERROR_RECEIVE"); + public static final String FAILED_WHILE_PROCESSING_ERROR = getMessage("FAILED_WHILE_PROCESSING_ERROR"); + public static final String FAILED_CLOSE_CONSUMER_PARTITION = getMessage("FAILED_CLOSE_CONSUMER_PARTITION"); + public static final String ERROR_OCCURRED_IN_SUBSCRIBER_ERROR = getMessage("ERROR_OCCURRED_IN_SUBSCRIBER_ERROR"); + public static final String EXCEPTION_OCCURRED_WHILE_EMITTING = getMessage("EXCEPTION_OCCURRED_WHILE_EMITTING"); + + private static synchronized Properties getProperties() { + if (properties != null) { + return properties; + } + properties = new Properties(); + try (InputStream inputStream = + Thread.currentThread().getContextClassLoader().getResourceAsStream(MESSAGES_PROPERTIES_PATH)) { + if (inputStream != null) { + properties.load(inputStream); + } else { + LOGGER.error("Message properties [{}] not found", MESSAGES_PROPERTIES_PATH); //NON-NLS + } + } catch (IOException exception) { + LOGGER.error("Error loading message properties [{}]", MESSAGES_PROPERTIES_PATH, exception); //NON-NLS + } + return properties; + } + + /** + * @param key the key of the message to retrieve + * @return the message matching the given key + */ + public static String getMessage(String key) { + return String.valueOf(getProperties().getOrDefault(key, key)); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java index 99e3534c21d4..7744ff243bd2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java @@ -107,7 +107,7 @@ void loadBalance() { Mono.zip(partitionOwnershipMono, partitionsMono) .flatMap(this::loadBalance) // if there was an error, log warning and TODO: call user provided error handler - .doOnError(ex -> logger.warning("Load balancing for event processor failed - {}", ex.getMessage())) + .doOnError(ex -> logger.warning(Messages.LOAD_BALANCING_FAILED, ex.getMessage())) .subscribe(); } @@ -332,7 +332,7 @@ private void claimOwnership(final Map partitionOwner .doOnNext(partitionOwnership -> logger.info("Successfully claimed ownership of partition {}", partitionOwnership.getPartitionId())) .doOnError(ex -> logger - .warning("Failed to claim ownership of partition {} - {}", ownershipRequest.getPartitionId(), + .warning(Messages.FAILED_TO_CLAIM_OWNERSHIP, ownershipRequest.getPartitionId(), ex.getMessage(), ex)) .collectList() .zipWith(checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroupName) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 8c7f82e81182..5f662aa47e4f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -90,7 +90,7 @@ void stopAllPartitionPumps() { try { eventHubConsumer.close(); } catch (Exception ex) { - logger.warning("Failed to close consumer for partition {}", partitionId, ex); + logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex); } finally { partitionPumps.remove(partitionId); } @@ -168,7 +168,7 @@ private void handleProcessingError(PartitionOwnership claimedOwnership, Partitio // also fails just log and continue partitionProcessor.processError(new ErrorContext(partitionContext, error)); } catch (Exception ex) { - logger.warning("Failed while processing error {}", claimedOwnership.getPartitionId(), ex); + logger.warning(Messages.FAILED_WHILE_PROCESSING_ERROR, claimedOwnership.getPartitionId(), ex); } } @@ -186,7 +186,7 @@ private void handleReceiveError(PartitionOwnership claimedOwnership, EventHubCon } partitionProcessor.close(new CloseContext(partitionContext, closeReason)); } catch (Exception ex) { - logger.warning("Failed while processing error on receive {}", claimedOwnership.getPartitionId(), ex); + logger.warning(Messages.FAILED_PROCESSING_ERROR_RECEIVE, claimedOwnership.getPartitionId(), ex); } finally { try { // close the consumer @@ -225,12 +225,12 @@ private void endProcessTracingSpan(Context processSpanContext, Signal sign close.close(); tracerProvider.endSpan(processSpanContext, signal); } catch (IOException ioException) { - logger.error("EventProcessor.run() endTracingSpan().close() failed with an error %s", ioException); + logger.error(Messages.EVENT_PROCESSOR_RUN_END, ioException); } } else { logger.warning(String.format(Locale.US, - "Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span", + Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR, spanScope.get() != null ? spanScope.getClass() : "null")); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java index 5b5c1779977d..b588d7107d9e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java @@ -4,6 +4,7 @@ package com.azure.messaging.eventhubs.implementation; import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.eventhubs.Messages; import com.azure.messaging.eventhubs.models.PartitionEvent; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; @@ -69,7 +70,7 @@ protected void hookOnComplete() { */ @Override protected void hookOnError(Throwable throwable) { - logger.error("Error occurred in subscriber. Error: {}", throwable); + logger.error(Messages.ERROR_OCCURRED_IN_SUBSCRIBER_ERROR, throwable); work.error(throwable); dispose(); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java index 8637fca6c24b..1ec6fd0ddee2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java @@ -4,6 +4,7 @@ package com.azure.messaging.eventhubs.implementation; import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.eventhubs.Messages; import com.azure.messaging.eventhubs.models.PartitionEvent; import reactor.core.publisher.FluxSink; @@ -88,7 +89,7 @@ public void next(PartitionEvent event) { emitter.next(event); remaining.decrementAndGet(); } catch (Exception e) { - logger.warning("Exception occurred while emitting next received event.", e); + logger.warning(Messages.EXCEPTION_OCCURRED_WHILE_EMITTING, e); isTerminal = true; emitter.error(e); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/com/azure/messaging/eventhubs/messages.properties b/sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/com/azure/messaging/eventhubs/messages.properties new file mode 100644 index 000000000000..c3f1ffcbf7cb --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/com/azure/messaging/eventhubs/messages.properties @@ -0,0 +1,16 @@ +PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span +MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not setting body contents. +REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}. +EVENT_DATA_DOES_NOT_FIT=EventData does not fit into maximum number of batches. '%s' +CANNOT_SEND_EVENT_BATCH_EMPTY=Cannot send an EventBatch that is empty. +ERROR_SENDING_BATCH=Error sending batch. +FAILED_TO_CLAIM_OWNERSHIP=Failed to claim ownership of partition {} - {} +LOAD_BALANCING_FAILED=Load balancing for event processor failed - {} +EVENT_PROCESSOR_RUN_END=EventProcessor.run() endTracingSpan().close() failed with an error %s +FAILED_PROCESSING_ERROR_RECEIVE=Failed while processing error on receive {} +FAILED_WHILE_PROCESSING_ERROR=Failed while processing error {} +FAILED_CLOSE_CONSUMER_PARTITION=Failed to close consumer for partition {} +ERROR_OCCURRED_IN_SUBSCRIBER_ERROR=Error occurred in subscriber. Error: {} +EXCEPTION_OCCURRED_WHILE_EMITTING=Exception occurred while emitting next received event. +CLASS_NOT_A_SUPPORTED_TYPE=Class '%s' is not a supported deserializable type. +ENCODING_TYPE_NOT_SUPPORTED=Encoding Type: %s is not supported diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/MessagesTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/MessagesTest.java new file mode 100644 index 000000000000..afb6a9b716c6 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/MessagesTest.java @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.lang.reflect.Field; +import java.util.Objects; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +class MessagesTest { + + static Stream keys() { + return Stream.of(Messages.class.getFields()).map(Field::getName); + } + + @ParameterizedTest + @MethodSource("keys") + void getMessage(String messageKey) { + assertNotEquals(messageKey, Messages.getMessage(messageKey)); + } + + @ParameterizedTest + @MethodSource("keys") + void messageField(String messageKey) throws NoSuchFieldException, IllegalAccessException { + Field field = Messages.class.getField(messageKey); + field.setAccessible(true); + assertEquals(Messages.getMessage(messageKey), Objects.toString(field.get(Messages.class))); + } +}