Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
<suppress checks="MethodName" files="SymmetricEncryptionAlgorithm.java"/>
<suppress checks="MethodName" files="RsaSignature.java"/>

<!-- empty Enum Singleton Pattern -->
<suppress checks="NoWhitespaceBefore" files="Messages.java"/>

<!-- Public API already released with incorrect constant variable naming -->
<suppress checks="ConstantName" files="AlgorithmResolver.java"/>
<suppress checks="ConstantName" files="AmqpErrorCode.java"/>
Expand Down Expand Up @@ -128,6 +131,9 @@
<!-- Suppress LineLength for Track 1 libraries. -->
<suppress checks="LineLength" files=".*[/\\]com[/\\]microsoft[/\\]"/>

<!-- Suppress LineLength for i18n message properties -->
<suppress checks="LineLength" files="messages.properties"/>

<!-- Don't apply custom Checkstyle rules to files under checkstyle package. -->
<suppress checks="com\.azure\.tools\.checkstyle\.checks\..+" files=".*[/\\]tools[/\\]checkstyle[/\\].*"/>

Expand Down Expand Up @@ -188,6 +194,8 @@
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files=".*[/\\]core[/\\]test[/\\].*\.java"/>
<!-- Class has static methods which using static logger instance, issue link: https://github.com/Azure/azure-sdk-for-java/issues/5137-->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files="CertificateUtil.java"/>
<!-- Requires static access to logger to report errors while loading i18n messages (from within a static initializer )-->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files="Messages.java"/>

<!-- Event Hubs uses AMQP, which does not contain an HTTP response. Returning PagedResponse and Response does not apply. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.messaging.eventhubs.EventHubClient.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -155,8 +154,8 @@ private <T> T deserializeManagementResponse(Message message, Class<T> deserializ
} else if (deserializedType == EventHubProperties.class) {
return (T) toEventHubProperties(amqpBody);
} else {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(Locale.US,
"Class '%s' is not a supported deserializable type.", deserializedType)));
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
Messages.CLASS_NOT_A_SUPPORTED_TYPE, deserializedType)));
}
}

Expand Down Expand Up @@ -214,8 +213,7 @@ private EventData deserializeEventData(Message message) {
Data bodyData = (Data) bodySection;
body = bodyData.getValue().getArray();
} else {
logger.warning(String.format(Locale.US,
"Message body type is not of type Data, but type: %s. Not setting body contents.",
logger.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE,
bodySection != null ? bodySection.getType() : "null"));

body = new byte[0];
Expand Down Expand Up @@ -347,7 +345,6 @@ private static void setSystemProperties(EventData eventData, Message message) {
default:
throw new IllegalArgumentException(
String.format(
Locale.US,
"Property is not a recognized reserved property name: %s",
key));
}
Expand Down Expand Up @@ -426,7 +423,7 @@ private static int sizeof(Object obj) {
return Double.BYTES;
}

throw new IllegalArgumentException(String.format(Locale.US, "Encoding Type: %s is not supported",
throw new IllegalArgumentException(String.format(Messages.ENCODING_TYPE_NOT_SUPPORTED,
obj.getClass()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
})
.doOnRequest(request -> {
if (request < MINIMUM_REQUEST) {
logger.warning("Back pressure request value not valid. It must be between {} and {}.",
logger.warning(Messages.REQUEST_VALUE_NOT_VALID,
MINIMUM_REQUEST, MAXIMUM_REQUEST);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public Mono<Void> send(EventDataBatch batch) {
if (batch == null) {
return monoError(logger, new NullPointerException("'batch' cannot be null."));
} else if (batch.getEvents().isEmpty()) {
logger.warning("Cannot send an EventBatch that is empty.");
logger.warning(Messages.CANNOT_SEND_EVENT_BATCH_EMPTY);
return Mono.empty();
}

Expand Down Expand Up @@ -478,7 +478,7 @@ private Mono<Void> sendInternal(Flux<EventDataBatch> eventBatches) {
.flatMap(this::send)
.then()
.doOnError(error -> {
logger.error("Error sending batch.", error);
logger.error(Messages.ERROR_SENDING_BATCH, error);
});
}

Expand Down Expand Up @@ -570,7 +570,7 @@ public BiConsumer<List<EventDataBatch>, EventData> accumulator() {

if (maxNumberOfBatches != null && list.size() == maxNumberOfBatches) {
final String message = String.format(Locale.US,
"EventData does not fit into maximum number of batches. '%s'", maxNumberOfBatches);
Messages.EVENT_DATA_DOES_NOT_FIT, maxNumberOfBatches);

throw new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, message,
contextProvider.getErrorContext());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.util.logging.ClientLogger;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
* I18n messages loaded from the messages.properties file located within the same package.
*/
public enum Messages {
;
private static final ClientLogger LOGGER = new ClientLogger(Messages.class);
private static Properties properties;
private static final String MESSAGES_PROPERTIES_PATH = "com/azure/messaging/eventhubs/messages.properties";
public static final String CLASS_NOT_A_SUPPORTED_TYPE = getMessage("CLASS_NOT_A_SUPPORTED_TYPE");
public static final String ENCODING_TYPE_NOT_SUPPORTED = getMessage("ENCODING_TYPE_NOT_SUPPORTED");
public static final String PROCESS_SPAN_SCOPE_TYPE_ERROR = getMessage("PROCESS_SPAN_SCOPE_TYPE_ERROR");
public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE");
public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID");
public static final String EVENT_DATA_DOES_NOT_FIT = getMessage("EVENT_DATA_DOES_NOT_FIT");
public static final String CANNOT_SEND_EVENT_BATCH_EMPTY = getMessage("CANNOT_SEND_EVENT_BATCH_EMPTY");
public static final String ERROR_SENDING_BATCH = getMessage("ERROR_SENDING_BATCH");
public static final String FAILED_TO_CLAIM_OWNERSHIP = getMessage("FAILED_TO_CLAIM_OWNERSHIP");
public static final String LOAD_BALANCING_FAILED = getMessage("LOAD_BALANCING_FAILED");
public static final String EVENT_PROCESSOR_RUN_END = getMessage("EVENT_PROCESSOR_RUN_END");
public static final String FAILED_PROCESSING_ERROR_RECEIVE = getMessage("FAILED_PROCESSING_ERROR_RECEIVE");
public static final String FAILED_WHILE_PROCESSING_ERROR = getMessage("FAILED_WHILE_PROCESSING_ERROR");
public static final String FAILED_CLOSE_CONSUMER_PARTITION = getMessage("FAILED_CLOSE_CONSUMER_PARTITION");
public static final String ERROR_OCCURRED_IN_SUBSCRIBER_ERROR = getMessage("ERROR_OCCURRED_IN_SUBSCRIBER_ERROR");
public static final String EXCEPTION_OCCURRED_WHILE_EMITTING = getMessage("EXCEPTION_OCCURRED_WHILE_EMITTING");

private static synchronized Properties getProperties() {
if (properties != null) {
return properties;
}
properties = new Properties();
try (InputStream inputStream =
Thread.currentThread().getContextClassLoader().getResourceAsStream(MESSAGES_PROPERTIES_PATH)) {
if (inputStream != null) {
properties.load(inputStream);
} else {
LOGGER.error("Message properties [{}] not found", MESSAGES_PROPERTIES_PATH); //NON-NLS
}
} catch (IOException exception) {
LOGGER.error("Error loading message properties [{}]", MESSAGES_PROPERTIES_PATH, exception); //NON-NLS
}
return properties;
}

/**
* @param key the key of the message to retrieve
* @return the message matching the given key
*/
public static String getMessage(String key) {
return String.valueOf(getProperties().getOrDefault(key, key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void loadBalance() {
Mono.zip(partitionOwnershipMono, partitionsMono)
.flatMap(this::loadBalance)
// if there was an error, log warning and TODO: call user provided error handler
.doOnError(ex -> logger.warning("Load balancing for event processor failed - {}", ex.getMessage()))
.doOnError(ex -> logger.warning(Messages.LOAD_BALANCING_FAILED, ex.getMessage()))
.subscribe();
}

Expand Down Expand Up @@ -332,7 +332,7 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
.doOnNext(partitionOwnership -> logger.info("Successfully claimed ownership of partition {}",
partitionOwnership.getPartitionId()))
.doOnError(ex -> logger
.warning("Failed to claim ownership of partition {} - {}", ownershipRequest.getPartitionId(),
.warning(Messages.FAILED_TO_CLAIM_OWNERSHIP, ownershipRequest.getPartitionId(),
ex.getMessage(), ex))
.collectList()
.zipWith(checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroupName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void stopAllPartitionPumps() {
try {
eventHubConsumer.close();
} catch (Exception ex) {
logger.warning("Failed to close consumer for partition {}", partitionId, ex);
logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex);
} finally {
partitionPumps.remove(partitionId);
}
Expand Down Expand Up @@ -168,7 +168,7 @@ private void handleProcessingError(PartitionOwnership claimedOwnership, Partitio
// also fails just log and continue
partitionProcessor.processError(new ErrorContext(partitionContext, error));
} catch (Exception ex) {
logger.warning("Failed while processing error {}", claimedOwnership.getPartitionId(), ex);
logger.warning(Messages.FAILED_WHILE_PROCESSING_ERROR, claimedOwnership.getPartitionId(), ex);
}
}

Expand All @@ -186,7 +186,7 @@ private void handleReceiveError(PartitionOwnership claimedOwnership, EventHubCon
}
partitionProcessor.close(new CloseContext(partitionContext, closeReason));
} catch (Exception ex) {
logger.warning("Failed while processing error on receive {}", claimedOwnership.getPartitionId(), ex);
logger.warning(Messages.FAILED_PROCESSING_ERROR_RECEIVE, claimedOwnership.getPartitionId(), ex);
} finally {
try {
// close the consumer
Expand Down Expand Up @@ -225,12 +225,12 @@ private void endProcessTracingSpan(Context processSpanContext, Signal<Void> sign
close.close();
tracerProvider.endSpan(processSpanContext, signal);
} catch (IOException ioException) {
logger.error("EventProcessor.run() endTracingSpan().close() failed with an error %s", ioException);
logger.error(Messages.EVENT_PROCESSOR_RUN_END, ioException);
}

} else {
logger.warning(String.format(Locale.US,
"Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span",
Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR,
spanScope.get() != null ? spanScope.getClass() : "null"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
Expand Down Expand Up @@ -69,7 +70,7 @@ protected void hookOnComplete() {
*/
@Override
protected void hookOnError(Throwable throwable) {
logger.error("Error occurred in subscriber. Error: {}", throwable);
logger.error(Messages.ERROR_OCCURRED_IN_SUBSCRIBER_ERROR, throwable);
work.error(throwable);
dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import reactor.core.publisher.FluxSink;

Expand Down Expand Up @@ -88,7 +89,7 @@ public void next(PartitionEvent event) {
emitter.next(event);
remaining.decrementAndGet();
} catch (Exception e) {
logger.warning("Exception occurred while emitting next received event.", e);
logger.warning(Messages.EXCEPTION_OCCURRED_WHILE_EMITTING, e);
isTerminal = true;
emitter.error(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span
MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not setting body contents.
REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}.
EVENT_DATA_DOES_NOT_FIT=EventData does not fit into maximum number of batches. '%s'
CANNOT_SEND_EVENT_BATCH_EMPTY=Cannot send an EventBatch that is empty.
ERROR_SENDING_BATCH=Error sending batch.
FAILED_TO_CLAIM_OWNERSHIP=Failed to claim ownership of partition {} - {}
LOAD_BALANCING_FAILED=Load balancing for event processor failed - {}
EVENT_PROCESSOR_RUN_END=EventProcessor.run() endTracingSpan().close() failed with an error %s
FAILED_PROCESSING_ERROR_RECEIVE=Failed while processing error on receive {}
FAILED_WHILE_PROCESSING_ERROR=Failed while processing error {}
FAILED_CLOSE_CONSUMER_PARTITION=Failed to close consumer for partition {}
ERROR_OCCURRED_IN_SUBSCRIBER_ERROR=Error occurred in subscriber. Error: {}
EXCEPTION_OCCURRED_WHILE_EMITTING=Exception occurred while emitting next received event.
CLASS_NOT_A_SUPPORTED_TYPE=Class '%s' is not a supported deserializable type.
ENCODING_TYPE_NOT_SUPPORTED=Encoding Type: %s is not supported
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.lang.reflect.Field;
import java.util.Objects;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

class MessagesTest {

static Stream<String> keys() {
return Stream.of(Messages.class.getFields()).map(Field::getName);
}

@ParameterizedTest
@MethodSource("keys")
void getMessage(String messageKey) {
assertNotEquals(messageKey, Messages.getMessage(messageKey));
}

@ParameterizedTest
@MethodSource("keys")
void messageField(String messageKey) throws NoSuchFieldException, IllegalAccessException {
Field field = Messages.class.getField(messageKey);
field.setAccessible(true);
assertEquals(Messages.getMessage(messageKey), Objects.toString(field.get(Messages.class)));
}
}