Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/servicebus/microsoft-azure-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>3.1.0</version>
<version>3.1.1</version>

<name>Microsoft Azure SDK for Service Bus</name>
<description>Java library for Azure Service Bus</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void onLinkRemoteOpen(Event event) {
this.amqpReceiver.onOpenComplete(null);
}
} else {
TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteTarget:{}, remoteTarget:{}, action:{}", receiver.getName(), null, null, "waitingForError");
TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteSource:{}, remoteTarget:{}, action:{}", receiver.getName(), null, null, "waitingForError");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,18 @@ protected boolean getIsClosingOrClosed() {
// used to force close when entity is faulted
protected final void setClosed() {
synchronized (this.syncClose) {
this.isClosing = false;
this.isClosed = true;
}
}

protected final void setClosing() {
synchronized (this.syncClose) {
if (!this.isClosed) {
this.isClosing = true;
}
}
}

public final CompletableFuture<Void> closeAsync() {
if (this.getIsClosingOrClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
private CompletableFuture<Void> receiveLinkReopenFuture;
private final Runnable timedOutUpdateStateRequestsDaemon;
private final Runnable returnMesagesLoopDaemon;
private final ScheduledFuture<?> updateStateRequestsTimeoutChecker;
private final ScheduledFuture<?> returnMessagesLoopRunner;
private final MessagingEntityType entityType;
private ScheduledFuture<?> updateStateRequestsTimeoutChecker;
private ScheduledFuture<?> returnMessagesLoopRunner;

// TODO: Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests.
private CoreMessageReceiver(final MessagingFactory factory,
Expand Down Expand Up @@ -145,6 +145,11 @@ private CoreMessageReceiver(final MessagingFactory factory,

this.timedOutUpdateStateRequestsDaemon = () -> {
try {
if (CoreMessageReceiver.this.getIsClosed()) {
CoreMessageReceiver.this.updateStateRequestsTimeoutChecker.cancel(true);
return;
}

TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath);
for (Map.Entry<String, UpdateStateWorkItem> entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet()) {
Duration remainingTime = entry.getValue().getTimeoutTracker().remaining();
Expand All @@ -167,6 +172,11 @@ private CoreMessageReceiver(final MessagingFactory factory,
// CONTRACT: message should be delivered to the caller of MessageReceiver.receive() only from prefetched messages
this.returnMesagesLoopDaemon = () -> {
try {
if (CoreMessageReceiver.this.getIsClosed()) {
CoreMessageReceiver.this.returnMessagesLoopRunner.cancel(true);
return;
}

TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath);
while (!CoreMessageReceiver.this.prefetchedMessages.isEmpty()) {
ReceiveWorkItem currentReceive = CoreMessageReceiver.this.pendingReceives.poll();
Expand All @@ -187,11 +197,6 @@ private CoreMessageReceiver(final MessagingFactory factory,
// Shouldn't throw any exception for the executor to run multiple times.. Should never come here
}
};

// As all update state requests have the same timeout, one timer is better than having one timer per request
this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
// Scheduling it as a separate thread that wakes up at regular very short intervals.. Doesn't wait on incoming receive requests from callers or incoming deliveries from reactor
this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
}

// Connection has to be associated with Reactor before Creating a receiver on it.
Expand Down Expand Up @@ -522,6 +527,11 @@ public void onOpenComplete(Exception exception) {
if (exception == null) {
if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
AsyncUtil.completeFuture(this.linkOpen.getWork(), this);

// As all update state requests have the same timeout, one timer is better than having one timer per request
this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
// Scheduling it as a separate thread that wakes up at regular very short intervals.. Doesn't wait on incoming receive requests from callers or incoming deliveries from reactor
this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
}

if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
Expand Down Expand Up @@ -746,14 +756,15 @@ private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) {
Timer.schedule(
() -> {
if (!linkOpen.getWork().isDone()) {
CoreMessageReceiver.this.closeInternals(false);
CoreMessageReceiver.this.setClosed();

Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()),
CoreMessageReceiver.this.lastKnownLinkError);
TRACE_LOGGER.warn(operationTimedout.getMessage());
ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, CoreMessageReceiver.this, true);

CoreMessageReceiver.this.setClosing();
CoreMessageReceiver.this.closeInternals(false);
CoreMessageReceiver.this.setClosed();
}
},
timeout.remaining(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,14 +618,15 @@ private void initializeLinkOpen(TimeoutTracker timeout) {
Timer.schedule(
() -> {
if (!CoreMessageSender.this.linkFirstOpen.isDone()) {
CoreMessageSender.this.closeInternals(false);
CoreMessageSender.this.setClosed();

Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.getSendPath(), ZonedDateTime.now().toString()),
CoreMessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? CoreMessageSender.this.lastKnownLinkError : null);
TRACE_LOGGER.warn(operationTimedout.getMessage());
ExceptionUtil.completeExceptionally(CoreMessageSender.this.linkFirstOpen, operationTimedout, CoreMessageSender.this, true);

CoreMessageSender.this.setClosing();
CoreMessageSender.this.closeInternals(false);
CoreMessageSender.this.setClosed();
}
},
timeout.remaining(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ static int encodeMessageToCustomArray(Message message, byte[] encodedBytes, int

// Pass little less than client timeout to the server so client doesn't time out before server times out
public static Duration adjustServerTimeout(Duration clientTimeout) {
return clientTimeout.minusMillis(100);
return clientTimeout.minusMillis(200);
}

// This is not super stable for some reason
Expand Down