Skip to content

Commit 9ce2b2a

Browse files
authored
Signal current pump's terminal error to Processor error handler (#39723)
1 parent cd5d96d commit 9ce2b2a

File tree

4 files changed

+129
-1
lines changed

4 files changed

+129
-1
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Fixes the ServiceBusProcessorClient to signal intermediate errors to the processor handler. ([#39669](https://github.com/Azure/azure-sdk-for-java/issues/39669))
12+
1113
### Other Changes
1214

1315
## 7.16.0-beta.1 (2024-03-14)

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePumpTerminatedException.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,23 @@ static MessagePumpTerminatedException forCompletion(long pumpId, String fullyQua
6464
return new MessagePumpTerminatedException(pumpId, fullyQualifiedNamespace, entityPath, "pumping#reached-completion");
6565
}
6666

67+
/**
68+
* Gets the error context describing the reason for the termination of the pump.
69+
*
70+
* @return the error context.
71+
*/
72+
ServiceBusErrorContext getErrorContext() {
73+
final Throwable cause = super.getCause();
74+
if (cause == null) {
75+
return null;
76+
} else if (cause instanceof ServiceBusException) {
77+
return new ServiceBusErrorContext(cause, fullyQualifiedNamespace, entityPath);
78+
} else {
79+
return new ServiceBusErrorContext(new ServiceBusException(cause, ServiceBusErrorSource.RECEIVE),
80+
fullyQualifiedNamespace, entityPath);
81+
}
82+
}
83+
6784
/**
6885
* Logs the given {@code message} along with pump identifier, fully qualified Service Bus namespace and entity path.
6986
*

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,9 @@ Mono<Void> beginIntern() {
254254
},
255255
true);
256256
}
257-
final Mono<Void> rollingPump = pumping.retryWhen(retrySpecForNextPump());
257+
final Mono<Void> rollingPump = pumping
258+
.onErrorResume(MessagePumpTerminatedException.class, t -> notifyError(t).then(Mono.error(t)))
259+
.retryWhen(retrySpecForNextPump());
258260
return rollingPump;
259261
}
260262

@@ -266,6 +268,29 @@ void dispose() {
266268
disposable.dispose();
267269
}
268270

271+
/**
272+
* Notify the current pump termination cause to the processor handler.
273+
* <p>
274+
* The processor handler will be called on a worker thread so that application may do blocking calls in
275+
* the handler.
276+
* </p>
277+
* @param t the input error with cause as the reason for pump termination.
278+
* @return a Mono that when subscribed invokes the processor handler.
279+
*/
280+
private Mono<Void> notifyError(MessagePumpTerminatedException t) {
281+
final ServiceBusErrorContext errorContext = t.getErrorContext();
282+
if (errorContext == null) {
283+
return Mono.empty();
284+
}
285+
return Mono.<Void>fromRunnable(() -> {
286+
try {
287+
processError.accept(errorContext);
288+
} catch (Exception e) {
289+
logger.atVerbose().log("Ignoring error from user processError handler.", e);
290+
}
291+
}).subscribeOn(Schedulers.boundedElastic());
292+
}
293+
269294
/**
270295
* Retry spec to roll to the next {@link SessionsMessagePump} with a back-off. If the spec is asked for retry after
271296
* the {@link RollingMessagePump} is disposed of (due to {@link ServiceBusProcessor} closure),

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpIsolatedTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,90 @@ public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) {
213213
verify(firstClient).close();
214214
}
215215

216+
@Test
217+
@Execution(ExecutionMode.SAME_THREAD)
218+
public void shouldReportReceiverTerminalError() {
219+
final ServiceBusReceivedMessage message0 = mock(ServiceBusReceivedMessage.class);
220+
final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class);
221+
final ServiceBusReceiverClientBuilder builder = mock(ServiceBusReceiverClientBuilder.class);
222+
final ServiceBusReceiverAsyncClient client0 = mock(ServiceBusReceiverAsyncClient.class);
223+
final ServiceBusReceiverAsyncClient client1 = mock(ServiceBusReceiverAsyncClient.class);
224+
225+
final AtomicInteger buildClientCalls = new AtomicInteger(0);
226+
when(builder.buildAsyncClientForProcessor()).thenAnswer(new Answer<ServiceBusReceiverAsyncClient>() {
227+
@Override
228+
public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) {
229+
final int callCount = buildClientCalls.incrementAndGet();
230+
if (callCount == 1) {
231+
return client0;
232+
} else if (callCount == 2) {
233+
return client1;
234+
} else {
235+
throw new UnsupportedOperationException("NoThirdClient");
236+
}
237+
}
238+
});
239+
240+
when(client0.getInstrumentation()).thenReturn(INSTRUMENTATION);
241+
final RuntimeException client0TerminalError = new RuntimeException("receiver-0-error");
242+
final Flux<ServiceBusReceivedMessage> client0Messages = Flux.concat(Flux.just(message0), Flux.error(client0TerminalError));
243+
when(client0.nonSessionProcessorReceiveV2()).thenReturn(client0Messages);
244+
when(client0.isConnectionClosed()).thenReturn(false);
245+
doNothing().when(client0).close();
246+
247+
when(client1.getInstrumentation()).thenReturn(INSTRUMENTATION);
248+
final ServiceBusException client1TerminalError = new ServiceBusException(new RuntimeException("inner-error-1"), ServiceBusErrorSource.RECEIVE);
249+
final Flux<ServiceBusReceivedMessage> client1Messages = Flux.concat(Flux.just(message1), Flux.error(client1TerminalError));
250+
when(client1.nonSessionProcessorReceiveV2()).thenReturn(client1Messages);
251+
when(client1.isConnectionClosed()).thenReturn(false);
252+
doNothing().when(client1).close();
253+
254+
final Deque<ServiceBusReceivedMessage> consumedMessages = new ConcurrentLinkedDeque<>();
255+
final Consumer<ServiceBusReceivedMessageContext> messageConsumer = (messageContext) -> {
256+
consumedMessages.add(messageContext.getMessage());
257+
};
258+
final Deque<Throwable> consumedErrors = new ConcurrentLinkedDeque<>();
259+
final Consumer<ServiceBusErrorContext> errorConsumer = (errorContext) -> {
260+
consumedErrors.add(errorContext.getException());
261+
};
262+
263+
final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, errorConsumer, 1, false);
264+
265+
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
266+
verifier.create(() -> pump.beginIntern())
267+
.thenAwait(Duration.ofSeconds(30))
268+
.verifyErrorSatisfies(e -> {
269+
// The assertions confirm that the RollingMessagePump attempted to create the third pump
270+
// by requesting a third client, that proves the retry happened upon receiving
271+
// MessagePumpTerminatedException indicating the receiver terminal error.
272+
// ISE will be thrown by ServiceBusProcessor.RollingMessagePump.retrySpecForNextPump() when retry
273+
// gets UnsupportedOperationException.
274+
Assertions.assertTrue(e instanceof IllegalStateException);
275+
Assertions.assertNotNull(e.getCause());
276+
Assertions.assertTrue(e.getCause() instanceof UnsupportedOperationException);
277+
Assertions.assertEquals("NoThirdClient", e.getCause().getMessage());
278+
});
279+
}
280+
281+
Assertions.assertEquals(2, consumedMessages.size());
282+
Assertions.assertEquals(message0, consumedMessages.pop());
283+
Assertions.assertEquals(message1, consumedMessages.pop());
284+
Assertions.assertEquals(2, consumedErrors.size());
285+
// client0 throws a RuntimeException, Processor must wrap it in ServiceBusException.
286+
final Throwable e0 = consumedErrors.pop();
287+
Assertions.assertInstanceOf(ServiceBusException.class, e0);
288+
Assertions.assertNotNull(e0.getCause());
289+
Assertions.assertEquals(client0TerminalError, e0.getCause());
290+
// client1 throws a ServiceBusException, Processor should just forward it without wrapping.
291+
final Throwable e1 = consumedErrors.pop();
292+
Assertions.assertEquals(client1TerminalError, e1);
293+
Assertions.assertNotNull(e0.getCause());
294+
Assertions.assertEquals("inner-error-1", e1.getCause().getMessage());
295+
Assertions.assertEquals(3, buildClientCalls.get());
296+
verify(client0).close();
297+
verify(client1).close();
298+
}
299+
216300
@Test
217301
@Execution(ExecutionMode.SAME_THREAD)
218302
public void shouldCompleteMessageOnSuccessfulProcessing() {

0 commit comments

Comments
 (0)