Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,6 @@ public BinaryData getBody() {
}
}

/**
* Gets the payload wrapped by the {@link ServiceBusReceivedMessage} as a byte array.
*
* @return A byte array representing the data.
* @see ServiceBusMessage#getBody()
*/
public byte[] getBodyAsBytes() {
return getBody().toBytes();
}

/**
* Gets the content type of the message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ Mono<ServiceBusReceivedMessage> peekMessage(String sessionId) {
* @throws ServiceBusException if an error occurs while peeking at the message.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
public Mono<ServiceBusReceivedMessage> peekMessageAt(long sequenceNumber) {
return peekMessageAt(sequenceNumber, receiverOptions.getSessionId());
public Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber) {
return peekMessage(sequenceNumber, receiverOptions.getSessionId());
}

/**
Expand All @@ -472,7 +472,7 @@ public Mono<ServiceBusReceivedMessage> peekMessageAt(long sequenceNumber) {
* @throws IllegalStateException if receiver is already disposed.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
Mono<ServiceBusReceivedMessage> peekMessageAt(long sequenceNumber, String sessionId) {
Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber, String sessionId) {
if (isDisposed.get()) {
return monoError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt")));
Expand Down Expand Up @@ -560,8 +560,8 @@ Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, String sessionId)
* @throws ServiceBusException if an error occurs while peeking at messages.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
public Flux<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequenceNumber) {
return peekMessagesAt(maxMessages, sequenceNumber, receiverOptions.getSessionId());
public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber) {
return peekMessages(maxMessages, sequenceNumber, receiverOptions.getSessionId());
}

/**
Expand All @@ -577,7 +577,7 @@ public Flux<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequ
* @throws IllegalStateException if receiver is already disposed.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
Flux<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequenceNumber, String sessionId) {
Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber, String sessionId) {
if (isDisposed.get()) {
return fluxError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ ServiceBusReceivedMessage peekMessage(String sessionId) {
*
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) {
return this.peekMessageAt(sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
public ServiceBusReceivedMessage peekMessage(long sequenceNumber) {
return this.peekMessage(sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
}

/**
Expand All @@ -296,8 +296,8 @@ public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) {
* @throws IllegalStateException if receiver is already disposed.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessionId) {
return asyncClient.peekMessageAt(sequenceNumber, sessionId).block(operationTimeout);
ServiceBusReceivedMessage peekMessage(long sequenceNumber, String sessionId) {
return asyncClient.peekMessage(sequenceNumber, sessionId).block(operationTimeout);
}

/**
Expand Down Expand Up @@ -358,8 +358,8 @@ IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, String s
*
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
public IterableStream<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequenceNumber) {
return this.peekMessagesAt(maxMessages, sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
public IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber) {
return this.peekMessages(maxMessages, sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
}

/**
Expand All @@ -375,13 +375,13 @@ public IterableStream<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages,
* @throws IllegalStateException if receiver is already disposed.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
IterableStream<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequenceNumber, String sessionId) {
IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber, String sessionId) {
if (maxMessages <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
}

final Flux<ServiceBusReceivedMessage> messages = asyncClient.peekMessagesAt(maxMessages, sequenceNumber,
final Flux<ServiceBusReceivedMessage> messages = asyncClient.peekMessages(maxMessages, sequenceNumber,
sessionId).timeout(operationTimeout);

// Subscribe so we can kick off this operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* @see ServiceBusSenderAsyncClient To communicate with a Service Bus resource using an asynchronous client.
*/
@ServiceClient(builder = ServiceBusClientBuilder.class)
public class ServiceBusSenderClient implements AutoCloseable {
public final class ServiceBusSenderClient implements AutoCloseable {
private final ServiceBusSenderAsyncClient asyncClient;
private final Duration tryTimeout;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* Overview</a>
*/
@Immutable
public class ServiceBusTransactionContext {
public final class ServiceBusTransactionContext {
private final ByteBuffer transactionId;

ServiceBusTransactionContext(ByteBuffer transactionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
*/
@ServiceClientBuilder(serviceClients = {ServiceBusAdministrationClient.class,
ServiceBusAdministrationAsyncClient.class})
public class ServiceBusAdministrationClientBuilder {
public final class ServiceBusAdministrationClientBuilder {
private final ClientLogger logger = new ClientLogger(ServiceBusAdministrationClientBuilder.class);
private final ServiceBusManagementSerializer serializer = new ServiceBusManagementSerializer();
private final List<HttpPipelinePolicy> userPolicies = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* @see ServiceBusAdministrationClient#createQueue(String, CreateQueueOptions)
*/
@Fluent
public class CreateQueueOptions {
public final class CreateQueueOptions {
private final List<AuthorizationRule> authorizationRules;

private Duration autoDeleteOnIdle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* A set of options for creating a rule.
*/
@Fluent
public class CreateRuleOptions {
public final class CreateRuleOptions {
private RuleFilter filter;
private RuleAction action;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* @see ServiceBusAdministrationClient#createTopic(String, CreateTopicOptions)
*/
@Fluent
public class CreateSubscriptionOptions {
public final class CreateSubscriptionOptions {
private Duration autoDeleteOnIdle;
private Duration defaultMessageTimeToLive;
private boolean deadLetteringOnMessageExpiration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @see ServiceBusAdministrationAsyncClient#createTopic(String, CreateTopicOptions)
* @see ServiceBusAdministrationClient#createTopic(String, CreateTopicOptions)
*/
public class CreateTopicOptions {
public final class CreateTopicOptions {
private final List<AuthorizationRule> authorizationRules;

private Duration autoDeleteOnIdle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/**
* Matches none the messages arriving to be selected for the subscription.
*/
public class FalseRuleFilter extends SqlRuleFilter {
public final class FalseRuleFilter extends SqlRuleFilter {
private static final FalseRuleFilter INSTANCE = new FalseRuleFilter();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* A shared access key for accessing Service Bus entities.
*/
@Fluent
public class SharedAccessAuthorizationRule implements AuthorizationRule {
public final class SharedAccessAuthorizationRule implements AuthorizationRule {
/**
* There one type of authorization rule.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* Runtime properties about the topic.
*/
@Immutable
public class TopicRuntimeProperties {
public final class TopicRuntimeProperties {
private final String name;
private final int subscriptionCount;
private final long sizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void canCreateWithByteArrayPayload() {

// Assert
assertNotNull(serviceBusMessageData.getBody());
assertArrayEquals(PAYLOAD_BYTES, serviceBusMessageData.getBodyAsBytes());
assertArrayEquals(PAYLOAD_BYTES, serviceBusMessageData.getBody().toBytes());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ void peekMessageEmptyEntity(MessagingEntityType entityType, boolean isSessionEna
final int fromSequenceNumber = 1;

// Assert & Act
StepVerifier.create(receiver.peekMessageAt(fromSequenceNumber))
StepVerifier.create(receiver.peekMessage(fromSequenceNumber))
.verifyComplete();
}

Expand Down Expand Up @@ -459,7 +459,7 @@ void peekFromSequenceNumberMessage(MessagingEntityType entityType, boolean isSes

// Assert & Act
try {
StepVerifier.create(receiver.peekMessageAt(sequenceNumber))
StepVerifier.create(receiver.peekMessage(sequenceNumber))
.assertNext(m -> {
assertEquals(sequenceNumber, m.getSequenceNumber());
assertMessageEquals(m, messageId, isSessionEnabled);
Expand Down Expand Up @@ -558,7 +558,7 @@ void peekMessagesFromSequence(MessagingEntityType entityType) {
Mono.when(sendMessage(message), sendMessage(message)).block(TIMEOUT);

// Assert & Act
StepVerifier.create(receiver.peekMessagesAt(maxMessages, fromSequenceNumber))
StepVerifier.create(receiver.peekMessages(maxMessages, fromSequenceNumber))
.expectNextCount(maxMessages)
.verifyComplete();

Expand Down Expand Up @@ -587,7 +587,7 @@ void peekMessagesFromSequenceEmptyEntity(MessagingEntityType entityType, boolean
final int fromSequenceNumber = 1;

// Assert & Act
StepVerifier.create(receiver.peekMessagesAt(maxMessages, fromSequenceNumber))
StepVerifier.create(receiver.peekMessages(maxMessages, fromSequenceNumber))
.verifyComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ void peekWithSequenceOneMessage() {
when(managementNode.peek(fromSequenceNumber, null, null)).thenReturn(Mono.just(receivedMessage));

// Act & Assert
StepVerifier.create(receiver.peekMessageAt(fromSequenceNumber))
StepVerifier.create(receiver.peekMessage(fromSequenceNumber))
.expectNext(receivedMessage)
.verifyComplete();
}
Expand Down Expand Up @@ -428,7 +428,7 @@ void peekBatchWithSequenceNumberMessages() {
.thenReturn(Flux.fromArray(new ServiceBusReceivedMessage[]{receivedMessage, receivedMessage2}));

// Act & Assert
StepVerifier.create(receiver.peekMessagesAt(numberOfEvents, fromSequenceNumber))
StepVerifier.create(receiver.peekMessages(numberOfEvents, fromSequenceNumber))
.expectNext(receivedMessage, receivedMessage2)
.verifyComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ void peekFromSequenceNumberMessage(MessagingEntityType entityType, boolean isSes
assertNotNull(receivedMessage);

// Act
final ServiceBusReceivedMessage receivedPeekMessage = receiver.peekMessageAt(receivedMessage.getSequenceNumber());
final ServiceBusReceivedMessage receivedPeekMessage = receiver.peekMessage(receivedMessage.getSequenceNumber());

// Assert
assertEquals(receivedMessage.getSequenceNumber(), receivedPeekMessage.getSequenceNumber());
Expand Down Expand Up @@ -491,7 +491,7 @@ void peekMessagesFromSequence(MessagingEntityType entityType) {
setReceiver(entityType, 0, false);

// Act
IterableStream<ServiceBusReceivedMessage> iterableMessages = receiver.peekMessagesAt(maxMessages, fromSequenceNumber);
IterableStream<ServiceBusReceivedMessage> iterableMessages = receiver.peekMessages(maxMessages, fromSequenceNumber);

// Assert
final List<ServiceBusReceivedMessage> asList = iterableMessages.stream().collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,10 @@ void peekMessageFromSequence() {
final long sequenceNumber = 154;
final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class);
when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions);
when(asyncClient.peekMessageAt(sequenceNumber, SESSION_ID)).thenReturn(Mono.just(message));
when(asyncClient.peekMessage(sequenceNumber, SESSION_ID)).thenReturn(Mono.just(message));

// Act
final ServiceBusReceivedMessage actual = client.peekMessageAt(sequenceNumber);
final ServiceBusReceivedMessage actual = client.peekMessage(sequenceNumber);

// Assert
assertEquals(message, actual);
Expand Down Expand Up @@ -586,10 +586,10 @@ void peekMessagesMaxSequenceNumber() {
sink.complete();
}));
when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions);
when(asyncClient.peekMessagesAt(maxMessages, sequenceNumber, SESSION_ID)).thenReturn(messages);
when(asyncClient.peekMessages(maxMessages, sequenceNumber, SESSION_ID)).thenReturn(messages);

// Act
final IterableStream<ServiceBusReceivedMessage> actual = client.peekMessagesAt(maxMessages, sequenceNumber);
final IterableStream<ServiceBusReceivedMessage> actual = client.peekMessages(maxMessages, sequenceNumber);

// Assert
assertNotNull(actual);
Expand Down