Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,20 @@ export interface MessageHandlers {
processMessage(message: ServiceBusReceivedMessage): Promise<void>;
}

// @public (undocumented)
export interface MessageSettlementMethods {
abandonMessage(message: ServiceBusReceivedMessage, propertiesToModify?: {
[key: string]: any;
}): Promise<void>;
completeMessage(message: ServiceBusReceivedMessage): Promise<void>;
deadLetterMessage(message: ServiceBusReceivedMessage, options?: DeadLetterOptions & {
[key: string]: any;
}): Promise<void>;
deferMessage(message: ServiceBusReceivedMessage, propertiesToModify?: {
[key: string]: any;
}): Promise<void>;
}

export { MessagingError }

// @public
Expand Down Expand Up @@ -349,16 +363,16 @@ export class ServiceBusClient {
constructor(connectionString: string, options?: ServiceBusClientOptions);
constructor(fullyQualifiedNamespace: string, credential: TokenCredential, options?: ServiceBusClientOptions);
acceptNextSession(queueName: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
acceptNextSession(queueName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
acceptNextSession(queueName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiverWithNoSettlementMethods>;
acceptNextSession(topicName: string, subscriptionName: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
acceptNextSession(topicName: string, subscriptionName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
acceptNextSession(topicName: string, subscriptionName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiverWithNoSettlementMethods>;
acceptSession(queueName: string, sessionId: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
acceptSession(queueName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
acceptSession(queueName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiverWithNoSettlementMethods>;
acceptSession(topicName: string, subscriptionName: string, sessionId: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
acceptSession(topicName: string, subscriptionName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
acceptSession(topicName: string, subscriptionName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiverWithNoSettlementMethods>;
close(): Promise<void>;
createReceiver(queueName: string, options?: CreateReceiverOptions<"peekLock">): ServiceBusReceiver;
createReceiver(queueName: string, options: CreateReceiverOptions<"receiveAndDelete">): ServiceBusReceiver;
createReceiver(queueName: string, options: CreateReceiverOptions<"receiveAndDelete">): ServiceBusReceiverWithNoSettlementMethods;
createReceiver(topicName: string, subscriptionName: string, options?: CreateReceiverOptions<"peekLock">): ServiceBusReceiver;
createReceiver(topicName: string, subscriptionName: string, options: CreateReceiverOptions<"receiveAndDelete">): ServiceBusReceiver;
createSender(queueOrTopicName: string): ServiceBusSender;
Expand Down Expand Up @@ -430,27 +444,21 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
readonly sequenceNumber?: Long;
}

// @public (undocumented)
export interface ServiceBusReceiver extends ServiceBusReceiverWithNoSettlementMethods, MessageSettlementMethods {
renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date>;
}

// @public
export interface ServiceBusReceiver {
abandonMessage(message: ServiceBusReceivedMessage, propertiesToModify?: {
[key: string]: any;
}): Promise<void>;
export interface ServiceBusReceiverWithNoSettlementMethods {
close(): Promise<void>;
completeMessage(message: ServiceBusReceivedMessage): Promise<void>;
deadLetterMessage(message: ServiceBusReceivedMessage, options?: DeadLetterOptions & {
[key: string]: any;
}): Promise<void>;
deferMessage(message: ServiceBusReceivedMessage, propertiesToModify?: {
[key: string]: any;
}): Promise<void>;
entityPath: string;
getMessageIterator(options?: GetMessageIteratorOptions): AsyncIterableIterator<ServiceBusReceivedMessage>;
isClosed: boolean;
peekMessages(maxMessageCount: number, options?: PeekMessagesOptions): Promise<ServiceBusReceivedMessage[]>;
receiveDeferredMessages(sequenceNumbers: Long | Long[], options?: OperationOptionsBase): Promise<ServiceBusReceivedMessage[]>;
receiveMessages(maxMessageCount: number, options?: ReceiveMessagesOptions): Promise<ServiceBusReceivedMessage[]>;
receiveMode: "peekLock" | "receiveAndDelete";
renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date>;
subscribe(handlers: MessageHandlers, options?: SubscribeOptions): {
close(): Promise<void>;
};
Expand All @@ -468,8 +476,12 @@ export interface ServiceBusSender {
sendMessages(messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch, options?: OperationOptionsBase): Promise<void>;
}

// @public (undocumented)
export interface ServiceBusSessionReceiver extends ServiceBusSessionReceiverWithNoSettlementMethods, MessageSettlementMethods {
}

// @public
export interface ServiceBusSessionReceiver extends ServiceBusReceiver {
export interface ServiceBusSessionReceiverWithNoSettlementMethods extends ServiceBusReceiverWithNoSettlementMethods {
getSessionState(options?: OperationOptionsBase): Promise<any>;
renewSessionLock(options?: OperationOptionsBase): Promise<Date>;
readonly sessionId: string;
Expand Down
11 changes: 9 additions & 2 deletions sdk/servicebus/service-bus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ export {
SubscribeOptions
} from "./models";
export { OperationOptionsBase, TryAddOptions } from "./modelsToBeSharedWithEventHubs";
export { ServiceBusReceiver } from "./receivers/receiver";
export { ServiceBusSessionReceiver } from "./receivers/sessionReceiver";
export {
ServiceBusReceiver,
ServiceBusReceiverWithNoSettlementMethods,
MessageSettlementMethods
} from "./receivers/receiver";
export {
ServiceBusSessionReceiver,
ServiceBusSessionReceiverWithNoSettlementMethods
} from "./receivers/sessionReceiver";
export { ServiceBusSender } from "./sender";
export { NamespaceProperties } from "./serializers/namespaceResourceSerializer";
export {
Expand Down
223 changes: 115 additions & 108 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,109 +31,24 @@ import { LockRenewer } from "../core/autoLockRenewer";
import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { receiverLogger as logger } from "../log";

/**
* A receiver that does not handle sessions.
*/
export interface ServiceBusReceiver {
/**
* Streams messages to message handlers.
* @param handlers A handler that gets called for messages and errors.
* @param options Options for subscribe.
* @returns An object that can be closed, sending any remaining messages to `handlers` and
* stopping new messages from arriving.
*/
subscribe(
handlers: MessageHandlers,
options?: SubscribeOptions
): {
/**
* Causes the subscriber to stop receiving new messages.
*/
close(): Promise<void>;
};

export interface ServiceBusReceiver
extends ServiceBusReceiverWithNoSettlementMethods,
MessageSettlementMethods {
/**
* Returns an iterator that can be used to receive messages from Service Bus.
* If the iterator is not able to fetch a new message in over a minute, `undefined` will be returned.
*
* @param options A set of options to control the receive operation.
* - `maxWaitTimeInMs`: The time to wait to receive the message in each iteration.
* - `abortSignal`: The signal to use to abort the ongoing operation.
*
* @throws Error if the underlying connection, client or receiver is closed.
* @throws Error if current receiver is already in state of receiving messages.
* @throws MessagingError if the service returns an error while receiving messages.
*/
getMessageIterator(
options?: GetMessageIteratorOptions
): AsyncIterableIterator<ServiceBusReceivedMessage>;

/**
* Returns a promise that resolves to an array of messages received from Service Bus.
* Renews the lock on the message for the duration as specified during the Queue/Subscription
* creation.
* - Check the `lockedUntilUtc` property on the message for the time when the lock expires.
* - If a message is not settled (using either `complete()`, `defer()` or `deadletter()`,
* before its lock expires, then the message lands back in the Queue/Subscription for the next
* receive operation.
*
* @param maxMessageCount The maximum number of messages to receive.
* @param options A set of options to control the receive operation.
* - `maxWaitTimeInMs`: The maximum time to wait for the first message before returning an empty array if no messages are available.
* - `abortSignal`: The signal to use to abort the ongoing operation.
* @returns Promise<ReceivedMessageT[]> A promise that resolves with an array of messages.
* @returns Promise<Date> - New lock token expiry date and time in UTC format.
* @throws Error if the underlying connection, client or receiver is closed.
* @throws Error if current receiver is already in state of receiving messages.
* @throws MessagingError if the service returns an error while receiving messages.
*/
receiveMessages(
maxMessageCount: number,
options?: ReceiveMessagesOptions
): Promise<ServiceBusReceivedMessage[]>;

/**
* Returns a promise that resolves to an array of deferred messages identified by given `sequenceNumbers`.
* @param sequenceNumbers The sequence number or an array of sequence numbers for the messages that need to be received.
* @param options - Options bag to pass an abort signal or tracing options.
* @returns {Promise<ServiceBusMessage[]>}
* - Returns a list of messages identified by the given sequenceNumbers.
* - Returns an empty list if no messages are found.
* @throws Error if the underlying connection or receiver is closed.
* @throws MessagingError if the service returns an error while receiving deferred messages.
*/
receiveDeferredMessages(
sequenceNumbers: Long | Long[],
options?: OperationOptionsBase
): Promise<ServiceBusReceivedMessage[]>;

/**
* Peek the next batch of active messages (including deferred but not deadlettered messages) on the
* queue or subscription without modifying them.
* - The first call to `peekMessages()` fetches the first active message. Each subsequent call fetches the
* subsequent message.
* - Unlike a "received" message, "peeked" message is a read-only version of the message.
* It cannot be `Completed/Abandoned/Deferred/Deadlettered`.
* @param maxMessageCount The maximum number of messages to peek.
* @param options Options that allow to specify the maximum number of messages to peek,
* the sequenceNumber to start peeking from or an abortSignal to abort the operation.
*/
peekMessages(
maxMessageCount: number,
options?: PeekMessagesOptions
): Promise<ServiceBusReceivedMessage[]>;
/**
* Path of the entity for which the receiver has been created.
*/
entityPath: string;
/**
* ReceiveMode provided to the client.
*/
receiveMode: "peekLock" | "receiveAndDelete";
/**
* @property Returns `true` if either the receiver or the client that created it has been closed
* @readonly
*/
isClosed: boolean;
/**
* Closes the receiver.
* Once closed, the receiver cannot be used for any further operations.
* Use the `createReceiver()` method on the ServiceBusClient to create a new Receiver.
* @throws MessagingError if the service returns an error while renewing message lock.
*/
close(): Promise<void>;
renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date>;
}
export interface MessageSettlementMethods {
/**
* Removes the message from Service Bus.
*
Expand Down Expand Up @@ -241,19 +156,111 @@ export interface ServiceBusReceiver {
message: ServiceBusReceivedMessage,
options?: DeadLetterOptions & { [key: string]: any }
): Promise<void>;
}

/**
* A receiver that does not handle sessions.
*/
export interface ServiceBusReceiverWithNoSettlementMethods {
/**
* Renews the lock on the message for the duration as specified during the Queue/Subscription
* creation.
* - Check the `lockedUntilUtc` property on the message for the time when the lock expires.
* - If a message is not settled (using either `complete()`, `defer()` or `deadletter()`,
* before its lock expires, then the message lands back in the Queue/Subscription for the next
* receive operation.
* Streams messages to message handlers.
* @param handlers A handler that gets called for messages and errors.
* @param options Options for subscribe.
* @returns An object that can be closed, sending any remaining messages to `handlers` and
* stopping new messages from arriving.
*/
subscribe(
handlers: MessageHandlers,
options?: SubscribeOptions
): {
/**
* Causes the subscriber to stop receiving new messages.
*/
close(): Promise<void>;
};

/**
* Returns an iterator that can be used to receive messages from Service Bus.
* If the iterator is not able to fetch a new message in over a minute, `undefined` will be returned.
*
* @param options A set of options to control the receive operation.
* - `maxWaitTimeInMs`: The time to wait to receive the message in each iteration.
* - `abortSignal`: The signal to use to abort the ongoing operation.
*
* @returns Promise<Date> - New lock token expiry date and time in UTC format.
* @throws Error if the underlying connection, client or receiver is closed.
* @throws MessagingError if the service returns an error while renewing message lock.
* @throws Error if current receiver is already in state of receiving messages.
* @throws MessagingError if the service returns an error while receiving messages.
*/
renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date>;
getMessageIterator(
options?: GetMessageIteratorOptions
): AsyncIterableIterator<ServiceBusReceivedMessage>;

/**
* Returns a promise that resolves to an array of messages received from Service Bus.
*
* @param maxMessageCount The maximum number of messages to receive.
* @param options A set of options to control the receive operation.
* - `maxWaitTimeInMs`: The maximum time to wait for the first message before returning an empty array if no messages are available.
* - `abortSignal`: The signal to use to abort the ongoing operation.
* @returns Promise<ReceivedMessageT[]> A promise that resolves with an array of messages.
* @throws Error if the underlying connection, client or receiver is closed.
* @throws Error if current receiver is already in state of receiving messages.
* @throws MessagingError if the service returns an error while receiving messages.
*/
receiveMessages(
maxMessageCount: number,
options?: ReceiveMessagesOptions
): Promise<ServiceBusReceivedMessage[]>;

/**
* Returns a promise that resolves to an array of deferred messages identified by given `sequenceNumbers`.
* @param sequenceNumbers The sequence number or an array of sequence numbers for the messages that need to be received.
* @param options - Options bag to pass an abort signal or tracing options.
* @returns {Promise<ServiceBusMessage[]>}
* - Returns a list of messages identified by the given sequenceNumbers.
* - Returns an empty list if no messages are found.
* @throws Error if the underlying connection or receiver is closed.
* @throws MessagingError if the service returns an error while receiving deferred messages.
*/
receiveDeferredMessages(
sequenceNumbers: Long | Long[],
options?: OperationOptionsBase
): Promise<ServiceBusReceivedMessage[]>;

/**
* Peek the next batch of active messages (including deferred but not deadlettered messages) on the
* queue or subscription without modifying them.
* - The first call to `peekMessages()` fetches the first active message. Each subsequent call fetches the
* subsequent message.
* - Unlike a "received" message, "peeked" message is a read-only version of the message.
* It cannot be `Completed/Abandoned/Deferred/Deadlettered`.
* @param maxMessageCount The maximum number of messages to peek.
* @param options Options that allow to specify the maximum number of messages to peek,
* the sequenceNumber to start peeking from or an abortSignal to abort the operation.
*/
peekMessages(
maxMessageCount: number,
options?: PeekMessagesOptions
): Promise<ServiceBusReceivedMessage[]>;
/**
* Path of the entity for which the receiver has been created.
*/
entityPath: string;
/**
* ReceiveMode provided to the client.
*/
receiveMode: "peekLock" | "receiveAndDelete";
/**
* @property Returns `true` if either the receiver or the client that created it has been closed
* @readonly
*/
isClosed: boolean;
/**
* Closes the receiver.
* Once closed, the receiver cannot be used for any further operations.
* Use the `createReceiver()` method on the ServiceBusClient to create a new Receiver.
*/
close(): Promise<void>;
}

/**
Expand Down
13 changes: 11 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import {
} from "../util/errors";
import { OnError, OnMessage } from "../core/messageReceiver";
import { assertValidMessageHandlers, getMessageIterator, wrapProcessErrorHandler } from "./shared";
import { defaultMaxTimeAfterFirstMessageForBatchingMs, ServiceBusReceiver } from "./receiver";
import {
defaultMaxTimeAfterFirstMessageForBatchingMs,
ServiceBusReceiverWithNoSettlementMethods,
MessageSettlementMethods
} from "./receiver";
import Long from "long";
import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage";
import {
Expand All @@ -32,10 +36,15 @@ import { AmqpError } from "rhea-promise";
import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { receiverLogger as logger } from "../log";

export interface ServiceBusSessionReceiver
extends ServiceBusSessionReceiverWithNoSettlementMethods,
MessageSettlementMethods {}

/**
*A receiver that handles sessions, including renewing the session lock.
*/
export interface ServiceBusSessionReceiver extends ServiceBusReceiver {
export interface ServiceBusSessionReceiverWithNoSettlementMethods
extends ServiceBusReceiverWithNoSettlementMethods {
/**
* The session ID.
*/
Expand Down
Loading