diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 0f8e5bab30a4..138d2e31ac4c 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -253,7 +253,6 @@ export class BatchingReceiverLite { this._createServiceBusMessage = (context: MessageAndDelivery) => { return new ServiceBusMessageImpl( _connectionContext, - entityPath, context.message!, context.delivery!, true, diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index 956e6b638efa..6826cecc0fe0 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -793,7 +793,6 @@ export class ManagementClient extends LinkEntity { const decodedMessage = RheaMessageUtil.decode(msg.message); const message = new ServiceBusMessageImpl( this._context, - this.entityPath, decodedMessage as any, { tag: msg["lock-token"] } as any, false, diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index f5f6bc8fe907..81c3d3a01c92 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -28,6 +28,7 @@ import { receiverLogger as logger } from "../log"; import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise"; import { ServiceBusMessageImpl } from "../serviceBusMessage"; import { AbortSignalLike } from "@azure/abort-controller"; +import { abandonMessage, completeMessage } from "../receivers/shared"; /** * @internal @@ -222,7 +223,6 @@ export class StreamingReceiver extends MessageReceiver { const bMessage: ServiceBusMessageImpl = new ServiceBusMessageImpl( this._context, - this.entityPath, context.message!, context.delivery!, true, @@ -279,7 +279,7 @@ export class StreamingReceiver extends MessageReceiver { this.name, error ); - await bMessage.abandon(); + await abandonMessage(bMessage, this._context, entityPath); } catch (abandonError) { const translatedError = translate(abandonError); logger.logError( @@ -316,7 +316,7 @@ export class StreamingReceiver extends MessageReceiver { this.logPrefix, bMessage.messageId ); - await bMessage.complete(); + await completeMessage(bMessage, this._context, entityPath); } catch (completeError) { const translatedError = translate(completeError); logger.logError( diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index 86cdd5ce5179..c5e1e90eacfe 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -14,6 +14,7 @@ import { ServiceBusReceivedMessage } from "../serviceBusMessage"; import { ConnectionContext } from "../connectionContext"; import { getAlreadyReceivingErrorMsg, + getErrorMessageNotSupportedInReceiveAndDeleteMode, getReceiverClosedErrorMsg, throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing, @@ -22,7 +23,15 @@ import { import { OnError, OnMessage, ReceiveOptions } from "../core/messageReceiver"; import { StreamingReceiverInitArgs, StreamingReceiver } from "../core/streamingReceiver"; import { BatchingReceiver } from "../core/batchingReceiver"; -import { assertValidMessageHandlers, getMessageIterator, wrapProcessErrorHandler } from "./shared"; +import { + abandonMessage, + assertValidMessageHandlers, + completeMessage, + deadLetterMessage, + deferMessage, + getMessageIterator, + wrapProcessErrorHandler +} from "./shared"; import Long from "long"; import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage"; import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp"; @@ -611,38 +620,71 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { }; } - completeMessage(message: ServiceBusReceivedMessage): Promise { + async completeMessage(message: ServiceBusReceivedMessage): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.complete(); + return completeMessage(msgImpl, this._context, this.entityPath); } - abandonMessage( + async abandonMessage( message: ServiceBusReceivedMessage, propertiesToModify?: { [key: string]: any } ): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.abandon(propertiesToModify); + return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify); } - deferMessage( + async deferMessage( message: ServiceBusReceivedMessage, propertiesToModify?: { [key: string]: any } ): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.defer(propertiesToModify); + return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify); } - deadLetterMessage( + async deadLetterMessage( message: ServiceBusReceivedMessage, options?: DeadLetterOptions & { [key: string]: any } ): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.deadLetter(options); + return deadLetterMessage(msgImpl, this._context, this.entityPath, options); } - renewMessageLock(message: ServiceBusReceivedMessage): Promise { + async renewMessageLock(message: ServiceBusReceivedMessage): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.renewLock(); + if (!msgImpl.delivery) { + throw new Error("A peeked message does not have a lock to be renewed."); + } + + let associatedLinkName: string | undefined; + let error: Error | undefined; + if (!message.lockToken) { + error = new Error( + getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`) + ); + } else if (msgImpl.delivery.remote_settled) { + error = new Error(`Failed to renew the lock as this message is already settled.`); + } + if (error) { + logger.logError( + error, + "[%s] An error occurred when renewing the lock on the message with id '%s'", + this._context.connectionId, + message.messageId + ); + throw error; + } + + if (msgImpl.delivery.link) { + const associatedReceiver = this._context.getReceiverFromCache(msgImpl.delivery.link.name); + associatedLinkName = associatedReceiver?.name; + } + return this._context + .getManagementClient(this.entityPath) + .renewLock(message.lockToken!, { associatedLinkName }) + .then((lockedUntil) => { + message.lockedUntilUtc = lockedUntil; + return lockedUntil; + }); } async close(): Promise { diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index 6cc9156a119d..814c216590a0 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -13,7 +13,15 @@ import { throwTypeErrorIfParameterNotLong } from "../util/errors"; import { OnError, OnMessage } from "../core/messageReceiver"; -import { assertValidMessageHandlers, getMessageIterator, wrapProcessErrorHandler } from "./shared"; +import { + abandonMessage, + assertValidMessageHandlers, + completeMessage, + deadLetterMessage, + deferMessage, + getMessageIterator, + wrapProcessErrorHandler +} from "./shared"; import { defaultMaxTimeAfterFirstMessageForBatchingMs, ServiceBusReceiver } from "./receiver"; import Long from "long"; import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage"; @@ -485,7 +493,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver async completeMessage(message: ServiceBusReceivedMessage): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.complete(); + return completeMessage(msgImpl, this._context, this.entityPath); } async abandonMessage( @@ -493,7 +501,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver propertiesToModify?: { [key: string]: any } ): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.abandon(propertiesToModify); + return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify); } async deferMessage( @@ -501,7 +509,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver propertiesToModify?: { [key: string]: any } ): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.defer(propertiesToModify); + return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify); } async deadLetterMessage( @@ -509,7 +517,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver options?: DeadLetterOptions & { [key: string]: any } ): Promise { const msgImpl = message as ServiceBusMessageImpl; - return msgImpl.deadLetter(options); + return deadLetterMessage(msgImpl, this._context, this.entityPath, options); } async renewMessageLock(): Promise { diff --git a/sdk/servicebus/service-bus/src/receivers/shared.ts b/sdk/servicebus/service-bus/src/receivers/shared.ts index 905db5a079af..9dca9a530eb6 100644 --- a/sdk/servicebus/service-bus/src/receivers/shared.ts +++ b/sdk/servicebus/service-bus/src/receivers/shared.ts @@ -5,7 +5,16 @@ import { MessageHandlers, ProcessErrorArgs } from "../models"; import { ServiceBusReceiver } from "./receiver"; import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; import { receiverLogger, ServiceBusLogger } from "../log"; -import { ServiceBusReceivedMessage } from "../serviceBusMessage"; +import { + DeadLetterOptions, + DispositionType, + ServiceBusMessageImpl, + ServiceBusReceivedMessage +} from "../serviceBusMessage"; +import { DispositionStatusOptions } from "../core/managementClient"; +import { ConnectionContext } from "../connectionContext"; +import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "../util/errors"; +import { ErrorNameConditionMapper, translate } from "@azure/core-amqp"; /** * @internal @@ -58,3 +67,158 @@ export function wrapProcessErrorHandler( } }; } + +export function completeMessage( + message: ServiceBusMessageImpl, + context: ConnectionContext, + entityPath: string +): Promise { + receiverLogger.verbose( + "[%s] Completing the message with id '%s'.", + context.connectionId, + message.messageId + ); + return settleMessage(message, DispositionType.complete, context, entityPath); +} + +export function abandonMessage( + message: ServiceBusMessageImpl, + context: ConnectionContext, + entityPath: string, + propertiesToModify?: { [key: string]: any } +): Promise { + receiverLogger.verbose( + "[%s] Abandoning the message with id '%s'.", + context.connectionId, + message.messageId + ); + return settleMessage(message, DispositionType.abandon, context, entityPath, { + propertiesToModify + }); +} + +export function deferMessage( + message: ServiceBusMessageImpl, + context: ConnectionContext, + entityPath: string, + propertiesToModify?: { [key: string]: any } +): Promise { + receiverLogger.verbose( + "[%s] Deferring the message with id '%s'.", + context.connectionId, + message.messageId + ); + return settleMessage(message, DispositionType.defer, context, entityPath, { + propertiesToModify + }); +} + +export function deadLetterMessage( + message: ServiceBusMessageImpl, + context: ConnectionContext, + entityPath: string, + propertiesToModify?: DeadLetterOptions & { [key: string]: any } +): Promise { + receiverLogger.verbose( + "[%s] Deadlettering the message with id '%s'.", + context.connectionId, + message.messageId + ); + + const actualPropertiesToModify: Partial = { + ...propertiesToModify + }; + + // these two fields are handled specially and don't need to be in here. + delete actualPropertiesToModify.deadLetterErrorDescription; + delete actualPropertiesToModify.deadLetterReason; + + const dispositionStatusOptions: DispositionStatusOptions = { + propertiesToModify: actualPropertiesToModify, + deadLetterReason: propertiesToModify?.deadLetterReason, + deadLetterDescription: propertiesToModify?.deadLetterErrorDescription + }; + + return settleMessage( + message, + DispositionType.deadletter, + context, + entityPath, + dispositionStatusOptions + ); +} + +function settleMessage( + message: ServiceBusMessageImpl, + operation: DispositionType, + context: ConnectionContext, + entityPath: string, + options?: DispositionStatusOptions +): Promise { + if (!message.delivery) { + throw new Error("A peeked message cannot be settled."); + } + + if (!message.lockToken) { + const error = new Error( + getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`) + ); + receiverLogger.logError( + error, + "[%s] An error occurred when settling a message with id '%s'", + context.connectionId, + message.messageId + ); + throw error; + } + const isDeferredMessage = !message.delivery.link; + const receiver = isDeferredMessage + ? undefined + : context.getReceiverFromCache(message.delivery.link.name, message.sessionId); + const associatedLinkName = receiver?.name; + + if (!isDeferredMessage) { + // In case the message wasn't from a deferred queue, + // 1. We can verify the remote_settled flag on the delivery + // - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link) + // - If the flag is false, we can't say that the message has not been settled + // since settling with the management link won't update the delivery (In this case, service would throw an error) + // 2. If the message has a session-id and if the associated receiver link is unavailable, + // then throw an error since we need a lock on the session to settle the message. + let error: Error | undefined; + if (message.delivery.remote_settled) { + error = new Error(`Failed to ${operation} the message as this message is already settled.`); + } else if ((!receiver || !receiver.isOpen()) && message.sessionId != undefined) { + error = translate({ + description: + `Failed to ${operation} the message as the AMQP link with which the message was ` + + `received is no longer alive.`, + condition: ErrorNameConditionMapper.SessionLockLostError + }); + } + if (error) { + receiverLogger.logError( + error, + "[%s] An error occurred when settling a message with id '%s'", + context.connectionId, + message.messageId + ); + throw error; + } + } + + // Message Settlement with managementLink + // 1. If the received message is deferred as such messages can only be settled using managementLink + // 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so. + if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && message.sessionId == undefined)) { + return context + .getManagementClient(entityPath) + .updateDispositionStatus(message.lockToken, operation, { + ...options, + associatedLinkName, + sessionId: message.sessionId + }); + } + + return receiver!.settleMessage(message, operation, options); +} diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 4183a3e18eb6..6f067e93d181 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -1,12 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { - AmqpAnnotatedMessage, - Constants, - ErrorNameConditionMapper, - translate -} from "@azure/core-amqp"; +import { AmqpAnnotatedMessage, Constants } from "@azure/core-amqp"; import { Buffer } from "buffer"; import Long from "long"; import { @@ -17,10 +12,8 @@ import { Message as RheaMessage } from "rhea-promise"; import { ConnectionContext } from "./connectionContext"; -import { DispositionStatusOptions } from "./core/managementClient"; -import { messageLogger as logger, receiverLogger } from "./log"; +import { messageLogger as logger } from "./log"; import { ReceiveMode } from "./models"; -import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "./util/errors"; import { reorderLockToken } from "./util/utils"; /** @@ -778,7 +771,6 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { */ constructor( private readonly _context: ConnectionContext, - private readonly _entityPath: string, msg: RheaMessage, delivery: Delivery, shouldReorderLockToken: boolean, @@ -798,120 +790,6 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { this.delivery = delivery; } - /** - * See ServiceBusReceivedMessageWithLock.complete(). - */ - async complete(): Promise { - receiverLogger.verbose( - "[%s] Completing the message with id '%s'.", - this._context.connectionId, - this.messageId - ); - return this.settleMessage(DispositionType.complete); - } - - /** - * See ServiceBusReceivedMessageWithLock.abandon(). - */ - async abandon(propertiesToModify?: { [key: string]: any }): Promise { - // TODO: Figure out a mechanism to convert specified properties to message_annotations. - receiverLogger.verbose( - "[%s] Abandoning the message with id '%s'.", - this._context.connectionId, - this.messageId - ); - return this.settleMessage(DispositionType.abandon, { - propertiesToModify: propertiesToModify - }); - } - - /** - * See ServiceBusReceivedMessageWithLock.defer(). - */ - async defer(propertiesToModify?: { [key: string]: any }): Promise { - receiverLogger.verbose( - "[%s] Deferring the message with id '%s'.", - this._context.connectionId, - this.messageId - ); - return this.settleMessage(DispositionType.defer, { - propertiesToModify: propertiesToModify - }); - } - - /** - * See ServiceBusReceivedMessageWithLock.deadLetter(). - */ - async deadLetter(propertiesToModify?: DeadLetterOptions & { [key: string]: any }): Promise { - receiverLogger.verbose( - "[%s] Deadlettering the message with id '%s'.", - this._context.connectionId, - this.messageId - ); - - const actualPropertiesToModify: Partial = { - ...propertiesToModify - }; - - // these two fields are handled specially and don't need to be in here. - delete actualPropertiesToModify.deadLetterErrorDescription; - delete actualPropertiesToModify.deadLetterReason; - - const dispositionStatusOptions: DispositionStatusOptions = { - propertiesToModify: actualPropertiesToModify, - deadLetterReason: propertiesToModify?.deadLetterReason, - deadLetterDescription: propertiesToModify?.deadLetterErrorDescription - }; - return this.settleMessage(DispositionType.deadletter, dispositionStatusOptions); - } - - /** - * Renews the lock on the message for the duration as specified during the Queue/Subscription - * creation. - * - Check the `lockedUntilUtc` property on the message for the time when the lock expires. - * - If a message is not settled (using either `complete()`, `defer()` or `deadletter()`, - * before its lock expires, then the message lands back in the Queue/Subscription for the next - * receive operation. - * - * @returns Promise - New lock token expiry date and time in UTC format. - * @throws Error if the underlying connection, client or receiver is closed. - * @throws MessagingError if the service returns an error while renewing message lock. - */ - async renewLock(): Promise { - let associatedLinkName: string | undefined; - let error: Error | undefined; - if (this.sessionId) { - error = translate({ - description: `Invalid operation on the message, message lock doesn't exist when dealing with sessions`, - condition: ErrorNameConditionMapper.InvalidOperationError - }); - } else if (!this.lockToken) { - error = new Error( - getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`) - ); - } else if (this.delivery.remote_settled) { - error = new Error(`Failed to renew the lock as this message is already settled.`); - } - if (error) { - logger.logError( - error, - "[%s] An error occurred when renewing the lock on the message with id '%s'", - this._context.connectionId, - this.messageId - ); - throw error; - } - - if (this.delivery.link) { - const associatedReceiver = this._context.getReceiverFromCache(this.delivery.link.name); - associatedLinkName = associatedReceiver?.name; - } - this.lockedUntilUtc = await this._context - .getManagementClient(this._entityPath) - .renewLock(this.lockToken!, { associatedLinkName }); - return this.lockedUntilUtc; - } - /** * Creates a clone of the current message to allow it to be re-sent to the queue * @returns ServiceBusMessage @@ -938,84 +816,4 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { return clone; } - - /** - * Helper method to settle the message. - * @ignore - * @internal - * - * @private - * @param {DispositionStatus} operation - * @param {DispositionStatusOptions} [options] - * @returns {Promise} - * @memberof ServiceBusMessageImpl - */ - private async settleMessage( - operation: DispositionType, - options?: DispositionStatusOptions - ): Promise { - if (!this.lockToken) { - const error = new Error( - getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`) - ); - logger.logError( - error, - "[%s] An error occurred when settling a message with id '%s'", - this._context.connectionId, - this.messageId - ); - throw error; - } - const isDeferredMessage = !this.delivery.link; - const receiver = isDeferredMessage - ? undefined - : this._context.getReceiverFromCache(this.delivery.link.name, this.sessionId); - const associatedLinkName = receiver?.name; - - if (!isDeferredMessage) { - // In case the message wasn't from a deferred queue, - // 1. We can verify the remote_settled flag on the delivery - // - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link) - // - If the flag is false, we can't say that the message has not been settled - // since settling with the management link won't update the delivery (In this case, service would throw an error) - // 2. If the message has a session-id and if the associated receiver link is unavailable, - // then throw an error since we need a lock on the session to settle the message. - let error: Error | undefined; - if (this.delivery.remote_settled) { - error = new Error(`Failed to ${operation} the message as this message is already settled.`); - } else if ((!receiver || !receiver.isOpen()) && this.sessionId != undefined) { - error = translate({ - description: - `Failed to ${operation} the message as the AMQP link with which the message was ` + - `received is no longer alive.`, - condition: ErrorNameConditionMapper.SessionLockLostError - }); - } - if (error) { - logger.logError( - error, - "[%s] An error occurred when settling a message with id '%s'", - this._context.connectionId, - this.messageId - ); - throw error; - } - } - - // Message Settlement with managementLink - // 1. If the received message is deferred as such messages can only be settled using managementLink - // 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so. - if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && this.sessionId == undefined)) { - await this._context - .getManagementClient(this._entityPath) - .updateDispositionStatus(this.lockToken, operation, { - ...options, - associatedLinkName, - sessionId: this.sessionId - }); - return; - } - - return receiver!.settleMessage(this, operation, options); - } } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index ac2f49373722..eb4640a7b4f8 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -33,6 +33,7 @@ import { SubscribeOptions } from "../models"; import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; +import { abandonMessage, completeMessage } from "../receivers/shared"; /** * Describes the options that need to be provided while creating a message session receiver link. @@ -620,7 +621,6 @@ export class MessageSession extends LinkEntity { const bMessage = new ServiceBusMessageImpl( this._context, - this.entityPath, context.message!, context.delivery!, true, @@ -658,7 +658,7 @@ export class MessageSession extends LinkEntity { this.logPrefix, bMessage.messageId ); - await bMessage.abandon(); + await abandonMessage(bMessage, this._context, this.entityPath); } catch (abandonError) { const translatedError = translate(abandonError); logger.logError( @@ -695,7 +695,7 @@ export class MessageSession extends LinkEntity { this.logPrefix, bMessage.messageId ); - await bMessage.complete(); + await completeMessage(bMessage, this._context, this.entityPath); } catch (completeError) { const translatedError = translate(completeError); logger.logError( diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index ea9578951192..a06e36d5959a 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -24,6 +24,7 @@ import { ReceiverEvents } from "rhea-promise"; const should = chai.should(); chai.use(chaiAsPromised); +const assert = chai.assert; const noSessionTestClientType = getRandomTestClientTypeWithNoSessions(); const withSessionTestClientType = getRandomTestClientTypeWithSessions(); @@ -380,6 +381,60 @@ describe("Batching Receiver", () => { await testDeadletter(); } ); + + async function testPeek(): Promise { + const testMessages = entityNames.usesSessions + ? TestMessage.getSessionSample() + : TestMessage.getSample(); + await sender.sendMessages(testMessages); + + const [peekedMsg] = await receiver.peekMessages(1); + should.equal( + !(peekedMsg as any)["delivery"], + true, + "Peeked msg was not meant to have delivery! We use this assumption to differentiate between peeked msg and other messages." + ); + + const expectedErrorMsg = "A peeked message cannot be settled."; + try { + await receiver.completeMessage(peekedMsg); + assert.fail("completeMessage should have failed"); + } catch (error) { + should.equal(error.message, expectedErrorMsg); + } + try { + await receiver.abandonMessage(peekedMsg); + assert.fail("abandonMessage should have failed"); + } catch (error) { + should.equal(error.message, expectedErrorMsg); + } + try { + await receiver.deferMessage(peekedMsg); + assert.fail("deferMessage should have failed"); + } catch (error) { + should.equal(error.message, expectedErrorMsg); + } + try { + await receiver.deadLetterMessage(peekedMsg); + assert.fail("deadLetterMessage should have failed"); + } catch (error) { + should.equal(error.message, expectedErrorMsg); + } + + await testPeekMsgsLength(receiver, 0); + } + + it(noSessionTestClientType + ": cannot settle peeked message", async function(): Promise { + await beforeEachTest(noSessionTestClientType); + await testPeek(); + }); + + it(withSessionTestClientType + ": cannot settle peeked message", async function(): Promise< + void + > { + await beforeEachTest(withSessionTestClientType); + await testPeek(); + }); }); describe("Batch Receiver - Settle deadlettered message", function(): void { @@ -532,7 +587,7 @@ describe("Batching Receiver", () => { "MessageId is different than expected" ); - await receiver.completeMessage(deferredMsg); + await deadLetterReceiver.completeMessage(deferredMsg); await testPeekMsgsLength(receiver, 0); diff --git a/sdk/servicebus/service-bus/test/deferredMessage.spec.ts b/sdk/servicebus/service-bus/test/deferredMessage.spec.ts index 19ec81f802e2..ddc4b8e4006c 100644 --- a/sdk/servicebus/service-bus/test/deferredMessage.spec.ts +++ b/sdk/servicebus/service-bus/test/deferredMessage.spec.ts @@ -86,6 +86,11 @@ describe("Deferred Messages", () => { if (!deferredMsg) { throw "No message received for sequence number"; } + should.equal( + !!(deferredMsg as any)["delivery"], + true, + "Deferred msg should have delivery! We use this assumption to differentiate between peeked msg and other messages." + ); should.equal(deferredMsg.body, testMessage.body, "MessageBody is different than expected"); should.equal( deferredMsg.messageId, diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusMessage.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusMessage.spec.ts index e653fe698d05..bde40688ab3c 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusMessage.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusMessage.spec.ts @@ -20,7 +20,6 @@ const fakeContext = { decode: (data) => data } } as ConnectionContext; -const fakeEntityPath = "dummy"; const fakeDelivery = {} as Delivery; describe("ServiceBusMessageImpl LockToken unit tests", () => { @@ -40,7 +39,6 @@ describe("ServiceBusMessageImpl LockToken unit tests", () => { it("Lock token in peekLock mode", () => { const sbMessage = new ServiceBusMessageImpl( fakeContext, - fakeEntityPath, amqpMessage, { tag: fakeDeliveryTag } as Delivery, false, @@ -53,7 +51,6 @@ describe("ServiceBusMessageImpl LockToken unit tests", () => { it("Lock token in receiveAndDelete mode", () => { const sbMessage = new ServiceBusMessageImpl( fakeContext, - fakeEntityPath, amqpMessage, { tag: fakeDeliveryTag } as Delivery, false, @@ -103,7 +100,6 @@ describe("ServiceBusMessageImpl AmqpAnnotations unit tests", () => { const sbMessage = new ServiceBusMessageImpl( fakeContext, - fakeEntityPath, amqpMessage, fakeDelivery, false, diff --git a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts index ea25d60dbde4..5b77030efecf 100644 --- a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts +++ b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts @@ -77,6 +77,12 @@ describe("receive and delete", () => { await sender.sendMessages(testMessages); const msgs = await receiver.receiveMessages(1); + should.equal( + !msgs[0].lockToken, + true, + "Msgs in receiveAndDelete mode should not have locktoken! We use this assumption to differentiate between the two receive modes." + ); + should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages"); should.equal(msgs[0].body, testMessages.body, "MessageBody is different than expected"); diff --git a/sdk/servicebus/service-bus/test/renewLock.spec.ts b/sdk/servicebus/service-bus/test/renewLock.spec.ts index c34f4ad7ef2d..b256b300279e 100644 --- a/sdk/servicebus/service-bus/test/renewLock.spec.ts +++ b/sdk/servicebus/service-bus/test/renewLock.spec.ts @@ -3,6 +3,7 @@ import chai from "chai"; const should = chai.should(); +const assert = chai.assert; import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); import { delay } from "rhea-promise"; @@ -66,6 +67,25 @@ describe("Message Lock Renewal", () => { } ); + it(testClientType + ": cannot renew lock on peeked message", async function(): Promise { + const receiver = await serviceBusClient.test.createPeekLockReceiver(autoGeneratedEntity); + + const testMessage = TestMessage.getSample(); + await sender.sendMessages(testMessage); + + const [peekedMsg] = await receiver.peekMessages(1); + try { + await receiver.renewMessageLock(peekedMsg); + assert.fail("renewMessageLock should have failed"); + } catch (error) { + should.equal(error.message, "A peeked message does not have a lock to be renewed."); + } + + // Clean up any left over messages + const [unprocessedMsg] = await receiver.receiveMessages(1); + await receiver.completeMessage(unprocessedMsg); + }); + const receiveMethodType: ("subscribe" | "receive" | "iterator")[] = [ "iterator", "subscribe",