Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>,

private final Object lock = new Object();
private final AmqpRetryPolicy retryPolicy;
private final String connectionId;
private final String fullyQualifiedNamespace;
private final String entityPath;
private final Function<T, Flux<AmqpEndpointState>> endpointStatesFunction;

Expand All @@ -48,9 +48,10 @@ public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>,
private volatile Disposable connectionSubscription;
private volatile Disposable retrySubscription;

public AmqpChannelProcessor(String connectionId, String entityPath,
public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
Function<T, Flux<AmqpEndpointState>> endpointStatesFunction, AmqpRetryPolicy retryPolicy, ClientLogger logger) {
this.connectionId = Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
this.fullyQualifiedNamespace = Objects
.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction,
"'endpointStates' cannot be null.");
Expand All @@ -71,7 +72,7 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onNext(T amqpChannel) {
logger.info("connectionId[{}] entityPath[{}]: Setting next AMQP channel.", connectionId, entityPath);
logger.info("namespace[{}] entityPath[{}]: Setting next AMQP channel.", fullyQualifiedNamespace, entityPath);

Objects.requireNonNull(amqpChannel, "'amqpChannel' cannot be null.");

Expand All @@ -84,7 +85,8 @@ public void onNext(T amqpChannel) {
currentChannel = amqpChannel;

final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
subscribers = new ConcurrentLinkedDeque<>();
logger.info("namespace[{}] entityPath[{}]: Next AMQP channel received, updating {} current "
+ "subscribers: {}", fullyQualifiedNamespace, entityPath, subscribers.size(), subscribers);

currentSubscribers.forEach(subscription -> subscription.onNext(amqpChannel));

Expand All @@ -93,6 +95,8 @@ public void onNext(T amqpChannel) {
// Connection was successfully opened, we can reset the retry interval.
if (state == AmqpEndpointState.ACTIVE) {
retryAttempts.set(0);
logger.info("namespace[{}] entityPath[{}]: Channel is now active.",
fullyQualifiedNamespace, entityPath);
}
},
error -> {
Expand All @@ -101,9 +105,11 @@ public void onNext(T amqpChannel) {
},
() -> {
if (isDisposed()) {
logger.info("Channel is disposed.");
logger.info("namespace[{}] entityPath[{}]: Channel is disposed.",
fullyQualifiedNamespace, entityPath);
} else {
logger.info("Channel closed.");
logger.info("namespace[{}] entityPath[{}]: Channel is closed.",
fullyQualifiedNamespace, entityPath);
setAndClearChannel();
}
});
Expand Down Expand Up @@ -179,6 +185,8 @@ public void onError(Throwable throwable) {
synchronized (lock) {
final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
subscribers = new ConcurrentLinkedDeque<>();
logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} "
+ "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size());

currentSubscribers.forEach(subscriber -> subscriber.onError(throwable));
}
Expand All @@ -192,7 +200,8 @@ public void onComplete() {
synchronized (lock) {
final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
subscribers = new ConcurrentLinkedDeque<>();

logger.info("namespace[{}] entityPath[{}]: AMQP channel processor completed. Notifying {} "
+ "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size());
currentSubscribers.forEach(subscriber -> subscriber.onComplete());
}
}
Expand All @@ -205,8 +214,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
actual.onError(lastError);
} else {
Operators.error(actual, logger.logExceptionAsError(new IllegalStateException(
String.format("connectionId[%s] entityPath[%s]: Cannot subscribe. Processor is already terminated.",
connectionId, entityPath))));
String.format("namespace[%s] entityPath[%s]: Cannot subscribe. Processor is already terminated.",
fullyQualifiedNamespace, entityPath))));
}

return;
Expand All @@ -223,6 +232,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

subscribers.add(subscriber);
logger.info("Added a subscriber {} to AMQP channel processor. Total "
+ "subscribers = {}", subscriber, subscribers.size());

if (!isRetryPending.get()) {
requestUpstream();
Expand Down Expand Up @@ -253,25 +264,25 @@ public boolean isDisposed() {

private void requestUpstream() {
if (currentChannel != null) {
logger.verbose("connectionId[{}] entityPath[{}]: Connection exists, not requesting another.",
connectionId, entityPath);
logger.verbose("namespace[{}] entityPath[{}]: Connection exists, not requesting another.",
fullyQualifiedNamespace, entityPath);
return;
} else if (isDisposed()) {
logger.verbose("connectionId[{}] entityPath[{}]: Is already disposed.", connectionId, entityPath);
logger.verbose("namespace[{}] entityPath[{}]: Is already disposed.", fullyQualifiedNamespace, entityPath);
return;
}

final Subscription subscription = UPSTREAM.get(this);
if (subscription == null) {
logger.warning("connectionId[{}] entityPath[{}]: There is no upstream subscription.",
connectionId, entityPath);
logger.warning("namespace[{}] entityPath[{}]: There is no upstream subscription.",
fullyQualifiedNamespace, entityPath);
return;
}

// subscribe(CoreSubscriber) may have requested a subscriber already.
if (!isRequested.getAndSet(true)) {
logger.info("connectionId[{}] entityPath[{}]: Connection not requested, yet. Requesting one.",
connectionId, entityPath);
logger.info("namespace[{}] entityPath[{}]: Connection not requested, yet. Requesting one.",
fullyQualifiedNamespace, entityPath);
subscription.request(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
});

this.emitterProcessor = amqpReceiveLinkProcessor
.map(message -> onMessageReceived(message))
.map(this::onMessageReceived)
.doOnNext(event -> {
// Keep track of the last position so if the link goes down, we don't start from the original location.
final Long offset = event.getData().getOffset();
Expand All @@ -92,7 +92,11 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
public void close() {
if (!isDisposed.getAndSet(true)) {
emitterProcessor.onComplete();
amqpReceiveLinkProcessor.cancel();
if (!amqpReceiveLinkProcessor.isTerminated()) {
// cancel only if the processor is not already terminated.
amqpReceiveLinkProcessor.cancel();
}
logger.info("Closed consumer for partition {}", this.partitionId);
}
}

Expand Down