Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
Expand Down Expand Up @@ -1255,12 +1254,9 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() {
})
.repeat();

final LinkErrorContext context = new LinkErrorContext(fullyQualifiedNamespace, entityPath, linkName,
null);
final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(connectionProcessor.getRetryOptions());
final ServiceBusReceiveLinkProcessor linkMessageProcessor = receiveLink.subscribeWith(
new ServiceBusReceiveLinkProcessor(receiverOptions.getPrefetchCount(), retryPolicy, connectionProcessor,
context));
new ServiceBusReceiveLinkProcessor(receiverOptions.getPrefetchCount(), retryPolicy));
final ServiceBusAsyncConsumer newConsumer = new ServiceBusAsyncConsumer(linkName, linkMessageProcessor,
messageSerializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ class UnnamedSessionReceiver implements AutoCloseable {
final ServiceBusReceivedMessage deserialized = messageSerializer.deserialize(message,
ServiceBusReceivedMessage.class);

//TODO (conniey): For session receivers, do they have a message lock token?
if (!CoreUtils.isNullOrEmpty(deserialized.getLockToken())) {
lockContainer.addOrUpdate(deserialized.getLockToken(), deserialized.getLockedUntil());
} else {
logger.info("sessionId[{}] message[{}]. There is no lock token.",
deserialized.getSessionId(), deserialized.getMessageId());
}

return new ServiceBusReceivedMessageContext(deserialized);
Expand All @@ -101,9 +105,12 @@ class UnnamedSessionReceiver implements AutoCloseable {
return;
}

final String token = CoreUtils.isNullOrEmpty(context.getMessage().getLockToken())
? context.getMessage().getLockToken()
final ServiceBusReceivedMessage message = context.getMessage();
final String token = !CoreUtils.isNullOrEmpty(message.getLockToken())
? message.getLockToken()
: "";

logger.info("Received sessionId[{}] messageId[{}]", context.getSessionId(), message.getMessageId());
Comment thread
conniey marked this conversation as resolved.
Outdated
messageReceivedSink.next(token);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
Expand Down Expand Up @@ -42,27 +41,32 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusRece
implements Subscription {
private final ClientLogger logger = new ClientLogger(ServiceBusReceiveLinkProcessor.class);
private final Object lock = new Object();
private final Object queueLock = new Object();
private final AtomicBoolean isTerminated = new AtomicBoolean();
private final AtomicInteger retryAttempts = new AtomicInteger();
private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
private final AtomicBoolean hasFirstLink = new AtomicBoolean();
private final AtomicBoolean linkCreditsAdded = new AtomicBoolean();
private final AtomicReference<String> linkName = new AtomicReference<>();

// Queue containing all the prefetched messages.
private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
private final int minimumNumberOfMessages;
private final int prefetch;

private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference<>();
private final AtomicInteger wip = new AtomicInteger();

private final int prefetch;
private final AmqpRetryPolicy retryPolicy;
private final Disposable parentConnection;
private final AmqpErrorContext errorContext;

private volatile Throwable lastError;
private volatile boolean isCancelled;
private volatile ServiceBusReceiveLink currentLink;
private volatile Disposable currentLinkSubscriptions;
private volatile Disposable retrySubscription;

// size() on Deque is O(n) operation, so we use an integer to keep track. All reads and writes to this are gated by
// the `queueLock`.
private volatile int pendingMessages;

// Opting to use AtomicReferenceFieldUpdater because Project Reactor provides utility methods that calculates
// backpressure requests, sets the upstream correctly, and reports its state.
private volatile long requested;
Expand All @@ -78,39 +82,28 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusRece
*
* @param prefetch The number if messages to initially fetch.
* @param retryPolicy Retry policy to apply when fetching a new AMQP channel.
* @param parentConnection Represents the parent connection.
*
* @throws NullPointerException if {@code retryPolicy} is null.
* @throws IllegalArgumentException if {@code prefetch} is less than 0.
*/
public ServiceBusReceiveLinkProcessor(int prefetch, AmqpRetryPolicy retryPolicy,
Disposable parentConnection, AmqpErrorContext errorContext) {
public ServiceBusReceiveLinkProcessor(int prefetch, AmqpRetryPolicy retryPolicy) {
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
this.parentConnection = Objects.requireNonNull(parentConnection, "'parentConnection' cannot be null.");
this.errorContext = errorContext;

if (prefetch <= 0) {
if (prefetch < 0) {
throw logger.logExceptionAsError(
new IllegalArgumentException("'prefetch' cannot be less than or equal to 0."));
new IllegalArgumentException("'prefetch' cannot be less than 0."));
}

this.prefetch = prefetch;

// When the queue has this number of messages left, it's time to add more credits to refill the prefetch queue.
this.minimumNumberOfMessages = Math.floorDiv(prefetch, 3);
}

public String getLinkName() {
return linkName.get();
}

/**
* Gets the error context associated with this link.
*
* @return the error context associated with this link.
*/
public AmqpErrorContext getErrorContext() {
return errorContext;
}


public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
if (isDisposed()) {
return monoError(logger, new IllegalStateException(String.format(
Expand All @@ -123,7 +116,15 @@ public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryStat
"lockToken[%s]. state[%s]. Cannot update disposition with no link.", lockToken, deliveryState)));
}

return link.updateDisposition(lockToken, deliveryState);
return link.updateDisposition(lockToken, deliveryState)
.then(Mono.fromRunnable(() -> {
// Check if we should add more credits.
synchronized (queueLock) {
pendingMessages--;
}

checkAndAddCredits(link);
}));
}

/**
Expand Down Expand Up @@ -196,17 +197,20 @@ public void onNext(ServiceBusReceiveLink next) {
oldSubscription = currentLinkSubscriptions;

currentLink = next;
next.setEmptyCreditListener(() -> {
final int creditsToAdd = getCreditsToAdd(0);
linkCreditsAdded.set(creditsToAdd > 0);

if (!hasFirstLink.getAndSet(true)) {
linkCreditsAdded.set(true);
next.addCredits(prefetch);
}

next.setEmptyCreditListener(() -> getCreditsToAdd());
return creditsToAdd;
});

currentLinkSubscriptions = Disposables.composite(
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
messageQueue.add(message);
synchronized (queueLock) {
messageQueue.add(message);
pendingMessages++;
}

drain();
}),
next.getEndpointStates().subscribe(
Expand All @@ -221,9 +225,11 @@ public void onNext(ServiceBusReceiveLink next) {
onError(error);
},
() -> {
if (parentConnection.isDisposed() || isTerminated()
|| upstream == Operators.cancelledSubscription()) {
logger.info("Terminal state reached. Disposing of link processor.");
if (isTerminated()) {
logger.info("Processor is terminated. Disposing of link processor.");
dispose();
} else if (upstream == Operators.cancelledSubscription()) {
logger.info("Upstream has completed. Disposing of link processor.");
dispose();
} else {
logger.info("Receive link endpoint states are closed. Requesting another.");
Expand All @@ -239,6 +245,8 @@ public void onNext(ServiceBusReceiveLink next) {
}));
}

checkAndAddCredits(next);

if (oldChannel != null) {
oldChannel.dispose();
}
Expand Down Expand Up @@ -313,7 +321,7 @@ public void onError(Throwable throwable) {
final String linkName = link != null ? link.getLinkName() : "n/a";
final String entityPath = link != null ? link.getEntityPath() : "n/a";

if (retryInterval != null && !parentConnection.isDisposed() && upstream != Operators.cancelledSubscription()) {
if (retryInterval != null && upstream != Operators.cancelledSubscription()) {
logger.warning("linkName[{}] entityPath[{}]. Transient error occurred. Attempt: {}. Retrying after {} ms.",
linkName, entityPath, attempt, retryInterval.toMillis(), throwable);

Expand All @@ -322,10 +330,6 @@ public void onError(Throwable throwable) {
return;
}

if (parentConnection.isDisposed()) {
logger.info("Parent connection is disposed. Not reopening on error.");
}

logger.warning("linkName[{}] entityPath[{}]. Non-retryable error occurred in AMQP receive link.",
linkName, entityPath, throwable);
lastError = throwable;
Expand Down Expand Up @@ -371,12 +375,11 @@ public void request(long request) {
Operators.addCap(REQUESTED, this, request);

final AmqpReceiveLink link = currentLink;
if (link != null && !linkCreditsAdded.getAndSet(true)) {
int credits = getCreditsToAdd();
logger.info("Link credits not yet added. Adding: {}", credits);
link.addCredits(credits);
if (link == null) {
return;
}

checkAndAddCredits(link);
drain();
}

Expand Down Expand Up @@ -469,14 +472,19 @@ private void drainQueue() {
break;
}

Message message = messageQueue.poll();
final Message message = messageQueue.poll();
if (message == null) {
break;
}

if (isCancelled) {
Operators.onDiscard(message, subscriber.currentContext());
Operators.onDiscardQueueWithClear(messageQueue, subscriber.currentContext(), null);

synchronized (queueLock) {
Operators.onDiscardQueueWithClear(messageQueue, subscriber.currentContext(), null);
pendingMessages = 0;
}

return;
}

Expand Down Expand Up @@ -515,22 +523,62 @@ private boolean checkAndSetTerminated() {
currentLink.dispose();
}

messageQueue.clear();
synchronized (queueLock) {
messageQueue.clear();
pendingMessages = 0;
}

return true;
}

private int getCreditsToAdd() {
private void checkAndAddCredits(AmqpReceiveLink link) {
if (link == null) {
return;
}

// Credits have already been added to the link. We won't try again.
if (linkCreditsAdded.getAndSet(true)) {
return;
}

final int credits = getCreditsToAdd(link.getCredits());
linkCreditsAdded.set(credits > 0);

logger.info("Link credits to add. Credits: '{}'", credits);

if (credits > 0) {
link.addCredits(credits);
}
}

private int getCreditsToAdd(int linkCredits) {
final CoreSubscriber<? super Message> subscriber = downstream.get();
final long r = requested;
final boolean hasBackpressure = r != Long.MAX_VALUE;

if (subscriber == null || r == 0) {
logger.info("Not adding credits. No downstream subscribers or items requested.");
linkCreditsAdded.set(false);
return 0;
}

linkCreditsAdded.set(true);
final int creditsToAdd;
if (messageQueue.isEmpty() && !hasBackpressure) {
creditsToAdd = prefetch;
} else {
synchronized (queueLock) {
final int pending = pendingMessages + linkCredits;

if (hasBackpressure) {
creditsToAdd = Math.max(Long.valueOf(r).intValue() - pending, 0);
} else {
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
creditsToAdd = minimumNumberOfMessages >= pendingMessages
? Math.max(prefetch - pending, 1)
: 0;
}
}
}

// If there is no back pressure, always add 1. Otherwise, add whatever is requested.
return r == Long.MAX_VALUE ? 1 : Long.valueOf(r).intValue();
return creditsToAdd;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
Expand All @@ -22,7 +21,6 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -61,8 +59,6 @@ class ServiceBusAsyncConsumerTest {
@Mock
private AmqpRetryPolicy retryPolicy;
@Mock
private Disposable parentConnection;
@Mock
private MessageSerializer serializer;

@BeforeAll
Expand All @@ -83,8 +79,7 @@ void setup(TestInfo testInfo) {

when(link.getEndpointStates()).thenReturn(endpointStateFlux);
when(link.receive()).thenReturn(messageFlux);
linkProcessor = linkFlux.subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy,
parentConnection, new AmqpErrorContext("a-namespace")));
linkProcessor = linkFlux.subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy));

when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE)));
when(link.updateDisposition(anyString(), any(DeliveryState.class))).thenReturn(Mono.empty());
Expand Down
Loading