Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ export class ManagementClient extends LinkEntity {
async updateDispositionStatus(
lockToken: string,
dispositionStatus: DispositionStatus,
associatedLinkName?: string,
options?: DispositionStatusOptions
): Promise<void> {
throwErrorIfConnectionClosed(this._context.namespace);
Expand Down Expand Up @@ -757,6 +758,9 @@ export class ManagementClient extends LinkEntity {
operation: Constants.operations.updateDisposition
}
};
if (associatedLinkName) {
request.application_properties![Constants.associatedLinkName] = associatedLinkName;
}
request.application_properties![Constants.trackingId] = generate_uuid();
log.mgmt(
"[%s] Update disposition status request body: %O.",
Expand Down
21 changes: 6 additions & 15 deletions sdk/servicebus/service-bus/src/queueClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from "./util/errors";
import { generate_uuid } from "rhea-promise";
import { ClientEntityContext } from "./clientEntityContext";
import { getAssociatedReceiverName } from "../src/util/utils";

/**
* Describes the client that allows interacting with a Service Bus Queue.
Expand Down Expand Up @@ -201,13 +202,10 @@ export class QueueClient implements Client {
this._context.isClosed
);

let receiverName;
if (this._context.batchingReceiver) {
receiverName = this._context.batchingReceiver.name;
} else if (this._context.streamingReceiver) {
receiverName = this._context.streamingReceiver.name;
}
return this._context.managementClient!.peek(maxMessageCount, receiverName);
return this._context.managementClient!.peek(
maxMessageCount,
getAssociatedReceiverName(this._context)
);
}

/**
Expand All @@ -230,18 +228,11 @@ export class QueueClient implements Client {
this._context.isClosed
);

let receiverName;
if (this._context.batchingReceiver) {
receiverName = this._context.batchingReceiver.name;
} else if (this._context.streamingReceiver) {
receiverName = this._context.streamingReceiver.name;
}

return this._context.managementClient!.peekBySequenceNumber(
fromSequenceNumber,
maxMessageCount,
undefined,
receiverName
getAssociatedReceiverName(this._context)
);
}

Expand Down
32 changes: 8 additions & 24 deletions sdk/servicebus/service-bus/src/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {
getErrorMessageNotSupportedInReceiveAndDeleteMode
} from "./util/errors";

import { getAssociatedReceiverName } from "../src/util/utils";

/**
* The Receiver class can be used to receive messages in a batch or by registering handlers.
* Use the `createReceiver` function on the QueueClient or SubscriptionClient to instantiate a Receiver.
Expand Down Expand Up @@ -186,14 +188,10 @@ export class Receiver {
? String(lockTokenOrMessage.lockToken)
: String(lockTokenOrMessage);

let receiverName;
if (this._context.batchingReceiver) {
receiverName = this._context.batchingReceiver.name;
} else if (this._context.streamingReceiver) {
receiverName = this._context.streamingReceiver.name;
}

const lockedUntilUtc = await this._context.managementClient!.renewLock(lockToken, receiverName);
const lockedUntilUtc = await this._context.managementClient!.renewLock(
lockToken,
getAssociatedReceiverName(this._context)
);

return lockedUntilUtc;
}
Expand All @@ -219,17 +217,10 @@ export class Receiver {
sequenceNumber
);

let receiverName;
if (this._context.batchingReceiver) {
receiverName = this._context.batchingReceiver.name;
} else if (this._context.streamingReceiver) {
receiverName = this._context.streamingReceiver.name;
}

const messages = await this._context.managementClient!.receiveDeferredMessages(
[sequenceNumber],
this._receiveMode,
receiverName
getAssociatedReceiverName(this._context)
);
return messages[0];
}
Expand Down Expand Up @@ -258,17 +249,10 @@ export class Receiver {
sequenceNumbers
);

let receiverName;
if (this._context.batchingReceiver) {
receiverName = this._context.batchingReceiver.name;
} else if (this._context.streamingReceiver) {
receiverName = this._context.streamingReceiver.name;
}

return this._context.managementClient!.receiveDeferredMessages(
sequenceNumbers,
this._receiveMode,
receiverName
getAssociatedReceiverName(this._context)
);
}

Expand Down
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import { Constants, AmqpMessage } from "@azure/amqp-common";
import * as log from "./log";
import { ClientEntityContext } from "./clientEntityContext";
import { reorderLockToken } from "../src/util/utils";
import { reorderLockToken, getAssociatedReceiverName } from "../src/util/utils";
import { MessageReceiver } from "../src/core/messageReceiver";
import { MessageSession } from "../src/session/messageSession";
import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "./util/errors";
Expand Down Expand Up @@ -820,6 +820,7 @@ export class ServiceBusMessage implements ReceivedMessage {
await this._context.managementClient!.updateDispositionStatus(
this.lockToken!,
DispositionStatus.completed,
getAssociatedReceiverName(this._context, this.sessionId),
{
sessionId: this.sessionId
}
Expand Down Expand Up @@ -852,6 +853,7 @@ export class ServiceBusMessage implements ReceivedMessage {
await this._context.managementClient!.updateDispositionStatus(
this.lockToken!,
DispositionStatus.abandoned,
getAssociatedReceiverName(this._context, this.sessionId),
{ propertiesToModify: propertiesToModify, sessionId: this.sessionId }
);

Expand Down Expand Up @@ -884,6 +886,7 @@ export class ServiceBusMessage implements ReceivedMessage {
await this._context.managementClient!.updateDispositionStatus(
this.lockToken!,
DispositionStatus.defered,
getAssociatedReceiverName(this._context, this.sessionId),
{ propertiesToModify: propertiesToModify, sessionId: this.sessionId }
);

Expand Down Expand Up @@ -926,6 +929,7 @@ export class ServiceBusMessage implements ReceivedMessage {
await this._context.managementClient!.updateDispositionStatus(
this.lockToken!,
DispositionStatus.suspended,
getAssociatedReceiverName(this._context, this.sessionId),
{
deadLetterReason: error.condition,
deadLetterDescription: error.description,
Expand Down
22 changes: 6 additions & 16 deletions sdk/servicebus/service-bus/src/subscriptionClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from "./util/errors";
import { generate_uuid } from "rhea-promise";
import { ClientEntityContext } from "./clientEntityContext";
import { getAssociatedReceiverName } from "../src/util/utils";

/**
* Describes the client that allows interacting with a Service Bus Subscription.
Expand Down Expand Up @@ -201,14 +202,10 @@ export class SubscriptionClient implements Client {
this._context.isClosed
);

let receiverName;
if (this._context.batchingReceiver) {
receiverName = this._context.batchingReceiver.name;
} else if (this._context.streamingReceiver) {
receiverName = this._context.streamingReceiver.name;
}

return this._context.managementClient!.peek(maxMessageCount, receiverName);
return this._context.managementClient!.peek(
maxMessageCount,
getAssociatedReceiverName(this._context)
);
}

/**
Expand All @@ -231,18 +228,11 @@ export class SubscriptionClient implements Client {
this._context.isClosed
);

let receiverName;
if (this._context.batchingReceiver) {
receiverName = this._context.batchingReceiver.name;
} else if (this._context.streamingReceiver) {
receiverName = this._context.streamingReceiver.name;
}

return this._context.managementClient!.peekBySequenceNumber(
fromSequenceNumber,
maxMessageCount,
undefined,
receiverName
getAssociatedReceiverName(this._context)
);
}

Expand Down
21 changes: 21 additions & 0 deletions sdk/servicebus/service-bus/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Long from "long";
import * as log from "../log";
import { generate_uuid } from "rhea-promise";
import { isBuffer } from "util";
import { ClientEntityContext } from "../../src/clientEntityContext";

// This is the only dependency we have on DOM types, so rather than require
// the DOM lib we can just shim this in.
Expand Down Expand Up @@ -160,3 +161,23 @@ export function toBuffer(input: any): Buffer {
return result;
}

/**
* Helper function to retrieve active receiver name, if it exists.
*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the tag @internal here otherwise, the function gets picked up by our doc generation process. For example: https://docs.microsoft.com/en-us/javascript/api/%40azure/service-bus/index?view=azure-node-preview#functions

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't all of the methods in utils.ts need to be marked as internal?

*/
export function getAssociatedReceiverName(
clientEntityContext: ClientEntityContext,
sessionId?: string
): string {
let receiverName: string;
if (sessionId !== undefined) {
if (clientEntityContext.messageSessions[sessionId]) {
receiverName = clientEntityContext.messageSessions[sessionId].name;
}
} else if (clientEntityContext.batchingReceiver) {
receiverName = clientEntityContext.batchingReceiver.name;
} else if (clientEntityContext.streamingReceiver) {
receiverName = clientEntityContext.streamingReceiver.name;
}
return receiverName!;

@ramya-rao-a ramya-rao-a May 12, 2019

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the return value can be undefined as well, please update the return type to be string | undefined and remove the ! here

We use ! on a variable whose type allows undefined but we are sure that the value is not undefined. It is a way to tell typescript compiler to stop complaining as we know the variable cannot be undefined, but couldn't change the type, maybe because it was declared in a code section we don't own.

In this case, we define the variable receiverName and we know it can have undefined, so ! is misused here

}