Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 111 additions & 14 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
MessageAnnotations,
DeliveryAnnotations
} from "rhea-promise";
import { Constants, AmqpMessage } from "@azure/amqp-common";
import { Constants, AmqpMessage, translate, ErrorNameConditionMapper } from "@azure/amqp-common";
import * as log from "./log";
import { ClientEntityContext } from "./clientEntityContext";
import { reorderLockToken } from "../src/util/utils";
Expand Down Expand Up @@ -784,6 +784,13 @@ export class ServiceBusMessage implements ReceivedMessage {
* @readonly
*/
readonly _amqpMessage: AmqpMessage;
/**
* @property Boolean denoting if the message has already been settled.
* @readonly
*/
public get isSettled(): boolean {
return this.delivery.remote_settled;
}
/**
* @property {ClientEntityContext} _context The client entity context.
* @readonly
Expand All @@ -810,6 +817,20 @@ export class ServiceBusMessage implements ReceivedMessage {

/**
* Removes the message from Service Bus.
*
* - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled)
* if the AMQP link with which the message was received is no longer alive. This can
* happen either because the lock on the session expired or the receiver was explicitly closed by
* the user or the AMQP link got closed by the library due to network loss or service error.
* - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled)
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* - Throws an error if the message is already settled. To avoid this error check the `isSettled`
* property on the message if you are not sure whether the message is settled.
* - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode
* are pre-settled.
*
* @returns Promise<void>.
*/
async complete(): Promise<void> {
Expand All @@ -832,13 +853,32 @@ export class ServiceBusMessage implements ReceivedMessage {
return;
}
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
throwIfMessageCannotBeSettled(receiver, DispositionType.complete, this.delivery.remote_settled);
throwIfMessageCannotBeSettled(
receiver,
DispositionType.complete,
this.delivery.remote_settled,
this.sessionId
);

return receiver!.settleMessage(this, DispositionType.complete);
}
/**
* The lock held on the message by the receiver is let go, making the message available again in
* Service Bus for another receive operation.
*
* - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@throws in documentation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this format for GA, as all other user facing APIs in the library are using this format. It renders well in out docs as a bulleted list.

Logged #2853 to track the discussion around moving to using @throws and the follow up work

* if the AMQP link with which the message was received is no longer alive. This can
* happen either because the lock on the session expired or the receiver was explicitly closed by
* the user or the AMQP link got closed by the library due to network loss or service error.
* - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled)
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* - Throws an error if the message is already settled. To avoid this error check the `isSettled`
* property on the message if you are not sure whether the message is settled.
* - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode
* are pre-settled.
*
* @param propertiesToModify The properties of the message to modify while abandoning the message.
*
* @return Promise<void>.
Expand All @@ -862,7 +902,12 @@ export class ServiceBusMessage implements ReceivedMessage {
return;
}
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
throwIfMessageCannotBeSettled(receiver, DispositionType.abandon, this.delivery.remote_settled);
throwIfMessageCannotBeSettled(
receiver,
DispositionType.abandon,
this.delivery.remote_settled,
this.sessionId
);

return receiver!.settleMessage(this, DispositionType.abandon, {
propertiesToModify: propertiesToModify
Expand All @@ -872,6 +917,20 @@ export class ServiceBusMessage implements ReceivedMessage {
/**
* Defers the processing of the message. Save the `sequenceNumber` of the message, in order to
* receive it message again in the future using the `receiveDeferredMessage` method.
*
* - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled)
* if the AMQP link with which the message was received is no longer alive. This can
* happen either because the lock on the session expired or the receiver was explicitly closed by
* the user or the AMQP link got closed by the library due to network loss or service error.
* - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled)
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* - Throws an error if the message is already settled. To avoid this error check the `isSettled`
* property on the message if you are not sure whether the message is settled.
* - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode
* are pre-settled.
*
* @param propertiesToModify The properties of the message to modify while deferring the message
*
* @returns Promise<void>
Expand All @@ -894,7 +953,12 @@ export class ServiceBusMessage implements ReceivedMessage {
return;
}
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
throwIfMessageCannotBeSettled(receiver, DispositionType.defer, this.delivery.remote_settled);
throwIfMessageCannotBeSettled(
receiver,
DispositionType.defer,
this.delivery.remote_settled,
this.sessionId
);

return receiver!.settleMessage(this, DispositionType.defer, {
propertiesToModify: propertiesToModify
Expand All @@ -904,6 +968,20 @@ export class ServiceBusMessage implements ReceivedMessage {
/**
* Moves the message to the deadletter sub-queue. To receive a deadletted message, create a new
* QueueClient/SubscriptionClient using the path for the deadletter sub-queue.
*
* - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled)
* if the AMQP link with which the message was received is no longer alive. This can
* happen either because the lock on the session expired or the receiver was explicitly closed by
* the user or the AMQP link got closed by the library due to network loss or service error.
* - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled)
* if the lock on the message has expired or the AMQP link with which the message was received is
* no longer alive. The latter can happen if the receiver was explicitly closed by the user or the
* AMQP link got closed by the library due to network loss or service error.
* - Throws an error if the message is already settled. To avoid this error check the `isSettled`
* property on the message if you are not sure whether the message is settled.
* - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode
* are pre-settled.
*
* @param options The DeadLetter options that can be provided while
* rejecting the message.
*
Expand Down Expand Up @@ -943,7 +1021,8 @@ export class ServiceBusMessage implements ReceivedMessage {
throwIfMessageCannotBeSettled(
receiver,
DispositionType.deadletter,
this.delivery.remote_settled
this.delivery.remote_settled,
this.sessionId
);

return receiver!.settleMessage(this, DispositionType.deadletter, {
Expand Down Expand Up @@ -979,28 +1058,46 @@ export class ServiceBusMessage implements ReceivedMessage {
}

/**
* @internal
* Logs and Throws an error if the given message cannot be settled.
* @param receiver Receiver to be used to settle this message
* @param operation Settle operation: complete, abandon, defer or deadLetter
* @param isRemoteSettled Boolean indicating if the message has been settled at the remote
* @param sessionId sessionId of the message if applicable
*/
export function throwIfMessageCannotBeSettled(
receiver: MessageReceiver | MessageSession | undefined,
operation: DispositionType,
isRemoteSettled: boolean
isRemoteSettled: boolean,
sessionId?: string
): void {
let errorMessage;
if (!receiver || !receiver.isOpen()) {
errorMessage = `Failed to ${operation} the message as it's receiver has been closed.`;
} else if (receiver.receiveMode !== ReceiveMode.peekLock) {
errorMessage = getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`);
let error: Error | undefined;

if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`)
);
} else if (isRemoteSettled) {
errorMessage = `Failed to ${operation} the message as this message has been already settled.`;
error = new Error(`Failed to ${operation} the message as this message is already settled.`);
} else if (!receiver || !receiver.isOpen()) {
const errorMessage =
`Failed to ${operation} the message as the AMQP link with which the message was ` +
`received is no longer alive.`;
if (sessionId != undefined) {
error = translate({
description: errorMessage,
condition: ErrorNameConditionMapper.SessionLockLostError
});
} else {
error = translate({
description: errorMessage,
condition: ErrorNameConditionMapper.MessageLockLostError
});
}
}
if (!errorMessage) {
if (!error) {
return;
}
const error = new Error(errorMessage);
if (receiver) {
log.error(
"An error occured when settling a message using the receiver %s: %O",
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/src/util/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ export function throwTypeErrorIfParameterIsEmptyString(
}

/**
* @internal
* Gets error message for when an operation is not supported in ReceiveAndDelete mode
* @param failedToDo A string to add to the placeholder in the error message. Denotes the action
* that is not supported in ReceiveAndDelete mode
Expand Down
81 changes: 43 additions & 38 deletions sdk/servicebus/service-bus/test/serviceBusClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
TestMessage
} from "./testUtils";
import { ClientType } from "../src/client";
import { throwIfMessageCannotBeSettled, DispositionType } from "../src/serviceBusMessage";
const should = chai.should();
dotenv.config();
chai.use(chaiAsPromised);
Expand Down Expand Up @@ -478,47 +479,51 @@ describe("Errors after close()", function(): void {
/**
* Tests the error from settling a message after the receiver is closed
*/
async function testDisposition(): Promise<void> {
async function testAllDispositions(): Promise<void> {
await testDisposition(DispositionType.complete);
await testDisposition(DispositionType.abandon);
await testDisposition(DispositionType.defer);
await testDisposition(DispositionType.deadletter);
}

async function testDisposition(operation: DispositionType): Promise<void> {
let caughtError: Error | undefined;
try {
await receivedMessage.complete();
} catch (error) {
caughtError = error;
}
should.equal(
caughtError && caughtError.message,
"Failed to complete the message as it's receiver has been closed."
);
let expectedError: Error | undefined;

try {
await receivedMessage.abandon();
switch (operation) {
case DispositionType.complete:
await receivedMessage.complete();
break;
case DispositionType.abandon:
await receivedMessage.abandon();
break;
case DispositionType.defer:
await receivedMessage.defer();
break;
case DispositionType.deadletter:
await receivedMessage.deadLetter();
break;

default:
break;
}
} catch (error) {
caughtError = error;
}
should.equal(
caughtError && caughtError.message,
"Failed to abandon the message as it's receiver has been closed."
);

try {
await receivedMessage.defer();
throwIfMessageCannotBeSettled(
undefined,
operation,
receivedMessage.isSettled,
receivedMessage.sessionId
);
} catch (error) {
caughtError = error;
expectedError = error;
}
should.equal(
caughtError && caughtError.message,
"Failed to defer the message as it's receiver has been closed."
);

try {
await receivedMessage.deadLetter();
} catch (error) {
caughtError = error;
}
should.equal(
caughtError && caughtError.message,
"Failed to deadletter the message as it's receiver has been closed."
);
should.equal(caughtError && caughtError.message, expectedError && expectedError.message);
}

/**
Expand Down Expand Up @@ -1349,7 +1354,7 @@ describe("Errors after close()", function(): void {
await testReceiver(
getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.QueueClient, false)
);
await testDisposition();
await testAllDispositions();
});

it("Partitioned Queue with sessions: errors after close() on receiver", async function(): Promise<
Expand All @@ -1370,7 +1375,7 @@ describe("Errors after close()", function(): void {
TestMessage.sessionId
)
);
await testDisposition();
await testAllDispositions();
});

it("Partitioned Topic/Subscription: errors after close() on receiver", async function(): Promise<
Expand All @@ -1385,7 +1390,7 @@ describe("Errors after close()", function(): void {
await testReceiver(
getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.SubscriptionClient, false)
);
await testDisposition();
await testAllDispositions();
});

it("Partitioned Topic/Subscription with sessions: errors after close() on receiver", async function(): Promise<
Expand All @@ -1406,7 +1411,7 @@ describe("Errors after close()", function(): void {
TestMessage.sessionId
)
);
await testDisposition();
await testAllDispositions();
});

it("Unpartitioned Queue: errors after close() on receiver", async function(): Promise<void> {
Expand All @@ -1419,7 +1424,7 @@ describe("Errors after close()", function(): void {
await testReceiver(
getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.QueueClient, false)
);
await testDisposition();
await testAllDispositions();
});

it("Unpartitioned Queue with sessions: errors after close() on receiver", async function(): Promise<
Expand All @@ -1440,7 +1445,7 @@ describe("Errors after close()", function(): void {
TestMessage.sessionId
)
);
await testDisposition();
await testAllDispositions();
});

it("Unpartitioned Topic/Subscription: errors after close() on receiver", async function(): Promise<
Expand All @@ -1455,7 +1460,7 @@ describe("Errors after close()", function(): void {
await testReceiver(
getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.SubscriptionClient, false)
);
await testDisposition();
await testAllDispositions();
});

it("Unpartitioned Topic/Subscription with sessions: errors after close() on receiver", async function(): Promise<
Expand All @@ -1476,7 +1481,7 @@ describe("Errors after close()", function(): void {
TestMessage.sessionId
)
);
await testDisposition();
await testAllDispositions();
});
});

Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/test/streamingReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ describe("Streaming - Settle an already Settled message throws error", () => {
const testError = (err: Error, operation: DispositionType) => {
should.equal(
err.message,
`Failed to ${operation} the message as this message has been already settled.`,
`Failed to ${operation} the message as this message is already settled.`,
"ErrorMessage is different than expected"
);
errorWasThrown = true;
Expand Down
Loading