Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixes the ServiceBusProcessorClient to signal intermediate errors to the processor handler. ([#39669](https://github.com/Azure/azure-sdk-for-java/issues/39669))

### Other Changes

## 7.16.0-beta.1 (2024-03-14)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ static MessagePumpTerminatedException forCompletion(long pumpId, String fullyQua
return new MessagePumpTerminatedException(pumpId, fullyQualifiedNamespace, entityPath, "pumping#reached-completion");
}

/**
* Gets the error context describing the reason for the termination of the pump.
*
* @return the error context.
*/
ServiceBusErrorContext getErrorContext() {
final Throwable cause = super.getCause();
if (cause == null) {
return null;
} else if (cause instanceof ServiceBusException) {
return new ServiceBusErrorContext(cause, fullyQualifiedNamespace, entityPath);
} else {
return new ServiceBusErrorContext(new ServiceBusException(cause, ServiceBusErrorSource.RECEIVE),
fullyQualifiedNamespace, entityPath);
}
}

/**
* Logs the given {@code message} along with pump identifier, fully qualified Service Bus namespace and entity path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ Mono<Void> beginIntern() {
},
true);
}
final Mono<Void> rollingPump = pumping.retryWhen(retrySpecForNextPump());
final Mono<Void> rollingPump = pumping
.onErrorResume(MessagePumpTerminatedException.class, t -> notifyError(t).then(Mono.error(t)))
.retryWhen(retrySpecForNextPump());
return rollingPump;
}

Expand All @@ -266,6 +268,29 @@ void dispose() {
disposable.dispose();
}

/**
* Notify the current pump termination cause to the processor handler.
* <p>
* The processor handler will be called on a worker thread so that application may do blocking calls in
* the handler.
* </p>
* @param t the input error with cause as the reason for pump termination.
* @return a Mono that when subscribed invokes the processor handler.
*/
private Mono<Void> notifyError(MessagePumpTerminatedException t) {
final ServiceBusErrorContext errorContext = t.getErrorContext();
if (errorContext == null) {
return Mono.empty();
}
return Mono.<Void>fromRunnable(() -> {
try {
processError.accept(errorContext);
} catch (Exception e) {
logger.atVerbose().log("Ignoring error from user processError handler.", e);
}
}).subscribeOn(Schedulers.boundedElastic());
}

/**
* Retry spec to roll to the next {@link SessionsMessagePump} with a back-off. If the spec is asked for retry after
* the {@link RollingMessagePump} is disposed of (due to {@link ServiceBusProcessor} closure),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,90 @@ public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) {
verify(firstClient).close();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldReportReceiverTerminalError() {
final ServiceBusReceivedMessage message0 = mock(ServiceBusReceivedMessage.class);
final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class);
final ServiceBusReceiverClientBuilder builder = mock(ServiceBusReceiverClientBuilder.class);
final ServiceBusReceiverAsyncClient client0 = mock(ServiceBusReceiverAsyncClient.class);
final ServiceBusReceiverAsyncClient client1 = mock(ServiceBusReceiverAsyncClient.class);

final AtomicInteger buildClientCalls = new AtomicInteger(0);
when(builder.buildAsyncClientForProcessor()).thenAnswer(new Answer<ServiceBusReceiverAsyncClient>() {
@Override
public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) {
final int callCount = buildClientCalls.incrementAndGet();
if (callCount == 1) {
return client0;
} else if (callCount == 2) {
return client1;
} else {
throw new UnsupportedOperationException("NoThirdClient");
}
}
});

when(client0.getInstrumentation()).thenReturn(INSTRUMENTATION);
final RuntimeException client0TerminalError = new RuntimeException("receiver-0-error");
final Flux<ServiceBusReceivedMessage> client0Messages = Flux.concat(Flux.just(message0), Flux.error(client0TerminalError));
when(client0.nonSessionProcessorReceiveV2()).thenReturn(client0Messages);
when(client0.isConnectionClosed()).thenReturn(false);
doNothing().when(client0).close();

when(client1.getInstrumentation()).thenReturn(INSTRUMENTATION);
final ServiceBusException client1TerminalError = new ServiceBusException(new RuntimeException("inner-error-1"), ServiceBusErrorSource.RECEIVE);
final Flux<ServiceBusReceivedMessage> client1Messages = Flux.concat(Flux.just(message1), Flux.error(client1TerminalError));
when(client1.nonSessionProcessorReceiveV2()).thenReturn(client1Messages);
when(client1.isConnectionClosed()).thenReturn(false);
doNothing().when(client1).close();

final Deque<ServiceBusReceivedMessage> consumedMessages = new ConcurrentLinkedDeque<>();
final Consumer<ServiceBusReceivedMessageContext> messageConsumer = (messageContext) -> {
consumedMessages.add(messageContext.getMessage());
};
final Deque<Throwable> consumedErrors = new ConcurrentLinkedDeque<>();
final Consumer<ServiceBusErrorContext> errorConsumer = (errorContext) -> {
consumedErrors.add(errorContext.getException());
};

final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, errorConsumer, 1, false);

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
verifier.create(() -> pump.beginIntern())
.thenAwait(Duration.ofSeconds(30))
.verifyErrorSatisfies(e -> {
// The assertions confirm that the RollingMessagePump attempted to create the third pump
// by requesting a third client, that proves the retry happened upon receiving
// MessagePumpTerminatedException indicating the receiver terminal error.
// ISE will be thrown by ServiceBusProcessor.RollingMessagePump.retrySpecForNextPump() when retry
// gets UnsupportedOperationException.
Assertions.assertTrue(e instanceof IllegalStateException);
Assertions.assertNotNull(e.getCause());
Assertions.assertTrue(e.getCause() instanceof UnsupportedOperationException);
Assertions.assertEquals("NoThirdClient", e.getCause().getMessage());
});
}

Assertions.assertEquals(2, consumedMessages.size());
Assertions.assertEquals(message0, consumedMessages.pop());
Assertions.assertEquals(message1, consumedMessages.pop());
Assertions.assertEquals(2, consumedErrors.size());
// client0 throws a RuntimeException, Processor must wrap it in ServiceBusException.
final Throwable e0 = consumedErrors.pop();
Assertions.assertInstanceOf(ServiceBusException.class, e0);
Assertions.assertNotNull(e0.getCause());
Assertions.assertEquals(client0TerminalError, e0.getCause());
// client1 throws a ServiceBusException, Processor should just forward it without wrapping.
final Throwable e1 = consumedErrors.pop();
Assertions.assertEquals(client1TerminalError, e1);
Assertions.assertNotNull(e0.getCause());
Assertions.assertEquals("inner-error-1", e1.getCause().getMessage());
Assertions.assertEquals(3, buildClientCalls.get());
verify(client0).close();
verify(client1).close();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldCompleteMessageOnSuccessfulProcessing() {
Expand Down