diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 51a11fbed287..444ffbaf8b5a 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -9,7 +9,7 @@ import { MessageAnnotations, DeliveryAnnotations } from "rhea-promise"; -import { Constants, AmqpMessage } from "@azure/amqp-common"; +import { Constants, AmqpMessage, translate, ErrorNameConditionMapper } from "@azure/amqp-common"; import * as log from "./log"; import { ClientEntityContext } from "./clientEntityContext"; import { reorderLockToken } from "../src/util/utils"; @@ -784,6 +784,13 @@ export class ServiceBusMessage implements ReceivedMessage { * @readonly */ readonly _amqpMessage: AmqpMessage; + /** + * @property Boolean denoting if the message has already been settled. + * @readonly + */ + public get isSettled(): boolean { + return this.delivery.remote_settled; + } /** * @property {ClientEntityContext} _context The client entity context. * @readonly @@ -810,6 +817,20 @@ export class ServiceBusMessage implements ReceivedMessage { /** * Removes the message from Service Bus. + * + * - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled) + * if the AMQP link with which the message was received is no longer alive. This can + * happen either because the lock on the session expired or the receiver was explicitly closed by + * the user or the AMQP link got closed by the library due to network loss or service error. + * - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled) + * if the lock on the message has expired or the AMQP link with which the message was received is + * no longer alive. The latter can happen if the receiver was explicitly closed by the user or the + * AMQP link got closed by the library due to network loss or service error. + * - Throws an error if the message is already settled. To avoid this error check the `isSettled` + * property on the message if you are not sure whether the message is settled. + * - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode + * are pre-settled. + * * @returns Promise. */ async complete(): Promise { @@ -832,13 +853,32 @@ export class ServiceBusMessage implements ReceivedMessage { return; } const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - throwIfMessageCannotBeSettled(receiver, DispositionType.complete, this.delivery.remote_settled); + throwIfMessageCannotBeSettled( + receiver, + DispositionType.complete, + this.delivery.remote_settled, + this.sessionId + ); return receiver!.settleMessage(this, DispositionType.complete); } /** * The lock held on the message by the receiver is let go, making the message available again in * Service Bus for another receive operation. + * + * - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled) + * if the AMQP link with which the message was received is no longer alive. This can + * happen either because the lock on the session expired or the receiver was explicitly closed by + * the user or the AMQP link got closed by the library due to network loss or service error. + * - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled) + * if the lock on the message has expired or the AMQP link with which the message was received is + * no longer alive. The latter can happen if the receiver was explicitly closed by the user or the + * AMQP link got closed by the library due to network loss or service error. + * - Throws an error if the message is already settled. To avoid this error check the `isSettled` + * property on the message if you are not sure whether the message is settled. + * - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode + * are pre-settled. + * * @param propertiesToModify The properties of the message to modify while abandoning the message. * * @return Promise. @@ -862,7 +902,12 @@ export class ServiceBusMessage implements ReceivedMessage { return; } const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - throwIfMessageCannotBeSettled(receiver, DispositionType.abandon, this.delivery.remote_settled); + throwIfMessageCannotBeSettled( + receiver, + DispositionType.abandon, + this.delivery.remote_settled, + this.sessionId + ); return receiver!.settleMessage(this, DispositionType.abandon, { propertiesToModify: propertiesToModify @@ -872,6 +917,20 @@ export class ServiceBusMessage implements ReceivedMessage { /** * Defers the processing of the message. Save the `sequenceNumber` of the message, in order to * receive it message again in the future using the `receiveDeferredMessage` method. + * + * - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled) + * if the AMQP link with which the message was received is no longer alive. This can + * happen either because the lock on the session expired or the receiver was explicitly closed by + * the user or the AMQP link got closed by the library due to network loss or service error. + * - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled) + * if the lock on the message has expired or the AMQP link with which the message was received is + * no longer alive. The latter can happen if the receiver was explicitly closed by the user or the + * AMQP link got closed by the library due to network loss or service error. + * - Throws an error if the message is already settled. To avoid this error check the `isSettled` + * property on the message if you are not sure whether the message is settled. + * - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode + * are pre-settled. + * * @param propertiesToModify The properties of the message to modify while deferring the message * * @returns Promise @@ -894,7 +953,12 @@ export class ServiceBusMessage implements ReceivedMessage { return; } const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - throwIfMessageCannotBeSettled(receiver, DispositionType.defer, this.delivery.remote_settled); + throwIfMessageCannotBeSettled( + receiver, + DispositionType.defer, + this.delivery.remote_settled, + this.sessionId + ); return receiver!.settleMessage(this, DispositionType.defer, { propertiesToModify: propertiesToModify @@ -904,6 +968,20 @@ export class ServiceBusMessage implements ReceivedMessage { /** * Moves the message to the deadletter sub-queue. To receive a deadletted message, create a new * QueueClient/SubscriptionClient using the path for the deadletter sub-queue. + * + * - Throws `SessionLockLostError` (for messages from a Queue/Subscription with sessions enabled) + * if the AMQP link with which the message was received is no longer alive. This can + * happen either because the lock on the session expired or the receiver was explicitly closed by + * the user or the AMQP link got closed by the library due to network loss or service error. + * - Throws `MessageLockLostError` (for messages from a Queue/Subscription with sessions not enabled) + * if the lock on the message has expired or the AMQP link with which the message was received is + * no longer alive. The latter can happen if the receiver was explicitly closed by the user or the + * AMQP link got closed by the library due to network loss or service error. + * - Throws an error if the message is already settled. To avoid this error check the `isSettled` + * property on the message if you are not sure whether the message is settled. + * - Throws an error if used in `ReceiveAndDelete` mode because all messages received in this mode + * are pre-settled. + * * @param options The DeadLetter options that can be provided while * rejecting the message. * @@ -943,7 +1021,8 @@ export class ServiceBusMessage implements ReceivedMessage { throwIfMessageCannotBeSettled( receiver, DispositionType.deadletter, - this.delivery.remote_settled + this.delivery.remote_settled, + this.sessionId ); return receiver!.settleMessage(this, DispositionType.deadletter, { @@ -979,28 +1058,46 @@ export class ServiceBusMessage implements ReceivedMessage { } /** + * @internal * Logs and Throws an error if the given message cannot be settled. * @param receiver Receiver to be used to settle this message * @param operation Settle operation: complete, abandon, defer or deadLetter * @param isRemoteSettled Boolean indicating if the message has been settled at the remote + * @param sessionId sessionId of the message if applicable */ export function throwIfMessageCannotBeSettled( receiver: MessageReceiver | MessageSession | undefined, operation: DispositionType, - isRemoteSettled: boolean + isRemoteSettled: boolean, + sessionId?: string ): void { - let errorMessage; - if (!receiver || !receiver.isOpen()) { - errorMessage = `Failed to ${operation} the message as it's receiver has been closed.`; - } else if (receiver.receiveMode !== ReceiveMode.peekLock) { - errorMessage = getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`); + let error: Error | undefined; + + if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) { + error = new Error( + getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`) + ); } else if (isRemoteSettled) { - errorMessage = `Failed to ${operation} the message as this message has been already settled.`; + error = new Error(`Failed to ${operation} the message as this message is already settled.`); + } else if (!receiver || !receiver.isOpen()) { + const errorMessage = + `Failed to ${operation} the message as the AMQP link with which the message was ` + + `received is no longer alive.`; + if (sessionId != undefined) { + error = translate({ + description: errorMessage, + condition: ErrorNameConditionMapper.SessionLockLostError + }); + } else { + error = translate({ + description: errorMessage, + condition: ErrorNameConditionMapper.MessageLockLostError + }); + } } - if (!errorMessage) { + if (!error) { return; } - const error = new Error(errorMessage); if (receiver) { log.error( "An error occured when settling a message using the receiver %s: %O", diff --git a/sdk/servicebus/service-bus/src/util/errors.ts b/sdk/servicebus/service-bus/src/util/errors.ts index 056eb1dd59ee..5d84f711f007 100644 --- a/sdk/servicebus/service-bus/src/util/errors.ts +++ b/sdk/servicebus/service-bus/src/util/errors.ts @@ -261,6 +261,7 @@ export function throwTypeErrorIfParameterIsEmptyString( } /** + * @internal * Gets error message for when an operation is not supported in ReceiveAndDelete mode * @param failedToDo A string to add to the placeholder in the error message. Denotes the action * that is not supported in ReceiveAndDelete mode diff --git a/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts index 3bac74071aa7..264944ef45fb 100644 --- a/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts @@ -36,6 +36,7 @@ import { TestMessage } from "./testUtils"; import { ClientType } from "../src/client"; +import { throwIfMessageCannotBeSettled, DispositionType } from "../src/serviceBusMessage"; const should = chai.should(); dotenv.config(); chai.use(chaiAsPromised); @@ -478,47 +479,51 @@ describe("Errors after close()", function(): void { /** * Tests the error from settling a message after the receiver is closed */ - async function testDisposition(): Promise { + async function testAllDispositions(): Promise { + await testDisposition(DispositionType.complete); + await testDisposition(DispositionType.abandon); + await testDisposition(DispositionType.defer); + await testDisposition(DispositionType.deadletter); + } + + async function testDisposition(operation: DispositionType): Promise { let caughtError: Error | undefined; - try { - await receivedMessage.complete(); - } catch (error) { - caughtError = error; - } - should.equal( - caughtError && caughtError.message, - "Failed to complete the message as it's receiver has been closed." - ); + let expectedError: Error | undefined; try { - await receivedMessage.abandon(); + switch (operation) { + case DispositionType.complete: + await receivedMessage.complete(); + break; + case DispositionType.abandon: + await receivedMessage.abandon(); + break; + case DispositionType.defer: + await receivedMessage.defer(); + break; + case DispositionType.deadletter: + await receivedMessage.deadLetter(); + break; + + default: + break; + } } catch (error) { caughtError = error; } - should.equal( - caughtError && caughtError.message, - "Failed to abandon the message as it's receiver has been closed." - ); try { - await receivedMessage.defer(); + throwIfMessageCannotBeSettled( + undefined, + operation, + receivedMessage.isSettled, + receivedMessage.sessionId + ); } catch (error) { - caughtError = error; + expectedError = error; } - should.equal( - caughtError && caughtError.message, - "Failed to defer the message as it's receiver has been closed." - ); - try { - await receivedMessage.deadLetter(); - } catch (error) { - caughtError = error; - } - should.equal( - caughtError && caughtError.message, - "Failed to deadletter the message as it's receiver has been closed." - ); + should.equal(caughtError && caughtError.message, expectedError && expectedError.message); } /** @@ -1349,7 +1354,7 @@ describe("Errors after close()", function(): void { await testReceiver( getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.QueueClient, false) ); - await testDisposition(); + await testAllDispositions(); }); it("Partitioned Queue with sessions: errors after close() on receiver", async function(): Promise< @@ -1370,7 +1375,7 @@ describe("Errors after close()", function(): void { TestMessage.sessionId ) ); - await testDisposition(); + await testAllDispositions(); }); it("Partitioned Topic/Subscription: errors after close() on receiver", async function(): Promise< @@ -1385,7 +1390,7 @@ describe("Errors after close()", function(): void { await testReceiver( getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.SubscriptionClient, false) ); - await testDisposition(); + await testAllDispositions(); }); it("Partitioned Topic/Subscription with sessions: errors after close() on receiver", async function(): Promise< @@ -1406,7 +1411,7 @@ describe("Errors after close()", function(): void { TestMessage.sessionId ) ); - await testDisposition(); + await testAllDispositions(); }); it("Unpartitioned Queue: errors after close() on receiver", async function(): Promise { @@ -1419,7 +1424,7 @@ describe("Errors after close()", function(): void { await testReceiver( getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.QueueClient, false) ); - await testDisposition(); + await testAllDispositions(); }); it("Unpartitioned Queue with sessions: errors after close() on receiver", async function(): Promise< @@ -1440,7 +1445,7 @@ describe("Errors after close()", function(): void { TestMessage.sessionId ) ); - await testDisposition(); + await testAllDispositions(); }); it("Unpartitioned Topic/Subscription: errors after close() on receiver", async function(): Promise< @@ -1455,7 +1460,7 @@ describe("Errors after close()", function(): void { await testReceiver( getReceiverClosedErrorMsg(receiverClient.entityPath, ClientType.SubscriptionClient, false) ); - await testDisposition(); + await testAllDispositions(); }); it("Unpartitioned Topic/Subscription with sessions: errors after close() on receiver", async function(): Promise< @@ -1476,7 +1481,7 @@ describe("Errors after close()", function(): void { TestMessage.sessionId ) ); - await testDisposition(); + await testAllDispositions(); }); }); diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index ce39b88d5a06..01938fb20671 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -703,7 +703,7 @@ describe("Streaming - Settle an already Settled message throws error", () => { const testError = (err: Error, operation: DispositionType) => { should.equal( err.message, - `Failed to ${operation} the message as this message has been already settled.`, + `Failed to ${operation} the message as this message is already settled.`, "ErrorMessage is different than expected" ); errorWasThrown = true; diff --git a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts index e564391d9213..24e1cf56cbec 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts @@ -830,7 +830,7 @@ describe("Sessions Streaming - Settle an already Settled message throws error", const testError = (err: Error, operation: DispositionType) => { should.equal( err.message, - `Failed to ${operation} the message as this message has been already settled.`, + `Failed to ${operation} the message as this message is already settled.`, "ErrorMessage is different than expected" ); errorWasThrown = true;