From f7899cd29ecc3f096d858a7f7e6379e5e8209ea6 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Thu, 9 May 2019 11:02:31 -0700 Subject: [PATCH 1/5] Pass associatedLinkName to updateDispositionStatus() API --- .../service-bus/src/core/managementClient.ts | 4 ++ .../service-bus/src/serviceBusMessage.ts | 48 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index c5d13971ac4a..404e295bcbeb 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -726,6 +726,7 @@ export class ManagementClient extends LinkEntity { async updateDispositionStatus( lockToken: string, dispositionStatus: DispositionStatus, + associatedLinkName?: string, options?: DispositionStatusOptions ): Promise { throwErrorIfConnectionClosed(this._context.namespace); @@ -757,6 +758,9 @@ export class ManagementClient extends LinkEntity { operation: Constants.operations.updateDisposition } }; + if (associatedLinkName) { + request.application_properties![Constants.associatedLinkName] = associatedLinkName; + } request.application_properties![Constants.trackingId] = generate_uuid(); log.mgmt( "[%s] Update disposition status request body: %O.", diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index f65f96518d1f..e332676cb133 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -817,9 +817,21 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { + let receiverName; + if (this._context.batchingReceiver) { + receiverName = this._context.batchingReceiver.name; + } else if (this._context.streamingReceiver) { + receiverName = this._context.streamingReceiver.name; + } + + if (this.sessionId) { + receiverName = this._context.messageSessions[this.sessionId].name; + } + await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.completed, + receiverName, { sessionId: this.sessionId } @@ -849,9 +861,21 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { + let receiverName; + if (this._context.batchingReceiver) { + receiverName = this._context.batchingReceiver.name; + } else if (this._context.streamingReceiver) { + receiverName = this._context.streamingReceiver.name; + } + + if (this.sessionId) { + receiverName = this._context.messageSessions[this.sessionId].name; + } + await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.abandoned, + receiverName, { propertiesToModify: propertiesToModify, sessionId: this.sessionId } ); @@ -881,9 +905,21 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { + let receiverName; + if (this._context.batchingReceiver) { + receiverName = this._context.batchingReceiver.name; + } else if (this._context.streamingReceiver) { + receiverName = this._context.streamingReceiver.name; + } + + if (this.sessionId) { + receiverName = this._context.messageSessions[this.sessionId].name; + } + await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.defered, + receiverName, { propertiesToModify: propertiesToModify, sessionId: this.sessionId } ); @@ -923,9 +959,21 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { + let receiverName; + if (this._context.batchingReceiver) { + receiverName = this._context.batchingReceiver.name; + } else if (this._context.streamingReceiver) { + receiverName = this._context.streamingReceiver.name; + } + + if (this.sessionId) { + receiverName = this._context.messageSessions[this.sessionId].name; + } + await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.suspended, + receiverName, { deadLetterReason: error.condition, deadLetterDescription: error.description, From 24d2e52b8ab7bbbc8694e74abd3af424663ab423 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Thu, 9 May 2019 16:54:28 -0700 Subject: [PATCH 2/5] Combine if-else --- .../service-bus/src/serviceBusMessage.ts | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index e332676cb133..7aa255ce950a 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -818,16 +818,14 @@ export class ServiceBusMessage implements ReceivedMessage { ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { let receiverName; - if (this._context.batchingReceiver) { + if (this.sessionId !== undefined) { + receiverName = this._context.messageSessions[this.sessionId].name; + } else if (this._context.batchingReceiver) { receiverName = this._context.batchingReceiver.name; } else if (this._context.streamingReceiver) { receiverName = this._context.streamingReceiver.name; } - if (this.sessionId) { - receiverName = this._context.messageSessions[this.sessionId].name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.completed, @@ -862,16 +860,14 @@ export class ServiceBusMessage implements ReceivedMessage { ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { let receiverName; - if (this._context.batchingReceiver) { + if (this.sessionId !== undefined) { + receiverName = this._context.messageSessions[this.sessionId].name; + } else if (this._context.batchingReceiver) { receiverName = this._context.batchingReceiver.name; } else if (this._context.streamingReceiver) { receiverName = this._context.streamingReceiver.name; } - if (this.sessionId) { - receiverName = this._context.messageSessions[this.sessionId].name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.abandoned, @@ -906,16 +902,14 @@ export class ServiceBusMessage implements ReceivedMessage { ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { let receiverName; - if (this._context.batchingReceiver) { + if (this.sessionId !== undefined) { + receiverName = this._context.messageSessions[this.sessionId].name; + } else if (this._context.batchingReceiver) { receiverName = this._context.batchingReceiver.name; } else if (this._context.streamingReceiver) { receiverName = this._context.streamingReceiver.name; } - if (this.sessionId) { - receiverName = this._context.messageSessions[this.sessionId].name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.defered, @@ -960,16 +954,14 @@ export class ServiceBusMessage implements ReceivedMessage { ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { let receiverName; - if (this._context.batchingReceiver) { + if (this.sessionId !== undefined) { + receiverName = this._context.messageSessions[this.sessionId].name; + } else if (this._context.batchingReceiver) { receiverName = this._context.batchingReceiver.name; } else if (this._context.streamingReceiver) { receiverName = this._context.streamingReceiver.name; } - if (this.sessionId) { - receiverName = this._context.messageSessions[this.sessionId].name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.suspended, From dd71f9bb5280fa4ca40e729651d2fe631a73e746 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Fri, 10 May 2019 10:53:31 -0700 Subject: [PATCH 3/5] Use private helper --- .../service-bus/src/serviceBusMessage.ts | 59 ++++++------------- 1 file changed, 19 insertions(+), 40 deletions(-) diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 7aa255ce950a..c42c2f920f11 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -817,19 +817,10 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { - let receiverName; - if (this.sessionId !== undefined) { - receiverName = this._context.messageSessions[this.sessionId].name; - } else if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.completed, - receiverName, + this._getAssociatedReceiverName(), { sessionId: this.sessionId } @@ -859,19 +850,10 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { - let receiverName; - if (this.sessionId !== undefined) { - receiverName = this._context.messageSessions[this.sessionId].name; - } else if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.abandoned, - receiverName, + this._getAssociatedReceiverName(), { propertiesToModify: propertiesToModify, sessionId: this.sessionId } ); @@ -901,19 +883,10 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { - let receiverName; - if (this.sessionId !== undefined) { - receiverName = this._context.messageSessions[this.sessionId].name; - } else if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.defered, - receiverName, + this._getAssociatedReceiverName(), { propertiesToModify: propertiesToModify, sessionId: this.sessionId } ); @@ -953,19 +926,10 @@ export class ServiceBusMessage implements ReceivedMessage { this.messageId ); if (this._context.requestResponseLockedMessages.has(this.lockToken!)) { - let receiverName; - if (this.sessionId !== undefined) { - receiverName = this._context.messageSessions[this.sessionId].name; - } else if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.suspended, - receiverName, + this._getAssociatedReceiverName(), { deadLetterReason: error.condition, deadLetterDescription: error.description, @@ -1014,6 +978,21 @@ export class ServiceBusMessage implements ReceivedMessage { return clone; } + + /** + * Helper function to retrieve active receiver name, if it exists. + */ + _getAssociatedReceiverName(): string { + let receiverName: string; + if (this.sessionId !== undefined) { + receiverName = this._context.messageSessions[this.sessionId].name; + } else if (this._context.batchingReceiver) { + receiverName = this._context.batchingReceiver.name; + } else if (this._context.streamingReceiver) { + receiverName = this._context.streamingReceiver.name; + } + return receiverName!; + } } /** From 442bc7984af8b78be2e2da1e1e269530acd18a2f Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Fri, 10 May 2019 14:41:29 -0700 Subject: [PATCH 4/5] Move helper to util and update 7 other usages --- sdk/servicebus/service-bus/src/queueClient.ts | 21 ++++-------- sdk/servicebus/service-bus/src/receiver.ts | 32 +++++-------------- .../service-bus/src/serviceBusMessage.ts | 25 +++------------ .../service-bus/src/subscriptionClient.ts | 22 ++++--------- sdk/servicebus/service-bus/src/util/utils.ts | 21 ++++++++++++ 5 files changed, 46 insertions(+), 75 deletions(-) diff --git a/sdk/servicebus/service-bus/src/queueClient.ts b/sdk/servicebus/service-bus/src/queueClient.ts index 6c5a6b5796bd..0ffdf46dc9f2 100644 --- a/sdk/servicebus/service-bus/src/queueClient.ts +++ b/sdk/servicebus/service-bus/src/queueClient.ts @@ -17,6 +17,7 @@ import { } from "./util/errors"; import { generate_uuid } from "rhea-promise"; import { ClientEntityContext } from "./clientEntityContext"; +import { getAssociatedReceiverName } from "../src/util/utils"; /** * Describes the client that allows interacting with a Service Bus Queue. @@ -201,13 +202,10 @@ export class QueueClient implements Client { this._context.isClosed ); - let receiverName; - if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - return this._context.managementClient!.peek(maxMessageCount, receiverName); + return this._context.managementClient!.peek( + maxMessageCount, + getAssociatedReceiverName(this._context) + ); } /** @@ -230,18 +228,11 @@ export class QueueClient implements Client { this._context.isClosed ); - let receiverName; - if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - return this._context.managementClient!.peekBySequenceNumber( fromSequenceNumber, maxMessageCount, undefined, - receiverName + getAssociatedReceiverName(this._context) ); } diff --git a/sdk/servicebus/service-bus/src/receiver.ts b/sdk/servicebus/service-bus/src/receiver.ts index b23a63687e90..51541869013a 100644 --- a/sdk/servicebus/service-bus/src/receiver.ts +++ b/sdk/servicebus/service-bus/src/receiver.ts @@ -23,6 +23,8 @@ import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "./util/errors"; +import { getAssociatedReceiverName } from "../src/util/utils"; + /** * The Receiver class can be used to receive messages in a batch or by registering handlers. * Use the `createReceiver` function on the QueueClient or SubscriptionClient to instantiate a Receiver. @@ -186,14 +188,10 @@ export class Receiver { ? String(lockTokenOrMessage.lockToken) : String(lockTokenOrMessage); - let receiverName; - if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - - const lockedUntilUtc = await this._context.managementClient!.renewLock(lockToken, receiverName); + const lockedUntilUtc = await this._context.managementClient!.renewLock( + lockToken, + getAssociatedReceiverName(this._context) + ); return lockedUntilUtc; } @@ -219,17 +217,10 @@ export class Receiver { sequenceNumber ); - let receiverName; - if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - const messages = await this._context.managementClient!.receiveDeferredMessages( [sequenceNumber], this._receiveMode, - receiverName + getAssociatedReceiverName(this._context) ); return messages[0]; } @@ -258,17 +249,10 @@ export class Receiver { sequenceNumbers ); - let receiverName; - if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - return this._context.managementClient!.receiveDeferredMessages( sequenceNumbers, this._receiveMode, - receiverName + getAssociatedReceiverName(this._context) ); } diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index c42c2f920f11..da2eebe25154 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -12,7 +12,7 @@ import { import { Constants, AmqpMessage } from "@azure/amqp-common"; import * as log from "./log"; import { ClientEntityContext } from "./clientEntityContext"; -import { reorderLockToken } from "../src/util/utils"; +import { reorderLockToken, getAssociatedReceiverName } from "../src/util/utils"; import { MessageReceiver } from "../src/core/messageReceiver"; import { MessageSession } from "../src/session/messageSession"; import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "./util/errors"; @@ -820,7 +820,7 @@ export class ServiceBusMessage implements ReceivedMessage { await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.completed, - this._getAssociatedReceiverName(), + getAssociatedReceiverName(this._context, this.sessionId), { sessionId: this.sessionId } @@ -853,7 +853,7 @@ export class ServiceBusMessage implements ReceivedMessage { await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.abandoned, - this._getAssociatedReceiverName(), + getAssociatedReceiverName(this._context, this.sessionId), { propertiesToModify: propertiesToModify, sessionId: this.sessionId } ); @@ -886,7 +886,7 @@ export class ServiceBusMessage implements ReceivedMessage { await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.defered, - this._getAssociatedReceiverName(), + getAssociatedReceiverName(this._context, this.sessionId), { propertiesToModify: propertiesToModify, sessionId: this.sessionId } ); @@ -929,7 +929,7 @@ export class ServiceBusMessage implements ReceivedMessage { await this._context.managementClient!.updateDispositionStatus( this.lockToken!, DispositionStatus.suspended, - this._getAssociatedReceiverName(), + getAssociatedReceiverName(this._context, this.sessionId), { deadLetterReason: error.condition, deadLetterDescription: error.description, @@ -978,21 +978,6 @@ export class ServiceBusMessage implements ReceivedMessage { return clone; } - - /** - * Helper function to retrieve active receiver name, if it exists. - */ - _getAssociatedReceiverName(): string { - let receiverName: string; - if (this.sessionId !== undefined) { - receiverName = this._context.messageSessions[this.sessionId].name; - } else if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - return receiverName!; - } } /** diff --git a/sdk/servicebus/service-bus/src/subscriptionClient.ts b/sdk/servicebus/service-bus/src/subscriptionClient.ts index aa91b11d6fb8..29d9a225ef28 100644 --- a/sdk/servicebus/service-bus/src/subscriptionClient.ts +++ b/sdk/servicebus/service-bus/src/subscriptionClient.ts @@ -15,6 +15,7 @@ import { } from "./util/errors"; import { generate_uuid } from "rhea-promise"; import { ClientEntityContext } from "./clientEntityContext"; +import { getAssociatedReceiverName } from "../src/util/utils"; /** * Describes the client that allows interacting with a Service Bus Subscription. @@ -201,14 +202,10 @@ export class SubscriptionClient implements Client { this._context.isClosed ); - let receiverName; - if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - - return this._context.managementClient!.peek(maxMessageCount, receiverName); + return this._context.managementClient!.peek( + maxMessageCount, + getAssociatedReceiverName(this._context) + ); } /** @@ -231,18 +228,11 @@ export class SubscriptionClient implements Client { this._context.isClosed ); - let receiverName; - if (this._context.batchingReceiver) { - receiverName = this._context.batchingReceiver.name; - } else if (this._context.streamingReceiver) { - receiverName = this._context.streamingReceiver.name; - } - return this._context.managementClient!.peekBySequenceNumber( fromSequenceNumber, maxMessageCount, undefined, - receiverName + getAssociatedReceiverName(this._context) ); } diff --git a/sdk/servicebus/service-bus/src/util/utils.ts b/sdk/servicebus/service-bus/src/util/utils.ts index 266e57f3ea5f..f2a3a1f1a056 100644 --- a/sdk/servicebus/service-bus/src/util/utils.ts +++ b/sdk/servicebus/service-bus/src/util/utils.ts @@ -5,6 +5,7 @@ import Long from "long"; import * as log from "../log"; import { generate_uuid } from "rhea-promise"; import { isBuffer } from "util"; +import { ClientEntityContext } from "../../src/clientEntityContext"; // This is the only dependency we have on DOM types, so rather than require // the DOM lib we can just shim this in. @@ -160,3 +161,23 @@ export function toBuffer(input: any): Buffer { return result; } +/** + * Helper function to retrieve active receiver name, if it exists. + * + */ +export function getAssociatedReceiverName( + clientEntityContext: ClientEntityContext, + sessionId?: string +): string { + let receiverName: string; + if (sessionId !== undefined) { + if (clientEntityContext.messageSessions[sessionId]) { + receiverName = clientEntityContext.messageSessions[sessionId].name; + } + } else if (clientEntityContext.batchingReceiver) { + receiverName = clientEntityContext.batchingReceiver.name; + } else if (clientEntityContext.streamingReceiver) { + receiverName = clientEntityContext.streamingReceiver.name; + } + return receiverName!; +} From 054ddc68ad7b67f004364b1ee52b7c4de6759da3 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Sun, 12 May 2019 19:08:28 -0700 Subject: [PATCH 5/5] Address comments --- sdk/servicebus/service-bus/src/util/utils.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/service-bus/src/util/utils.ts b/sdk/servicebus/service-bus/src/util/utils.ts index f2a3a1f1a056..edcc172298d4 100644 --- a/sdk/servicebus/service-bus/src/util/utils.ts +++ b/sdk/servicebus/service-bus/src/util/utils.ts @@ -162,15 +162,15 @@ export function toBuffer(input: any): Buffer { } /** + * @internal * Helper function to retrieve active receiver name, if it exists. - * */ export function getAssociatedReceiverName( clientEntityContext: ClientEntityContext, sessionId?: string -): string { - let receiverName: string; - if (sessionId !== undefined) { +): string | undefined { + let receiverName: string | undefined; + if (sessionId != undefined) { if (clientEntityContext.messageSessions[sessionId]) { receiverName = clientEntityContext.messageSessions[sessionId].name; } @@ -179,5 +179,5 @@ export function getAssociatedReceiverName( } else if (clientEntityContext.streamingReceiver) { receiverName = clientEntityContext.streamingReceiver.name; } - return receiverName!; + return receiverName; }