diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 2cd6fd507b27..6b2c4f6578b1 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Pass `skipParsingBodyAsJson` and `skipConvertingDate` options to peek operations. (PR #24950)[https://github.com/Azure/azure-sdk-for-js/pull/24950] + ### Other Changes ## 7.8.0 (2023-02-07) diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index 61818ec791d2..c3445d47775f 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -439,6 +439,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { associatedLinkName: this._getAssociatedReceiverName(), requestName: "receiveDeferredMessages", timeoutInMs: this._retryOptions.timeoutInMs, + skipParsingBodyAsJson: this.skipParsingBodyAsJson, + skipConvertingDate: this.skipConvertingDate, }); return deferredMessages; }; @@ -465,6 +467,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { associatedLinkName: this._getAssociatedReceiverName(), requestName: "peekMessages", timeoutInMs: this._retryOptions?.timeoutInMs, + skipParsingBodyAsJson: this.skipParsingBodyAsJson, + skipConvertingDate: this.skipConvertingDate, }; const peekOperationPromise = async (): Promise => { if (options.fromSequenceNumber !== undefined) { diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index f7abf41dc338..b8ad45933ed5 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -133,6 +133,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver private _context: ConnectionContext, public entityPath: string, public receiveMode: "peekLock" | "receiveAndDelete", + private _skipParsingBodyAsJson: boolean, + private _skipConvertingDate: boolean, private _retryOptions: RetryOptions = {} ) { throwErrorIfConnectionClosed(_context); @@ -322,6 +324,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver associatedLinkName: this._messageSession.name, requestName: "peekMessages", timeoutInMs: this._retryOptions?.timeoutInMs, + skipParsingBodyAsJson: this._skipParsingBodyAsJson, + skipConvertingDate: this._skipConvertingDate, }; const peekOperationPromise = async (): Promise => { if (options.fromSequenceNumber !== undefined) { @@ -385,6 +389,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver associatedLinkName: this._messageSession.name, requestName: "receiveDeferredMessages", timeoutInMs: this._retryOptions.timeoutInMs, + skipParsingBodyAsJson: this._skipParsingBodyAsJson, + skipConvertingDate: this._skipConvertingDate, }); return deferredMessages; }; diff --git a/sdk/servicebus/service-bus/src/serviceBusClient.ts b/sdk/servicebus/service-bus/src/serviceBusClient.ts index c97658709e6e..08b8c9775706 100644 --- a/sdk/servicebus/service-bus/src/serviceBusClient.ts +++ b/sdk/servicebus/service-bus/src/serviceBusClient.ts @@ -372,6 +372,8 @@ export class ServiceBusClient { this._connectionContext, entityPath, receiveMode, + options?.skipParsingBodyAsJson ?? false, + options?.skipConvertingDate ?? false, this._clientOptions.retryOptions ); @@ -460,6 +462,8 @@ export class ServiceBusClient { this._connectionContext, entityPath, receiveMode, + options?.skipParsingBodyAsJson ?? false, + options?.skipConvertingDate ?? false, this._clientOptions.retryOptions ); diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 89ec75884513..55441c73c344 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -15,6 +15,7 @@ import { ServiceBusError, ServiceBusSessionReceiver, ServiceBusSender, + ServiceBusReceiverOptions, } from "../../src"; import { DispositionType, ServiceBusReceivedMessage } from "../../src/serviceBusMessage"; import { getReceiverClosedErrorMsg, getSenderClosedErrorMsg } from "../../src/util/errors"; @@ -242,6 +243,67 @@ describe("ServiceBusClient live tests", () => { } } ); + + it( + noSessionTestClientType + ": respect receiver options when peeking", + async function (): Promise { + // Create a test client to get the entity types + const sbClient = createServiceBusClientForTests(); + const entities = await sbClient.test.createTestEntities(noSessionTestClientType); + await sbClient.close(); + + // Create a sb client, sender, receiver with relaxed endpoint + const sbClientWithRelaxedEndPoint = new ServiceBusClient( + getEnvVars().SERVICEBUS_CONNECTION_STRING.replace("sb://", "CheeseBurger://") + ); + const sender = sbClientWithRelaxedEndPoint.createSender(entities.queue || entities.topic!); + const receiverOptions: ServiceBusReceiverOptions = { + skipParsingBodyAsJson: true, + skipConvertingDate: true, + }; + const receiver = entities.queue + ? sbClientWithRelaxedEndPoint.createReceiver(entities.queue, receiverOptions) + : sbClientWithRelaxedEndPoint.createReceiver( + entities.topic!, + entities.subscription!, + receiverOptions + ); + try { + // Send and receive messages + const testMessages = [ + { + // body: Long.fromString("12345678901234567890"), + body: { id: 123456789 }, + applicationProperties: { createdOn: new Date() }, + }, + ]; + await sender.sendMessages(testMessages); + + const peekedMsgs = await receiver.peekMessages(2, { + fromSequenceNumber: Long.ZERO, + }); + should.equal(peekedMsgs.length, 1, "expecting one peeked message 1"); + peekedMsgs[0].body.should.not.deep.equal({ id: 123456789 }); + peekedMsgs[0].body.constructor.name.should.equal("Buffer"); + if (!peekedMsgs[0].applicationProperties) { + throw new Error("Test failed. expect valid applicationProperties on peeked message"); + } + if (!peekedMsgs[0].applicationProperties["createdOn"]) { + throw new Error("Test failed. expect valid createdOn property"); + } + peekedMsgs[0].applicationProperties["createdOn"].constructor.name.should.equal("Date"); + + await receiver.receiveMessages(2); + await testPeekMsgsLength(receiver, 0); + } finally { + // Clean up + await sbClient.test.after(); + await sender.close(); + await receiver.close(); + await sbClientWithRelaxedEndPoint.close(); + } + } + ); }); describe("Errors with non existing Namespace", function (): void { diff --git a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts index a229ba1da164..cb1a94ae74d5 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts @@ -400,7 +400,9 @@ describe("AbortSignal", () => { messageSession, connectionContext, "entityPath", - "peekLock" + "peekLock", + false, + false ); try { diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index 7b8b12af3623..f41679058252 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -279,6 +279,8 @@ describe("Receiver unit tests", () => { connectionContext, "entity path", "peekLock", + false, + false, undefined );