Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/clientEntityContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export namespace ClientEntityContext {
);

(entityContext as ClientEntityContext).getReceiver = (name: string, sessionId?: string) => {
if (sessionId && entityContext.expiredMessageSessions[sessionId]) {
if (sessionId != undefined && entityContext.expiredMessageSessions[sessionId]) {
const error = new Error(
`The session lock has expired on the session with id ${sessionId}.`
);
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ export class ManagementClient extends LinkEntity {
Buffer.from(fromSequenceNumber.toBytesBE())
);
messageBody[Constants.messageCount] = types.wrap_int(maxMessageCount);
if (sessionId) {
if (sessionId != undefined) {
messageBody[Constants.sessionIdMapKey] = sessionId;
}
const request: AmqpMessage = {
Expand Down
11 changes: 2 additions & 9 deletions sdk/servicebus/service-bus/src/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ export class SessionReceiver {
receiveMode === ReceiveMode.receiveAndDelete ? receiveMode : ReceiveMode.peekLock;
this._sessionOptions = sessionOptions;

if (sessionOptions.sessionId) {
if (sessionOptions.sessionId != undefined) {
sessionOptions.sessionId = String(sessionOptions.sessionId);

// Check if receiver for given session already exists
Expand Down Expand Up @@ -777,15 +777,8 @@ export class SessionReceiver {
.maxSessionAutoRenewLockDurationInSeconds,
receiveMode: this._receiveMode
});
// By this point, we should have a valid sessionId on the messageSession
// If not, the receiver cannot be used, so throw error.
if (!this._messageSession.sessionId) {
const error = new Error("Something went wrong. Cannot lock a session.");
log.error(`[${this._context.namespace.connectionId}] %O`, error);
throw error;
}
this._sessionId = this._messageSession.sessionId;
delete this._context.expiredMessageSessions[this._messageSession.sessionId];
delete this._context.expiredMessageSessions[this._messageSession.sessionId!];
}

private _throwIfAlreadyReceiving(): void {
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class MessageSession extends LinkEntity {
*/
sessionLockedUntilUtc?: Date;
/**
* @property {string} [sessionId] The sessionId for the message session.
* @property {string} [sessionId] The sessionId for the message session. Empty string is valid sessionId
*/
sessionId?: string;
/**
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/src/util/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export function getReceiverClosedErrorMsg(
`Please create a new client using an instance of ServiceBusClient.`
);
}
if (!sessionId) {
if (sessionId == undefined) {
return (
`The receiver for "${entityPath}" has been closed and can no longer be used. ` +
`Please create a new receiver using the "createReceiver" function on the ${clientType}.`
Expand All @@ -152,7 +152,7 @@ export function getReceiverClosedErrorMsg(
* @param sessionId If using session receiver, then the id of the session
*/
export function getAlreadyReceivingErrorMsg(entityPath: string, sessionId?: string): string {
if (!sessionId) {
if (sessionId == undefined) {
return `The receiver for "${entityPath}" is already receiving messages.`;
}
return `The receiver for session "${sessionId}" for "${entityPath}" is already receiving messages.`;
Expand Down
101 changes: 98 additions & 3 deletions sdk/servicebus/service-bus/test/sessionsTests.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ function unExpectedErrorHandler(err: Error): void {

const testSessionId2 = "my-session2";

async function beforeEachTest(senderType: TestClientType, sessionType: TestClientType): Promise<void> {
async function beforeEachTest(
senderType: TestClientType,
sessionType: TestClientType
): Promise<void> {
// The tests in this file expect the env variables to contain the connection string and
// the names of empty queue/topic/subscription that are to be tested

Expand Down Expand Up @@ -255,7 +258,7 @@ describe("SessionReceiver with no sessionId", function(): void {
let msgs = await receiver.receiveMessages(2);

should.equal(msgs.length, 1, "Unexpected number of messages received");

should.equal(receiver.sessionId, msgs[0].sessionId, "Unexpected sessionId in receiver");
should.equal(
testMessagesWithDifferentSessionIds.some(
(x) =>
Expand All @@ -275,7 +278,7 @@ describe("SessionReceiver with no sessionId", function(): void {
msgs = await receiver.receiveMessages(2);

should.equal(msgs.length, 1, "Unexpected number of messages received");

should.equal(receiver.sessionId, msgs[0].sessionId, "Unexpected sessionId in receiver");
should.equal(
testMessagesWithDifferentSessionIds.some(
(x) =>
Expand Down Expand Up @@ -335,6 +338,98 @@ describe("SessionReceiver with no sessionId", function(): void {
});
});

describe("SessionReceiver with empty string as sessionId", function(): void {
afterEach(async () => {
await afterEachTest();
});

// Sending messages with different session id, so that we know for sure we pick the right one
// and that Service Bus is not choosing a random one for us
const testMessagesWithDifferentSessionIds: SendableMessageInfo[] = [
{
body: "hello1",
messageId: `test message ${Math.random()}`,
sessionId: TestMessage.sessionId
},
{
body: "hello2",
messageId: `test message ${Math.random()}`,
sessionId: ""
}
];

async function testComplete_batching(): Promise<void> {
const sender = senderClient.createSender();
await sender.send(testMessagesWithDifferentSessionIds[0]);
await sender.send(testMessagesWithDifferentSessionIds[1]);

const receiver = <SessionReceiver>receiverClient.createReceiver(ReceiveMode.peekLock, {
sessionId: ""
});
const msgs = await receiver.receiveMessages(2);

should.equal(msgs.length, 1, "Unexpected number of messages received");
should.equal(receiver.sessionId, "", "Unexpected sessionId in receiver");
should.equal(
testMessagesWithDifferentSessionIds[1].body === msgs[0].body &&
testMessagesWithDifferentSessionIds[1].messageId === msgs[0].messageId &&
testMessagesWithDifferentSessionIds[1].sessionId === msgs[0].sessionId,
true,
"Received Message doesnt match expected test message"
);
await msgs[0].complete();

const peekedMsgsInSession = await receiver.peek();
should.equal(peekedMsgsInSession.length, 0, "Unexpected number of messages peeked");

await receiver.close();
}

it("Partitioned Queue: complete() removes message from random session", async function(): Promise<
void
> {
await beforeEachTest(
TestClientType.PartitionedQueueWithSessions,
TestClientType.PartitionedQueueWithSessions
);
await purge(receiverClient, testSessionId2);
await testComplete_batching();
});

it("Partitioned Subscription: complete() removes message from random session", async function(): Promise<
void
> {
await beforeEachTest(
TestClientType.PartitionedTopicWithSessions,
TestClientType.PartitionedSubscriptionWithSessions
);
await purge(receiverClient, testSessionId2);
await testComplete_batching();
});

it("Unpartitioned Queue: complete() removes message from random session", async function(): Promise<
void
> {
await beforeEachTest(
TestClientType.UnpartitionedQueueWithSessions,
TestClientType.UnpartitionedQueueWithSessions
);
await purge(receiverClient, testSessionId2);
await testComplete_batching();
});

it("Unpartitioned Subscription: complete() removes message from random session", async function(): Promise<
void
> {
await beforeEachTest(
TestClientType.UnpartitionedTopicWithSessions,
TestClientType.UnpartitionedSubscriptionWithSessions
);
await purge(receiverClient, testSessionId2);
await testComplete_batching();
});
});

describe("Session State", function(): void {
afterEach(async () => {
await afterEachTest();
Expand Down