Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0063e7d
Streaming Receiver - No hardcoded delays, reuse DelayStreaming method
HarshaNalluru Feb 13, 2019
b6bbc4a
remove .only
HarshaNalluru Feb 13, 2019
b098cc8
Comments for Global variables
HarshaNalluru Feb 13, 2019
debae78
optional params, boolcheck -> predicate
HarshaNalluru Feb 14, 2019
508df48
Date.now() & updated other tests
HarshaNalluru Feb 14, 2019
b13f8bc
Move DelayStreaming to testUtils
HarshaNalluru Feb 14, 2019
8470e8a
receiveAndDeleteMode.spec.ts
HarshaNalluru Feb 14, 2019
868a2d9
sessionsTests.spec.ts
HarshaNalluru Feb 14, 2019
56d2898
Updated the tests which use streamingReceiver
HarshaNalluru Feb 14, 2019
b29ac4b
remove .only
HarshaNalluru Feb 14, 2019
7665e3b
Merge branch 'master' into DelaysSR
HarshaNalluru Feb 14, 2019
71eeb3c
Update message abandon in streaming receiver sessions
HarshaNalluru Feb 14, 2019
d4e3866
Merge branch 'DelaysSR' of https://github.com/HarshaNalluru/azure-sdk…
HarshaNalluru Feb 14, 2019
3369420
Update error message
HarshaNalluru Feb 14, 2019
9438a28
resolve merge conflicts
HarshaNalluru Feb 14, 2019
7a0fb60
msgCount instead of receivedMsgs
HarshaNalluru Feb 14, 2019
3a98596
camel case for delayStreaming
HarshaNalluru Feb 15, 2019
dd9c941
Append InMilliseconds
HarshaNalluru Feb 15, 2019
7368bac
Update comment for delayStreaming
HarshaNalluru Feb 15, 2019
bb52dea
delayStreaming -> checkWithTimeout
HarshaNalluru Feb 15, 2019
cd80533
Merge remote-tracking branch 'upstream/master' into DelaysSR
HarshaNalluru Feb 15, 2019
ed14a76
Added "should.equal(msgs.length, 1, "Unexpected num of msgs");"
HarshaNalluru Feb 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
TopicClient,
SubscriptionClient,
ServiceBusMessage,
delay,
SendableMessageInfo,
ReceiveMode
} from "../lib";
Expand All @@ -26,7 +25,8 @@ import {
testSessionId1,
getSenderReceiverClients,
ClientType,
purge
purge,
checkWithTimeout
} from "./testUtils";

import { Receiver, SessionReceiver } from "../lib/receiver";
Expand Down Expand Up @@ -219,7 +219,8 @@ describe("Streaming Receiver from Queue/Subscription", function(): void {
{ autoComplete: autoCompleteFlag }
);

await delay(2000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
should.equal(receivedMsgs[0].body, testMessages.body, "MessageBody is different than expected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import {
getSenderReceiverClients,
ClientType,
testSessionId1,
purge
purge,
checkWithTimeout
} from "./testUtils";

async function testPeekMsgsLength(
Expand Down Expand Up @@ -198,8 +199,10 @@ describe("SessionTests - Accept a session by passing non-existing sessionId rece
return Promise.resolve();
}, unExpectedErrorHandler);

await delay(2000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

await testPeekMsgsLength(receiverClient, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import {

import { DispositionType } from "../lib/serviceBusMessage";

import { testSimpleMessages, getSenderReceiverClients, ClientType, purge } from "./testUtils";
import {
testSimpleMessages,
getSenderReceiverClients,
ClientType,
purge,
checkWithTimeout
} from "./testUtils";
import { Receiver } from "../lib/receiver";
import { Sender } from "../lib/sender";

Expand Down Expand Up @@ -123,13 +129,9 @@ describe("Streaming Receiver - Misc Tests", function(): void {
return Promise.resolve();
}, unExpectedErrorHandler);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);

should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
await receiver.close();

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
Expand Down Expand Up @@ -180,14 +182,11 @@ describe("Streaming Receiver - Misc Tests", function(): void {
{ autoComplete: false }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);

should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
await testPeekMsgsLength(receiverClient, 1);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

await receivedMsgs[0].complete();
await receiver.close();
Expand Down Expand Up @@ -248,16 +247,12 @@ describe("Streaming Receiver - Complete message", function(): void {
{ autoComplete }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
await testPeekMsgsLength(receiverClient, 0);
}
it("Partitioned Queue: complete() removes message", async function(): Promise<void> {
Expand Down Expand Up @@ -333,13 +328,12 @@ describe("Streaming Receiver - Abandon message", function(): void {
{ autoComplete: false }
);

await delay(6000);
const deliveryCountFlag = await checkWithTimeout(() => checkDeliveryCount === maxDeliveryCount);
should.equal(deliveryCountFlag, true, "DeliveryCount is different than expected");

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(checkDeliveryCount, maxDeliveryCount, "DeliveryCount is different than expected");

await testPeekMsgsLength(receiverClient, 0); // No messages in the queue

const deadLetterMsgs = await deadLetterClient.getReceiver().receiveBatch(1);
Expand Down Expand Up @@ -406,7 +400,14 @@ describe("Streaming Receiver - Defer message", function(): void {
unExpectedErrorHandler,
{ autoComplete }
);
await delay(4000);

const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
should.equal(
sequenceNumCheck,
true,
"Either the message is not received or observed an unexpected SequenceNumber."
);

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

Expand All @@ -429,7 +430,6 @@ describe("Streaming Receiver - Defer message", function(): void {
should.equal(deferredMsgs[0].deliveryCount, 1, "DeliveryCount is different than expected");

await deferredMsgs[0].complete();
await delay(10000);
await testPeekMsgsLength(receiverClient, 0);
}

Expand Down Expand Up @@ -496,17 +496,22 @@ describe("Streaming Receiver - Deadletter message", function(): void {
async function testDeadletter(autoComplete: boolean): Promise<void> {
await sender.send(testSimpleMessages);

const receivedMsgs: ServiceBusMessage[] = [];
receiver.receive(
(msg: ServiceBusMessage) => {
receivedMsgs.push(msg);
return msg.deadLetter();
},
unExpectedErrorHandler,
{ autoComplete }
);

await delay(4000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

await testPeekMsgsLength(receiverClient, 0);

Expand All @@ -520,7 +525,6 @@ describe("Streaming Receiver - Deadletter message", function(): void {
);

await deadLetterMsgs[0].complete();

await testPeekMsgsLength(deadLetterClient, 0);
}

Expand Down Expand Up @@ -664,7 +668,9 @@ describe("Streaming Receiver - Settle an already Settled message throws error",
return Promise.resolve();
}, unExpectedErrorHandler);

await delay(5000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
testSessionId1,
getSenderReceiverClients,
ClientType,
purge
purge,
checkWithTimeout
} from "./testUtils";
import { Sender } from "../lib/sender";
import { SessionReceiver } from "../lib/receiver";
Expand Down Expand Up @@ -129,12 +130,9 @@ describe("Streaming Receiver - Misc Tests(with sessions)", function(): void {
return Promise.resolve();
}, unExpectedErrorHandler);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
await testPeekMsgsLength(receiverClient, 0);
Expand Down Expand Up @@ -203,18 +201,15 @@ describe("Streaming Receiver - Misc Tests(with sessions)", function(): void {
{ autoComplete: false }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

await testPeekMsgsLength(receiverClient, 1);

await receivedMsgs[0].complete();

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
}

it("Disabled autoComplete, no manual complete retains the message in Partitioned Queue(with sessions)", async function(): Promise<
Expand Down Expand Up @@ -286,14 +281,11 @@ describe("Streaming Receiver - Complete message(with sessions)", function(): voi
{ autoComplete }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

await testPeekMsgsLength(receiverClient, 0);
}
Expand Down Expand Up @@ -385,10 +377,11 @@ describe("Streaming Receiver - Abandon message(with sessions)", function(): void

async function testAbandon(autoComplete: boolean): Promise<void> {
await sender.send(testMessagesWithSessions);

let abandonFlag = 0;
await sessionReceiver.receive(
(msg: ServiceBusMessage) => {
return msg.abandon().then(() => {
abandonFlag = 1;
if (sessionReceiver.isOpen()) {
return sessionReceiver.close();
}
Expand All @@ -398,7 +391,9 @@ describe("Streaming Receiver - Abandon message(with sessions)", function(): void
unExpectedErrorHandler,
{ autoComplete }
);
await delay(4000);

const msgAbandonCheck = await checkWithTimeout(() => abandonFlag === 1);
should.equal(msgAbandonCheck, true, "Abandoning the message results in a failure");

if (sessionReceiver.isOpen()) {
await sessionReceiver.close();
Expand Down Expand Up @@ -518,7 +513,12 @@ describe("Streaming Receiver - Defer message(with sessions)", function(): void {
{ autoComplete }
);

await delay(4000);
const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
should.equal(
sequenceNumCheck,
true,
"Either the message is not received or observed an unexpected SequenceNumber."
);

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

Expand All @@ -540,7 +540,6 @@ describe("Streaming Receiver - Defer message(with sessions)", function(): void {
should.equal(deferredMsg.deliveryCount, 1, "DeliveryCount is different than expected");

await deferredMsg.complete();

await testPeekMsgsLength(receiverClient, 0);
}
it("Partitioned Queue: defer() moves message to deferred queue(with sessions)", async function(): Promise<
Expand Down Expand Up @@ -632,17 +631,21 @@ describe("Streaming Receiver - Deadletter message(with sessions)", function(): v
async function testDeadletter(autoComplete: boolean): Promise<void> {
await sender.send(testMessagesWithSessions);

let msgCount = 0;
await sessionReceiver.receive(
(msg: ServiceBusMessage) => {
msgCount++;
return msg.deadLetter();
},
unExpectedErrorHandler,
{ autoComplete }
);

await delay(4000);
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
const msgsCheck = await checkWithTimeout(() => msgCount === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(msgCount, 1, "Unexpected number of messages");
await testPeekMsgsLength(receiverClient, 0);

const deadLetterMsgs = await deadLetterClient.getReceiver().receiveBatch(1);
Expand Down Expand Up @@ -833,7 +836,8 @@ describe("Streaming Receiver - Settle an already Settled message throws error(wi
return Promise.resolve();
}, unExpectedErrorHandler);

await delay(5000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
Expand Down
20 changes: 19 additions & 1 deletion packages/@azure/servicebus/data-plane/test/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {
QueueClient,
TopicClient,
Namespace,
SubscriptionClient
SubscriptionClient,
delay
} from "../lib";
import * as msRestNodeAuth from "@azure/ms-rest-nodeauth";
import { ServiceBusManagementClient } from "@azure/arm-servicebus";
Expand Down Expand Up @@ -383,3 +384,20 @@ export async function purge(
}
}
}

/**
* Maximum wait duration for the expected event to happen = `10000 ms`(default value is 10 seconds)(= maxWaitTimeInMilliseconds)
* Keep checking whether the predicate is true after every `1000 ms`(default value is 1 second) (= delayBetweenRetriesInMilliseconds)
*/
export async function checkWithTimeout(
predicate: () => boolean,
delayBetweenRetriesInMilliseconds: number = 1000,
maxWaitTimeInMilliseconds: number = 10000
): Promise<boolean> {
const maxTime = Date.now() + maxWaitTimeInMilliseconds;
while (Date.now() < maxTime) {
if (predicate()) return true;
await delay(delayBetweenRetriesInMilliseconds);
}
return false;
}
Loading