From 03910f4117e41bc5123311922cc410d6f9008c13 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 22 Feb 2023 02:39:55 +0000 Subject: [PATCH 1/2] [ServiceBus] pass `skipParsingBodyAsJson` and `skipConvertingDate` to peek operations Currently the two options only affect `receiveMessages()`. This PR pass them to peek operations as well so the received messages are consistent. --- .../service-bus/src/receivers/receiver.ts | 4 ++ .../src/receivers/sessionReceiver.ts | 6 +++ .../service-bus/src/serviceBusClient.ts | 4 ++ .../test/internal/serviceBusClient.spec.ts | 47 +++++++++++++++++++ .../test/internal/unit/abortSignal.spec.ts | 4 +- .../test/internal/unit/receiver.spec.ts | 2 + 6 files changed, 66 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index 61818ec791d2..c3445d47775f 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -439,6 +439,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { associatedLinkName: this._getAssociatedReceiverName(), requestName: "receiveDeferredMessages", timeoutInMs: this._retryOptions.timeoutInMs, + skipParsingBodyAsJson: this.skipParsingBodyAsJson, + skipConvertingDate: this.skipConvertingDate, }); return deferredMessages; }; @@ -465,6 +467,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { associatedLinkName: this._getAssociatedReceiverName(), requestName: "peekMessages", timeoutInMs: this._retryOptions?.timeoutInMs, + skipParsingBodyAsJson: this.skipParsingBodyAsJson, + skipConvertingDate: this.skipConvertingDate, }; const peekOperationPromise = async (): Promise => { if (options.fromSequenceNumber !== undefined) { diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index f7abf41dc338..b8ad45933ed5 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -133,6 +133,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver private _context: ConnectionContext, public entityPath: string, public receiveMode: "peekLock" | "receiveAndDelete", + private _skipParsingBodyAsJson: boolean, + private _skipConvertingDate: boolean, private _retryOptions: RetryOptions = {} ) { throwErrorIfConnectionClosed(_context); @@ -322,6 +324,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver associatedLinkName: this._messageSession.name, requestName: "peekMessages", timeoutInMs: this._retryOptions?.timeoutInMs, + skipParsingBodyAsJson: this._skipParsingBodyAsJson, + skipConvertingDate: this._skipConvertingDate, }; const peekOperationPromise = async (): Promise => { if (options.fromSequenceNumber !== undefined) { @@ -385,6 +389,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver associatedLinkName: this._messageSession.name, requestName: "receiveDeferredMessages", timeoutInMs: this._retryOptions.timeoutInMs, + skipParsingBodyAsJson: this._skipParsingBodyAsJson, + skipConvertingDate: this._skipConvertingDate, }); return deferredMessages; }; diff --git a/sdk/servicebus/service-bus/src/serviceBusClient.ts b/sdk/servicebus/service-bus/src/serviceBusClient.ts index c97658709e6e..08b8c9775706 100644 --- a/sdk/servicebus/service-bus/src/serviceBusClient.ts +++ b/sdk/servicebus/service-bus/src/serviceBusClient.ts @@ -372,6 +372,8 @@ export class ServiceBusClient { this._connectionContext, entityPath, receiveMode, + options?.skipParsingBodyAsJson ?? false, + options?.skipConvertingDate ?? false, this._clientOptions.retryOptions ); @@ -460,6 +462,8 @@ export class ServiceBusClient { this._connectionContext, entityPath, receiveMode, + options?.skipParsingBodyAsJson ?? false, + options?.skipConvertingDate ?? false, this._clientOptions.retryOptions ); diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 89ec75884513..5127b3adbf88 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -15,6 +15,7 @@ import { ServiceBusError, ServiceBusSessionReceiver, ServiceBusSender, + ServiceBusReceiverOptions, } from "../../src"; import { DispositionType, ServiceBusReceivedMessage } from "../../src/serviceBusMessage"; import { getReceiverClosedErrorMsg, getSenderClosedErrorMsg } from "../../src/util/errors"; @@ -242,6 +243,52 @@ describe("ServiceBusClient live tests", () => { } } ); + + it( + noSessionTestClientType + ": respect receiver options when peeking", + async function (): Promise { + // Create a test client to get the entity types + const sbClient = createServiceBusClientForTests(); + const entities = await sbClient.test.createTestEntities(noSessionTestClientType); + await sbClient.close(); + + // Create a sb client, sender, receiver with relaxed endpoint + const sbClientWithRelaxedEndPoint = new ServiceBusClient( + getEnvVars().SERVICEBUS_CONNECTION_STRING.replace("sb://", "CheeseBurger://") + ); + const sender = sbClientWithRelaxedEndPoint.createSender(entities.queue || entities.topic!); + const receiverOptions: ServiceBusReceiverOptions = { + skipParsingBodyAsJson: true + }; + const receiver = entities.queue + ? sbClientWithRelaxedEndPoint.createReceiver(entities.queue, receiverOptions) + : sbClientWithRelaxedEndPoint.createReceiver(entities.topic!, entities.subscription!, receiverOptions); + try { + // Send and receive messages + const testMessages = [ { + // body: Long.fromString("12345678901234567890"), + body: { id: 123456789 }, + }]; + await sender.sendMessages(testMessages); + + let peekedMsgs = await receiver.peekMessages(2, { + fromSequenceNumber: Long.ZERO, + }); + should.equal(peekedMsgs.length, 1, "expecting one peeked message 1"); + peekedMsgs[0].body.should.not.deep.equal({ id: 123456789 }); + peekedMsgs[0].body.constructor.name.should.equal("Buffer"); + + await receiver.receiveMessages(2); + await testPeekMsgsLength(receiver, 0); + } finally { + // Clean up + await sbClient.test.after(); + await sender.close(); + await receiver.close(); + await sbClientWithRelaxedEndPoint.close(); + } + } + ); }); describe("Errors with non existing Namespace", function (): void { diff --git a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts index a229ba1da164..7b60c9ea3a1d 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts @@ -400,7 +400,9 @@ describe("AbortSignal", () => { messageSession, connectionContext, "entityPath", - "peekLock" + "peekLock", + false, + false, ); try { diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index 7b8b12af3623..f41679058252 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -279,6 +279,8 @@ describe("Receiver unit tests", () => { connectionContext, "entity path", "peekLock", + false, + false, undefined ); From 6be302c14594fe127a9f0c430e1a5919604d0812 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 22 Feb 2023 18:19:52 +0000 Subject: [PATCH 2/2] changelog entry and testing skipConvertingDate: true --- sdk/servicebus/service-bus/CHANGELOG.md | 2 ++ .../test/internal/serviceBusClient.spec.ts | 29 ++++++++++++++----- .../test/internal/unit/abortSignal.spec.ts | 2 +- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 2cd6fd507b27..6b2c4f6578b1 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Pass `skipParsingBodyAsJson` and `skipConvertingDate` options to peek operations. (PR #24950)[https://github.com/Azure/azure-sdk-for-js/pull/24950] + ### Other Changes ## 7.8.0 (2023-02-07) diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 5127b3adbf88..55441c73c344 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -258,25 +258,40 @@ describe("ServiceBusClient live tests", () => { ); const sender = sbClientWithRelaxedEndPoint.createSender(entities.queue || entities.topic!); const receiverOptions: ServiceBusReceiverOptions = { - skipParsingBodyAsJson: true + skipParsingBodyAsJson: true, + skipConvertingDate: true, }; const receiver = entities.queue ? sbClientWithRelaxedEndPoint.createReceiver(entities.queue, receiverOptions) - : sbClientWithRelaxedEndPoint.createReceiver(entities.topic!, entities.subscription!, receiverOptions); + : sbClientWithRelaxedEndPoint.createReceiver( + entities.topic!, + entities.subscription!, + receiverOptions + ); try { // Send and receive messages - const testMessages = [ { - // body: Long.fromString("12345678901234567890"), - body: { id: 123456789 }, - }]; + const testMessages = [ + { + // body: Long.fromString("12345678901234567890"), + body: { id: 123456789 }, + applicationProperties: { createdOn: new Date() }, + }, + ]; await sender.sendMessages(testMessages); - let peekedMsgs = await receiver.peekMessages(2, { + const peekedMsgs = await receiver.peekMessages(2, { fromSequenceNumber: Long.ZERO, }); should.equal(peekedMsgs.length, 1, "expecting one peeked message 1"); peekedMsgs[0].body.should.not.deep.equal({ id: 123456789 }); peekedMsgs[0].body.constructor.name.should.equal("Buffer"); + if (!peekedMsgs[0].applicationProperties) { + throw new Error("Test failed. expect valid applicationProperties on peeked message"); + } + if (!peekedMsgs[0].applicationProperties["createdOn"]) { + throw new Error("Test failed. expect valid createdOn property"); + } + peekedMsgs[0].applicationProperties["createdOn"].constructor.name.should.equal("Date"); await receiver.receiveMessages(2); await testPeekMsgsLength(receiver, 0); diff --git a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts index 7b60c9ea3a1d..cb1a94ae74d5 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts @@ -402,7 +402,7 @@ describe("AbortSignal", () => { "entityPath", "peekLock", false, - false, + false ); try {