Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Pass `skipParsingBodyAsJson` and `skipConvertingDate` options to peek operations. (PR #24950)[https://github.com/Azure/azure-sdk-for-js/pull/24950]

### Other Changes

## 7.8.0 (2023-02-07)
Expand Down
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
associatedLinkName: this._getAssociatedReceiverName(),
requestName: "receiveDeferredMessages",
timeoutInMs: this._retryOptions.timeoutInMs,
skipParsingBodyAsJson: this.skipParsingBodyAsJson,
skipConvertingDate: this.skipConvertingDate,
});
return deferredMessages;
};
Expand All @@ -465,6 +467,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
associatedLinkName: this._getAssociatedReceiverName(),
requestName: "peekMessages",
timeoutInMs: this._retryOptions?.timeoutInMs,
skipParsingBodyAsJson: this.skipParsingBodyAsJson,
skipConvertingDate: this.skipConvertingDate,
};
const peekOperationPromise = async (): Promise<ServiceBusReceivedMessage[]> => {
if (options.fromSequenceNumber !== undefined) {
Expand Down
6 changes: 6 additions & 0 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
private _context: ConnectionContext,
public entityPath: string,
public receiveMode: "peekLock" | "receiveAndDelete",
private _skipParsingBodyAsJson: boolean,
private _skipConvertingDate: boolean,
private _retryOptions: RetryOptions = {}
) {
throwErrorIfConnectionClosed(_context);
Expand Down Expand Up @@ -322,6 +324,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
associatedLinkName: this._messageSession.name,
requestName: "peekMessages",
timeoutInMs: this._retryOptions?.timeoutInMs,
skipParsingBodyAsJson: this._skipParsingBodyAsJson,
skipConvertingDate: this._skipConvertingDate,
};
const peekOperationPromise = async (): Promise<ServiceBusReceivedMessage[]> => {
if (options.fromSequenceNumber !== undefined) {
Expand Down Expand Up @@ -385,6 +389,8 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
associatedLinkName: this._messageSession.name,
requestName: "receiveDeferredMessages",
timeoutInMs: this._retryOptions.timeoutInMs,
skipParsingBodyAsJson: this._skipParsingBodyAsJson,
skipConvertingDate: this._skipConvertingDate,
});
return deferredMessages;
};
Expand Down
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ export class ServiceBusClient {
this._connectionContext,
entityPath,
receiveMode,
options?.skipParsingBodyAsJson ?? false,
options?.skipConvertingDate ?? false,
this._clientOptions.retryOptions
);

Expand Down Expand Up @@ -460,6 +462,8 @@ export class ServiceBusClient {
this._connectionContext,
entityPath,
receiveMode,
options?.skipParsingBodyAsJson ?? false,
options?.skipConvertingDate ?? false,
this._clientOptions.retryOptions
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
ServiceBusError,
ServiceBusSessionReceiver,
ServiceBusSender,
ServiceBusReceiverOptions,
} from "../../src";
import { DispositionType, ServiceBusReceivedMessage } from "../../src/serviceBusMessage";
import { getReceiverClosedErrorMsg, getSenderClosedErrorMsg } from "../../src/util/errors";
Expand Down Expand Up @@ -242,6 +243,67 @@ describe("ServiceBusClient live tests", () => {
}
}
);

it(
noSessionTestClientType + ": respect receiver options when peeking",
async function (): Promise<void> {
// Create a test client to get the entity types
const sbClient = createServiceBusClientForTests();
const entities = await sbClient.test.createTestEntities(noSessionTestClientType);
await sbClient.close();

// Create a sb client, sender, receiver with relaxed endpoint
const sbClientWithRelaxedEndPoint = new ServiceBusClient(
getEnvVars().SERVICEBUS_CONNECTION_STRING.replace("sb://", "CheeseBurger://")
);
const sender = sbClientWithRelaxedEndPoint.createSender(entities.queue || entities.topic!);
const receiverOptions: ServiceBusReceiverOptions = {
skipParsingBodyAsJson: true,
skipConvertingDate: true,
};
const receiver = entities.queue
? sbClientWithRelaxedEndPoint.createReceiver(entities.queue, receiverOptions)
: sbClientWithRelaxedEndPoint.createReceiver(
entities.topic!,
entities.subscription!,
receiverOptions
);
try {
// Send and receive messages
const testMessages = [
{
// body: Long.fromString("12345678901234567890"),
body: { id: 123456789 },
applicationProperties: { createdOn: new Date() },
},
];
await sender.sendMessages(testMessages);

const peekedMsgs = await receiver.peekMessages(2, {
fromSequenceNumber: Long.ZERO,
});
should.equal(peekedMsgs.length, 1, "expecting one peeked message 1");
peekedMsgs[0].body.should.not.deep.equal({ id: 123456789 });
peekedMsgs[0].body.constructor.name.should.equal("Buffer");
if (!peekedMsgs[0].applicationProperties) {
throw new Error("Test failed. expect valid applicationProperties on peeked message");
}
if (!peekedMsgs[0].applicationProperties["createdOn"]) {
throw new Error("Test failed. expect valid createdOn property");
}
peekedMsgs[0].applicationProperties["createdOn"].constructor.name.should.equal("Date");

await receiver.receiveMessages(2);
await testPeekMsgsLength(receiver, 0);
} finally {
// Clean up
await sbClient.test.after();
await sender.close();
await receiver.close();
await sbClientWithRelaxedEndPoint.close();
}
}
);
});

describe("Errors with non existing Namespace", function (): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,9 @@ describe("AbortSignal", () => {
messageSession,
connectionContext,
"entityPath",
"peekLock"
"peekLock",
false,
false
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ describe("Receiver unit tests", () => {
connectionContext,
"entity path",
"peekLock",
false,
false,
undefined
);

Expand Down