Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.concurrent.Semaphore;
import java.util.function.Function;

import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SEQUENCE_NUMBER_KEY;

/**
* Flux operator that auto-completes or auto-abandons messages when control is returned successfully.
*/
Expand Down Expand Up @@ -78,7 +80,9 @@ protected void hookOnNext(ServiceBusMessageContext value) {
final ServiceBusReceivedMessage message = value.getMessage();
final String sequenceNumber = message != null ? String.valueOf(message.getSequenceNumber()) : "n/a";

logger.verbose("ON NEXT: Passing message downstream. sequenceNumber[{}]", sequenceNumber);
logger.atVerbose()
.addKeyValue(SEQUENCE_NUMBER_KEY, sequenceNumber)
.log("ON NEXT: Passing message downstream.");
try {
semaphore.acquire();
} catch (InterruptedException e) {
Expand All @@ -89,12 +93,15 @@ protected void hookOnNext(ServiceBusMessageContext value) {
downstream.onNext(value);
applyWithCatch(onComplete, value, "complete");
} catch (Exception e) {
logger.error("Error occurred processing message. Abandoning. sequenceNumber[{}]",
sequenceNumber, e);
logger.atError()
.addKeyValue(SEQUENCE_NUMBER_KEY, sequenceNumber)
.log("Error occurred processing message. Abandoning.", e);

applyWithCatch(onAbandon, value, "abandon");
} finally {
logger.verbose("ON NEXT: Finished. sequenceNumber[{}]", sequenceNumber);
logger.atVerbose()
.addKeyValue(SEQUENCE_NUMBER_KEY, sequenceNumber)
.log("ON NEXT: Finished.");
semaphore.release();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.LOCK_TOKEN_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SEQUENCE_NUMBER_KEY;

/**
* Receives messages from to upstream, subscribe lock renewal subscriber.
*/
Expand Down Expand Up @@ -133,12 +136,14 @@ protected void hookOnNext(ServiceBusMessageContext messageContext) {
final LockRenewalOperation renewOperation;

if (Objects.isNull(lockToken)) {
logger.warning("Unexpected, LockToken is not present in message. sequenceNumber[{}].",
message.getSequenceNumber());
logger.atWarning()
.addKeyValue(SEQUENCE_NUMBER_KEY, message.getSequenceNumber())
.log("Unexpected, LockToken is not present in message.");
return;
} else if (Objects.isNull(lockedUntil)) {
logger.warning("Unexpected, lockedUntil is not present in message. sequenceNumber[{}].",
message.getSequenceNumber());
logger.atWarning()
.addKeyValue(SEQUENCE_NUMBER_KEY, message.getSequenceNumber())
.log("Unexpected, lockedUntil is not present in message.");
return;
}

Expand All @@ -155,7 +160,9 @@ protected void hookOnNext(ServiceBusMessageContext messageContext) {
messageLockContainer.addOrUpdate(lockToken, OffsetDateTime.now().plus(maxAutoLockRenewal),
renewOperation);
} catch (Exception e) {
logger.info("Exception occurred while updating lockContainer for token [{}].", lockToken, e);
logger.atInfo()
.addKeyValue(LOCK_TOKEN_KEY, lockToken)
.log("Exception occurred while updating lockContainer.", e);
}

lockCleanup = context -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.LOCK_TOKEN_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.MAX_RENEWAL_BUFFER_DURATION;

/**
* Represents a renewal session or message lock renewal operation that.
*/
class LockRenewalOperation implements AutoCloseable {
private final ClientLogger logger = new ClientLogger(LockRenewalOperation.class);
private final ClientLogger logger;
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AtomicReference<OffsetDateTime> lockedUntil = new AtomicReference<>();
private final AtomicReference<Throwable> throwable = new AtomicReference<>();
Expand Down Expand Up @@ -67,6 +70,11 @@ class LockRenewalOperation implements AutoCloseable {
Objects.requireNonNull(tokenLockedUntil, "'lockedUntil cannot be null.'");
Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null.");

Map<String, Object> loggingContext = new HashMap<>(2);
loggingContext.put(LOCK_TOKEN_KEY, lockToken);
loggingContext.put("isSession", isSession);
this.logger = new ClientLogger(LockRenewalOperation.class, loggingContext);

if (maxLockRenewalDuration.isNegative()) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"'maxLockRenewalDuration' cannot be negative."));
Expand All @@ -82,13 +90,13 @@ class LockRenewalOperation implements AutoCloseable {
this.completionMono = renewLockOperation.then();
this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until),
error -> {
logger.error("token[{}]. Error occurred while renewing lock token.", error);
logger.error("Error occurred while renewing lock token.", error);
Comment thread
lmolkova marked this conversation as resolved.
status.set(LockRenewalStatus.FAILED);
throwable.set(error);
cancellationProcessor.onComplete();
}, () -> {
if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
logger.verbose("token[{}]. Renewing session lock task completed.", lockToken);
logger.verbose("Renewing session lock task completed.");
}

cancellationProcessor.onComplete();
Expand Down Expand Up @@ -159,7 +167,7 @@ public void close() {
}

if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.CANCELLED)) {
logger.verbose("token[{}] Cancelled operation.", lockToken);
logger.verbose("Cancelled operation.");
}

cancellationProcessor.onComplete();
Expand Down Expand Up @@ -191,13 +199,17 @@ private Flux<OffsetDateTime> getRenewLockOperation(OffsetDateTime initialLockedU
.thenReturn(Flux.create(s -> s.next(interval)))))
.takeUntilOther(cancellationSignals)
.flatMap(delay -> {
logger.info("token[{}]. now[{}]. Starting lock renewal.", lockToken, OffsetDateTime.now());
logger.info("Starting lock renewal.");

return renewalOperation.apply(lockToken);
})
.map(offsetDateTime -> {
final Duration next = Duration.between(OffsetDateTime.now(), offsetDateTime);
logger.info("token[{}]. nextExpiration[{}]. next: [{}]. isSession[{}]", lockToken, offsetDateTime, next,
isSession);

logger.atInfo()
.addKeyValue("nextExpiration", offsetDateTime)
.addKeyValue("next", next)
.log("Starting lock renewal.");

sink.next(calculateRenewalDelay(offsetDateTime));
return offsetDateTime;
Expand All @@ -213,16 +225,20 @@ private Duration calculateRenewalDelay(OffsetDateTime initialLockedUntil) {
// C# class : github.com/azure-sdk-for-net/.../Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs#L367

if (remainingTime.toMillis() < 400) {
logger.info("Duration was less than 400ms. now[{}] lockedUntil[{}]", now, initialLockedUntil);
logger.atInfo()
.addKeyValue("lockedUntil", initialLockedUntil)
.log("Duration was less than 400ms.");
return Duration.ZERO;
} else {
// Adjust the interval, so we can buffer time for the time it'll take to refresh.
final long bufferInMilliSec = Math.min(remainingTime.toMillis() / 2,
MAX_RENEWAL_BUFFER_DURATION.toMillis());
final Duration renewAfter = Duration.ofMillis(remainingTime.toMillis() - bufferInMilliSec);
if (renewAfter.isNegative()) {
logger.info("Adjusted duration is negative. renewAfter: {}ms. Buffer: {}ms.",
remainingTime.toMillis(), bufferInMilliSec);
logger.atInfo()
.addKeyValue("renewAfter", remainingTime.toMillis())
.addKeyValue("buffer", bufferInMilliSec)
.log("Adjusted duration is negative.");
}
return renewAfter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import java.util.function.Consumer;
import java.util.regex.Pattern;

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;

/**
* The builder to create Service Bus clients:
*
Expand Down Expand Up @@ -291,9 +293,12 @@ public ServiceBusClientBuilder connectionString(String connectionString) {

this.fullyQualifiedNamespace = properties.getEndpoint().getHost();

if (properties.getEntityPath() != null && !properties.getEntityPath().isEmpty()) {
logger.info("Setting 'entityName' [{}] from connectionString.", properties.getEntityPath());
this.connectionStringEntityName = properties.getEntityPath();
String entityPath = properties.getEntityPath();
if (CoreUtils.isNullOrEmpty(entityPath)) {
logger.atInfo()
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.log("Setting entity from connection string.");
this.connectionStringEntityName = entityPath;
}

return credential(properties.getEndpoint().getHost(), tokenCredential);
Expand Down Expand Up @@ -610,17 +615,22 @@ public ServiceBusSessionProcessorClientBuilder sessionProcessor() {
void onClientClose() {
synchronized (connectionLock) {
final int numberOfOpenClients = openClients.decrementAndGet();
logger.info("Closing a dependent client. # of open clients: {}", numberOfOpenClients);
logger.atInfo()
.addKeyValue("numberOfOpenClients", numberOfOpenClients)
.log("Closing a dependent client.");

if (numberOfOpenClients > 0) {
return;
}

if (numberOfOpenClients < 0) {
logger.warning("There should not be less than 0 clients. actual: {}", numberOfOpenClients);
logger.atWarning()
.addKeyValue("numberOfOpenClients", numberOfOpenClients)
.log("There should not be less than 0 clients.");
}

logger.info("No more open clients, closing shared connection [{}].", sharedConnection);
logger.info("No more open clients, closing shared connection.");

if (sharedConnection != null) {
sharedConnection.dispose();
sharedConnection = null;
Expand Down
Loading