Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,7 @@ private CompletableFuture<Collection<Instant>> renewMessageLockBatchAsync(UUID[]
for(UUID lockToken : lockTokens) {
if (lockTimeIterator.hasNext()) {
Instant lockedUntilUtc = lockTimeIterator.next();
if (this.requestResponseLockTokensToLockTimesMap.containsKey(lockToken)) {
this.requestResponseLockTokensToLockTimesMap.put(lockToken, lockedUntilUtc);
}
this.requestResponseLockTokensToLockTimesMap.computeIfPresent(lockToken, (k, v) -> lockedUntilUtc);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SpotBugs-P3]

Sequence of calls to java.util.concurrent.ConcurrentHashMap may not be atomic in com.microsoft.azure.servicebus.MessageReceiver.lambda$renewMessageLockBatchAsync$20(UUID[], Collection)

}
}
return newLockedUntilTimes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,22 @@

package com.microsoft.azure.servicebus.amqp;

import org.apache.qpid.proton.amqp.transport.*;

import org.apache.qpid.proton.amqp.transport.ErrorCondition;

/**
* All AmqpExceptions - which EventHub client handles internally.
*/
public class AmqpException extends Exception
{
public class AmqpException extends Exception {
private static final long serialVersionUID = -750417419234273714L;
private ErrorCondition errorCondition;
private transient ErrorCondition errorCondition;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SpotBugs-P3]
This Serializable class defines a non-primitive instance field which is neither transient, Serializable, or java.lang.Object, and does not appear to implement the Externalizable interface or the readObject() and writeObject() methods. Objects of this class will not be deserialized correctly if a non-Serializable object is stored in this field.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the ErrorCondition already not serializable, Ignore it with using transient modifier to suppress the SpotBugs error


public AmqpException(ErrorCondition errorCondition)
{
public AmqpException(ErrorCondition errorCondition) {
super(errorCondition.getDescription());
this.errorCondition = errorCondition;
}

public ErrorCondition getError()
{
public ErrorCondition getError() {
return this.errorCondition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,51 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;

// To complete futures using a different thread. Otherwise every future is completed on the single reactor thread
// which badly affects perf and a client can potentially kill the thread or lock the thread.
class AsyncUtil {

public static <T> boolean completeFutureAndGetStatus(CompletableFuture<T> future, T result)
{
public static <T> boolean completeFutureAndGetStatus(CompletableFuture<T> future, T result) {
try {
return MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteCallable<T>(future, result)).get();
return MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteCallable<>(future, result)).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return false;
}
}

public static <T> void completeFuture(CompletableFuture<T> future, T result)
{
MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteCallable<T>(future, result));
public static <T> void completeFuture(CompletableFuture<T> future, T result) {
MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteCallable<>(future, result));
}

public static <T> boolean completeFutureExceptionallyAndGetStatus(CompletableFuture<T> future, Throwable exception)
{
public static <T> boolean completeFutureExceptionallyAndGetStatus(CompletableFuture<T> future, Throwable exception) {
try {
return MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteExceptionallyCallable<T>(future, exception)).get();
return MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteExceptionallyCallable<>(future, exception)).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return false;
}
}

public static <T> void completeFutureExceptionally(CompletableFuture<T> future, Throwable exception)
{
MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteExceptionallyCallable<T>(future, exception));
public static <T> void completeFutureExceptionally(CompletableFuture<T> future, Throwable exception) {
MessagingFactory.INTERNAL_THREAD_POOL.submit(new CompleteExceptionallyCallable<>(future, exception));
}

public static void run(Runnable runnable)
{
MessagingFactory.INTERNAL_THREAD_POOL.submit(runnable);
public static void run(Runnable runnable) {
try {
Comment thread
mssfang marked this conversation as resolved.
Outdated
MessagingFactory.INTERNAL_THREAD_POOL.submit(runnable);
} catch (RejectedExecutionException | NullPointerException e) {
e.printStackTrace();
Comment thread
mssfang marked this conversation as resolved.
Outdated
}
}

private static class CompleteCallable<T> implements Callable<Boolean>
{
private static class CompleteCallable<T> implements Callable<Boolean> {
private CompletableFuture<T> future;
private T result;

CompleteCallable(CompletableFuture<T> future, T result)
{
CompleteCallable(CompletableFuture<T> future, T result) {
this.future = future;
this.result = result;
}
Expand All @@ -63,13 +61,11 @@ public Boolean call() throws Exception {
}
}

private static class CompleteExceptionallyCallable<T> implements Callable<Boolean>
{
private static class CompleteExceptionallyCallable<T> implements Callable<Boolean> {
private CompletableFuture<T> future;
private Throwable exception;

CompleteExceptionallyCallable(CompletableFuture<T> future, Throwable exception)
{
CompleteExceptionallyCallable(CompletableFuture<T> future, Throwable exception) {
this.future = future;
this.exception = exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,24 @@

package com.microsoft.azure.servicebus.primitives;

import java.io.Serializable;
import java.util.Locale;

abstract class ErrorContext
{
abstract class ErrorContext implements Serializable {

private static final long serialVersionUID = -6342329018037308640L;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SpotBugs]
Since ServiceBusException has serialVersionUID, it means it needs to implement Serializable. Change the ErrorContext to implements Serializable cause all its sub-extended class to have the serialVersionUID

private final String namespaceName;

ErrorContext(final String namespaceName)
{
ErrorContext(final String namespaceName) {
this.namespaceName = namespaceName;
}

protected String getNamespaceName()
{
protected String getNamespaceName() {
return this.namespaceName;
}

@Override
public String toString()
{
public String toString() {
return StringUtil.isNullOrEmpty(this.namespaceName) ? null : String.format(Locale.US, "NS: %s", this.namespaceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,186 +16,118 @@
import com.microsoft.azure.servicebus.amqp.AmqpErrorCode;
import com.microsoft.azure.servicebus.amqp.AmqpException;

public final class ExceptionUtil
{
static Exception toException(ErrorCondition errorCondition)
{
if (errorCondition == null)
{
public final class ExceptionUtil {
static Exception toException(ErrorCondition errorCondition) {
if (errorCondition == null) {
throw new IllegalArgumentException("'null' errorCondition cannot be translated to ServiceBusException");
}
if (errorCondition.getCondition() == ClientConstants.TIMEOUT_ERROR)
{
if (errorCondition.getCondition() == ClientConstants.TIMEOUT_ERROR) {
return new TimeoutException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.SERVER_BUSY_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.SERVER_BUSY_ERROR) {
return new ServerBusyException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.NotFound)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.NotFound) {
return new MessagingEntityNotFoundException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.ENTITY_DISABLED_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.ENTITY_DISABLED_ERROR) {
return new MessagingEntityDisabledException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.Stolen)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.Stolen) {
return new ReceiverDisconnectedException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.UnauthorizedAccess)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.UnauthorizedAccess) {
return new AuthorizationFailedException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.PayloadSizeExceeded)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.PayloadSizeExceeded) {
return new PayloadSizeExceededException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.InternalError)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.InternalError) {
return new ServiceBusException(true, new AmqpException(errorCondition));
}
else if (errorCondition.getCondition() == ClientConstants.ARGUMENT_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.ARGUMENT_ERROR) {
return new ServiceBusException(false, errorCondition.getDescription(), new AmqpException(errorCondition));
}
else if (errorCondition.getCondition() == ClientConstants.ARGUMENT_OUT_OF_RANGE_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.ARGUMENT_OUT_OF_RANGE_ERROR) {
return new ServiceBusException(false, errorCondition.getDescription(), new AmqpException(errorCondition));
}
else if (errorCondition.getCondition() == AmqpErrorCode.NotImplemented)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.NotImplemented) {
return new UnsupportedOperationException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.NotAllowed)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.NotAllowed) {
return new UnsupportedOperationException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.PARTITION_NOT_OWNED_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.PARTITION_NOT_OWNED_ERROR) {
return new ServiceBusException(false, errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.STORE_LOCK_LOST_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.STORE_LOCK_LOST_ERROR) {
return new ServiceBusException(false, errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.AmqpLinkDetachForced)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.AmqpLinkDetachForced) {
return new ServiceBusException(true, new AmqpException(errorCondition));
}
else if (errorCondition.getCondition() == AmqpErrorCode.ConnectionForced)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.ConnectionForced) {
return new ServiceBusException(true, new AmqpException(errorCondition));
}
else if (errorCondition.getCondition() == AmqpErrorCode.FramingError)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.FramingError) {
return new ServiceBusException(true, new AmqpException(errorCondition));
}
else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded) {
return new QuotaExceededException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.MESSAGE_LOCK_LOST_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.MESSAGE_LOCK_LOST_ERROR) {
return new MessageLockLostException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.SESSION_LOCK_LOST_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.SESSION_LOCK_LOST_ERROR) {
return new SessionLockLostException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.SESSIONS_CANNOT_BE_LOCKED_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.SESSIONS_CANNOT_BE_LOCKED_ERROR) {
return new SessionCannotBeLockedException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.MESSAGE_NOT_FOUND_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.MESSAGE_NOT_FOUND_ERROR) {
return new MessageNotFoundException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == ClientConstants.ENTITY_ALREADY_EXISTS_ERROR)
{
} else if (errorCondition.getCondition() == ClientConstants.ENTITY_ALREADY_EXISTS_ERROR) {
return new MessagingEntityAlreadyExistsException(errorCondition.getDescription());
}
else if (errorCondition.getCondition() == AmqpErrorCode.DecodeError)
{
} else if (errorCondition.getCondition() == AmqpErrorCode.DecodeError) {
return new ServiceBusException(false, new AmqpException(errorCondition));
}

return new ServiceBusException(ClientConstants.DEFAULT_IS_TRANSIENT, errorCondition.toString());
}

static <T> void completeExceptionally(CompletableFuture<T> future, Throwable exception, IErrorContextProvider contextProvider, boolean completeAsynchronously)
{
if (exception != null && exception instanceof ServiceBusException)
{
static <T> void completeExceptionally(CompletableFuture<T> future, Throwable exception, IErrorContextProvider contextProvider, boolean completeAsynchronously) {
if (exception != null && exception instanceof ServiceBusException) {
ErrorContext errorContext = contextProvider.getContext();
((ServiceBusException) exception).setContext(errorContext);
}

if(completeAsynchronously)
{
if(completeAsynchronously) {
AsyncUtil.completeFutureExceptionally(future, exception);
}
else
{
} else {
future.completeExceptionally(exception);
}
}

// not a specific message related error
static boolean isGeneralError(Symbol amqpError)
{
static boolean isGeneralError(Symbol amqpError) {
return (amqpError == ClientConstants.SERVER_BUSY_ERROR
|| amqpError == ClientConstants.TIMEOUT_ERROR
|| amqpError == AmqpErrorCode.ResourceLimitExceeded);
}

static String getTrackingIDAndTimeToLog()
{
static String getTrackingIDAndTimeToLog() {
return String.format(Locale.US, "TrackingId: %s, at: %s", UUID.randomUUID().toString(), ZonedDateTime.now());
}

static String toStackTraceString(final Throwable exception, final String customErrorMessage)
{
static String toStackTraceString(final Throwable exception, final String customErrorMessage) {
final StringBuilder builder = new StringBuilder();

if (!StringUtil.isNullOrEmpty(customErrorMessage))
{
if (!StringUtil.isNullOrEmpty(customErrorMessage)) {
builder.append(customErrorMessage);
builder.append(System.lineSeparator());
}

builder.append(exception.getMessage());
if (exception.getStackTrace() != null)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SpotBugs-P3]
Redundant nullcheck of Throwable.getStackTrace(), which is known to be non-null in com.microsoft.azure.servicebus.primitives.ExceptionUtil.toStackTraceString(Throwable, String)

for (StackTraceElement ste: exception.getStackTrace())
{
builder.append(System.lineSeparator());
builder.append(ste.toString());
}
for (StackTraceElement ste : exception.getStackTrace()) {
builder.append(System.lineSeparator());
builder.append(ste.toString());
}

Throwable innerException = exception.getCause();
if (innerException != null)
{
if (innerException != null) {
builder.append("Cause: " + innerException.getMessage());
if (innerException.getStackTrace() != null)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SpotBugs-P3]
Redundant nullcheck of Throwable.getStackTrace(), which is known to be non-null in com.microsoft.azure.servicebus.primitives.ExceptionUtil.toStackTraceString(Throwable, String)

for (StackTraceElement ste: innerException.getStackTrace())
{
builder.append(System.lineSeparator());
builder.append(ste.toString());
}
for (StackTraceElement ste : innerException.getStackTrace()) {
builder.append(System.lineSeparator());
builder.append(ste.toString());
}
}

return builder.toString();
}

public static Throwable extractAsyncCompletionCause(Throwable completionEx)
{
if(completionEx instanceof CompletionException || completionEx instanceof ExecutionException)
{
public static Throwable extractAsyncCompletionCause(Throwable completionEx) {
if(completionEx instanceof CompletionException || completionEx instanceof ExecutionException) {
return completionEx.getCause();
}
else
{
} else {
return completionEx;
}
}
Expand Down
Loading