diff --git a/sdk/servicebus/service-bus/src/clientEntityContext.ts b/sdk/servicebus/service-bus/src/clientEntityContext.ts index e0704b1327cf..baf4b1321218 100644 --- a/sdk/servicebus/service-bus/src/clientEntityContext.ts +++ b/sdk/servicebus/service-bus/src/clientEntityContext.ts @@ -147,7 +147,7 @@ export namespace ClientEntityContext { ); (entityContext as ClientEntityContext).getReceiver = (name: string, sessionId?: string) => { - if (sessionId && entityContext.expiredMessageSessions[sessionId]) { + if (sessionId != undefined && entityContext.expiredMessageSessions[sessionId]) { const error = new Error( `The session lock has expired on the session with id ${sessionId}.` ); diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index c5d13971ac4a..cf7e255ec580 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -314,7 +314,7 @@ export class ManagementClient extends LinkEntity { Buffer.from(fromSequenceNumber.toBytesBE()) ); messageBody[Constants.messageCount] = types.wrap_int(maxMessageCount); - if (sessionId) { + if (sessionId != undefined) { messageBody[Constants.sessionIdMapKey] = sessionId; } const request: AmqpMessage = { diff --git a/sdk/servicebus/service-bus/src/receiver.ts b/sdk/servicebus/service-bus/src/receiver.ts index f0adc82524ef..0736f832a903 100644 --- a/sdk/servicebus/service-bus/src/receiver.ts +++ b/sdk/servicebus/service-bus/src/receiver.ts @@ -441,7 +441,7 @@ export class SessionReceiver { receiveMode === ReceiveMode.receiveAndDelete ? receiveMode : ReceiveMode.peekLock; this._sessionOptions = sessionOptions; - if (sessionOptions.sessionId) { + if (sessionOptions.sessionId != undefined) { sessionOptions.sessionId = String(sessionOptions.sessionId); // Check if receiver for given session already exists @@ -777,15 +777,8 @@ export class SessionReceiver { .maxSessionAutoRenewLockDurationInSeconds, receiveMode: this._receiveMode }); - // By this point, we should have a valid sessionId on the messageSession - // If not, the receiver cannot be used, so throw error. - if (!this._messageSession.sessionId) { - const error = new Error("Something went wrong. Cannot lock a session."); - log.error(`[${this._context.namespace.connectionId}] %O`, error); - throw error; - } this._sessionId = this._messageSession.sessionId; - delete this._context.expiredMessageSessions[this._messageSession.sessionId]; + delete this._context.expiredMessageSessions[this._messageSession.sessionId!]; } private _throwIfAlreadyReceiving(): void { diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 0efa34f1e2a3..ff62933011f9 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -136,7 +136,7 @@ export class MessageSession extends LinkEntity { */ sessionLockedUntilUtc?: Date; /** - * @property {string} [sessionId] The sessionId for the message session. + * @property {string} [sessionId] The sessionId for the message session. Empty string is valid sessionId */ sessionId?: string; /** diff --git a/sdk/servicebus/service-bus/src/util/errors.ts b/sdk/servicebus/service-bus/src/util/errors.ts index bd323b9a6761..056eb1dd59ee 100644 --- a/sdk/servicebus/service-bus/src/util/errors.ts +++ b/sdk/servicebus/service-bus/src/util/errors.ts @@ -134,7 +134,7 @@ export function getReceiverClosedErrorMsg( `Please create a new client using an instance of ServiceBusClient.` ); } - if (!sessionId) { + if (sessionId == undefined) { return ( `The receiver for "${entityPath}" has been closed and can no longer be used. ` + `Please create a new receiver using the "createReceiver" function on the ${clientType}.` @@ -152,7 +152,7 @@ export function getReceiverClosedErrorMsg( * @param sessionId If using session receiver, then the id of the session */ export function getAlreadyReceivingErrorMsg(entityPath: string, sessionId?: string): string { - if (!sessionId) { + if (sessionId == undefined) { return `The receiver for "${entityPath}" is already receiving messages.`; } return `The receiver for session "${sessionId}" for "${entityPath}" is already receiving messages.`; diff --git a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts index 116f1c6cd777..6c5b2c99577e 100644 --- a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts +++ b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts @@ -54,7 +54,10 @@ function unExpectedErrorHandler(err: Error): void { const testSessionId2 = "my-session2"; -async function beforeEachTest(senderType: TestClientType, sessionType: TestClientType): Promise { +async function beforeEachTest( + senderType: TestClientType, + sessionType: TestClientType +): Promise { // The tests in this file expect the env variables to contain the connection string and // the names of empty queue/topic/subscription that are to be tested @@ -255,7 +258,7 @@ describe("SessionReceiver with no sessionId", function(): void { let msgs = await receiver.receiveMessages(2); should.equal(msgs.length, 1, "Unexpected number of messages received"); - + should.equal(receiver.sessionId, msgs[0].sessionId, "Unexpected sessionId in receiver"); should.equal( testMessagesWithDifferentSessionIds.some( (x) => @@ -275,7 +278,7 @@ describe("SessionReceiver with no sessionId", function(): void { msgs = await receiver.receiveMessages(2); should.equal(msgs.length, 1, "Unexpected number of messages received"); - + should.equal(receiver.sessionId, msgs[0].sessionId, "Unexpected sessionId in receiver"); should.equal( testMessagesWithDifferentSessionIds.some( (x) => @@ -335,6 +338,98 @@ describe("SessionReceiver with no sessionId", function(): void { }); }); +describe("SessionReceiver with empty string as sessionId", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + // Sending messages with different session id, so that we know for sure we pick the right one + // and that Service Bus is not choosing a random one for us + const testMessagesWithDifferentSessionIds: SendableMessageInfo[] = [ + { + body: "hello1", + messageId: `test message ${Math.random()}`, + sessionId: TestMessage.sessionId + }, + { + body: "hello2", + messageId: `test message ${Math.random()}`, + sessionId: "" + } + ]; + + async function testComplete_batching(): Promise { + const sender = senderClient.createSender(); + await sender.send(testMessagesWithDifferentSessionIds[0]); + await sender.send(testMessagesWithDifferentSessionIds[1]); + + const receiver = receiverClient.createReceiver(ReceiveMode.peekLock, { + sessionId: "" + }); + const msgs = await receiver.receiveMessages(2); + + should.equal(msgs.length, 1, "Unexpected number of messages received"); + should.equal(receiver.sessionId, "", "Unexpected sessionId in receiver"); + should.equal( + testMessagesWithDifferentSessionIds[1].body === msgs[0].body && + testMessagesWithDifferentSessionIds[1].messageId === msgs[0].messageId && + testMessagesWithDifferentSessionIds[1].sessionId === msgs[0].sessionId, + true, + "Received Message doesnt match expected test message" + ); + await msgs[0].complete(); + + const peekedMsgsInSession = await receiver.peek(); + should.equal(peekedMsgsInSession.length, 0, "Unexpected number of messages peeked"); + + await receiver.close(); + } + + it("Partitioned Queue: complete() removes message from random session", async function(): Promise< + void + > { + await beforeEachTest( + TestClientType.PartitionedQueueWithSessions, + TestClientType.PartitionedQueueWithSessions + ); + await purge(receiverClient, testSessionId2); + await testComplete_batching(); + }); + + it("Partitioned Subscription: complete() removes message from random session", async function(): Promise< + void + > { + await beforeEachTest( + TestClientType.PartitionedTopicWithSessions, + TestClientType.PartitionedSubscriptionWithSessions + ); + await purge(receiverClient, testSessionId2); + await testComplete_batching(); + }); + + it("Unpartitioned Queue: complete() removes message from random session", async function(): Promise< + void + > { + await beforeEachTest( + TestClientType.UnpartitionedQueueWithSessions, + TestClientType.UnpartitionedQueueWithSessions + ); + await purge(receiverClient, testSessionId2); + await testComplete_batching(); + }); + + it("Unpartitioned Subscription: complete() removes message from random session", async function(): Promise< + void + > { + await beforeEachTest( + TestClientType.UnpartitionedTopicWithSessions, + TestClientType.UnpartitionedSubscriptionWithSessions + ); + await purge(receiverClient, testSessionId2); + await testComplete_batching(); + }); +}); + describe("Session State", function(): void { afterEach(async () => { await afterEachTest();