Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -371,26 +371,25 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
return fluxError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
}

return getSendLink().flatMapMany(link -> createMessageBatch()
.flatMapMany(messageBatch -> {
return createMessageBatch()
.map(messageBatch -> {
int index = 0;
for (ServiceBusMessage message : messages) {
if (!messageBatch.tryAddMessage(message)) {
final String error = String.format(Locale.US,
"Messages exceed max allowed size for all the messages together. "
+ "Failed to add message at index '%s'.", index);
throw logger.logExceptionAsError(new AmqpException(false,
AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, error, link.getErrorContext()));
throw logger.logExceptionAsError(new IllegalArgumentException(error));
}
++index;
}

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(),
scheduledEnqueueTime, messageBatch.getMaxSizeInBytes(), link.getLinkName(),
transactionContext));
}));
return messageBatch;
})
.flatMapMany(messageBatch -> connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime,
messageBatch.getMaxSizeInBytes(), linkName.get(), transactionContext))
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -308,13 +307,12 @@ void scheduleMessageSizeTooBig() {

// Act & Assert
StepVerifier.create(sender.scheduleMessages(messages, instant))
.verifyErrorMatches(throwable -> {
assertTrue(throwable instanceof AmqpException);
assertSame(((AmqpException) throwable).getErrorCondition(), AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED);
return true;
});
.verifyError(IllegalArgumentException.class);

verify(managementNode, never()).schedule(any(), eq(instant), anyInt(), eq(LINK_NAME), isNull());
}



/**
* Verifies that sending multiple message will result in calling sender.send(MessageBatch, transaction).
*/
Expand Down