diff --git a/packages/@azure/servicebus/data-plane/test/receiveAndDeleteMode.spec.ts b/packages/@azure/servicebus/data-plane/test/receiveAndDeleteMode.spec.ts index 6594ec1c8b10..68668db91871 100644 --- a/packages/@azure/servicebus/data-plane/test/receiveAndDeleteMode.spec.ts +++ b/packages/@azure/servicebus/data-plane/test/receiveAndDeleteMode.spec.ts @@ -13,7 +13,6 @@ import { TopicClient, SubscriptionClient, ServiceBusMessage, - delay, SendableMessageInfo, ReceiveMode } from "../lib"; @@ -26,7 +25,8 @@ import { testSessionId1, getSenderReceiverClients, ClientType, - purge + purge, + checkWithTimeout } from "./testUtils"; import { Receiver, SessionReceiver } from "../lib/receiver"; @@ -219,7 +219,8 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { { autoComplete: autoCompleteFlag } ); - await delay(2000); + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); should.equal(receivedMsgs[0].body, testMessages.body, "MessageBody is different than expected"); diff --git a/packages/@azure/servicebus/data-plane/test/sessionsTests.spec.ts b/packages/@azure/servicebus/data-plane/test/sessionsTests.spec.ts index 1922475bcb47..0575b3cf9f3c 100644 --- a/packages/@azure/servicebus/data-plane/test/sessionsTests.spec.ts +++ b/packages/@azure/servicebus/data-plane/test/sessionsTests.spec.ts @@ -22,7 +22,8 @@ import { getSenderReceiverClients, ClientType, testSessionId1, - purge + purge, + checkWithTimeout } from "./testUtils"; async function testPeekMsgsLength( @@ -198,8 +199,10 @@ describe("SessionTests - Accept a session by passing non-existing sessionId rece return Promise.resolve(); }, unExpectedErrorHandler); - await delay(2000); + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); + should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); await testPeekMsgsLength(receiverClient, 0); diff --git a/packages/@azure/servicebus/data-plane/test/streamingReceiver.spec.ts b/packages/@azure/servicebus/data-plane/test/streamingReceiver.spec.ts index 9e2abf1440fa..ad04b1f00b6f 100644 --- a/packages/@azure/servicebus/data-plane/test/streamingReceiver.spec.ts +++ b/packages/@azure/servicebus/data-plane/test/streamingReceiver.spec.ts @@ -18,7 +18,13 @@ import { import { DispositionType } from "../lib/serviceBusMessage"; -import { testSimpleMessages, getSenderReceiverClients, ClientType, purge } from "./testUtils"; +import { + testSimpleMessages, + getSenderReceiverClients, + ClientType, + purge, + checkWithTimeout +} from "./testUtils"; import { Receiver } from "../lib/receiver"; import { Sender } from "../lib/sender"; @@ -123,13 +129,9 @@ describe("Streaming Receiver - Misc Tests", function(): void { return Promise.resolve(); }, unExpectedErrorHandler); - for (let i = 0; i < 5; i++) { - await delay(1000); - if (receivedMsgs.length === 1) { - break; - } - } + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); await receiver.close(); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); @@ -180,14 +182,11 @@ describe("Streaming Receiver - Misc Tests", function(): void { { autoComplete: false } ); - for (let i = 0; i < 5; i++) { - await delay(1000); - if (receivedMsgs.length === 1) { - break; - } - } + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); await testPeekMsgsLength(receiverClient, 1); + should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); await receivedMsgs[0].complete(); await receiver.close(); @@ -248,16 +247,12 @@ describe("Streaming Receiver - Complete message", function(): void { { autoComplete } ); - for (let i = 0; i < 5; i++) { - await delay(1000); - if (receivedMsgs.length === 1) { - break; - } - } + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); await receiver.close(); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); - + should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); await testPeekMsgsLength(receiverClient, 0); } it("Partitioned Queue: complete() removes message", async function(): Promise { @@ -333,13 +328,12 @@ describe("Streaming Receiver - Abandon message", function(): void { { autoComplete: false } ); - await delay(6000); + const deliveryCountFlag = await checkWithTimeout(() => checkDeliveryCount === maxDeliveryCount); + should.equal(deliveryCountFlag, true, "DeliveryCount is different than expected"); await receiver.close(); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); - should.equal(checkDeliveryCount, maxDeliveryCount, "DeliveryCount is different than expected"); - await testPeekMsgsLength(receiverClient, 0); // No messages in the queue const deadLetterMsgs = await deadLetterClient.getReceiver().receiveBatch(1); @@ -406,7 +400,14 @@ describe("Streaming Receiver - Defer message", function(): void { unExpectedErrorHandler, { autoComplete } ); - await delay(4000); + + const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0); + should.equal( + sequenceNumCheck, + true, + "Either the message is not received or observed an unexpected SequenceNumber." + ); + await receiver.close(); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); @@ -429,7 +430,6 @@ describe("Streaming Receiver - Defer message", function(): void { should.equal(deferredMsgs[0].deliveryCount, 1, "DeliveryCount is different than expected"); await deferredMsgs[0].complete(); - await delay(10000); await testPeekMsgsLength(receiverClient, 0); } @@ -496,17 +496,22 @@ describe("Streaming Receiver - Deadletter message", function(): void { async function testDeadletter(autoComplete: boolean): Promise { await sender.send(testSimpleMessages); + const receivedMsgs: ServiceBusMessage[] = []; receiver.receive( (msg: ServiceBusMessage) => { + receivedMsgs.push(msg); return msg.deadLetter(); }, unExpectedErrorHandler, { autoComplete } ); - await delay(4000); + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); + await receiver.close(); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); + should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); await testPeekMsgsLength(receiverClient, 0); @@ -520,7 +525,6 @@ describe("Streaming Receiver - Deadletter message", function(): void { ); await deadLetterMsgs[0].complete(); - await testPeekMsgsLength(deadLetterClient, 0); } @@ -664,7 +668,9 @@ describe("Streaming Receiver - Settle an already Settled message throws error", return Promise.resolve(); }, unExpectedErrorHandler); - await delay(5000); + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); + should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); diff --git a/packages/@azure/servicebus/data-plane/test/streamingReceiverSessions.spec.ts b/packages/@azure/servicebus/data-plane/test/streamingReceiverSessions.spec.ts index 6412147ec1c0..6b892ff53633 100644 --- a/packages/@azure/servicebus/data-plane/test/streamingReceiverSessions.spec.ts +++ b/packages/@azure/servicebus/data-plane/test/streamingReceiverSessions.spec.ts @@ -23,7 +23,8 @@ import { testSessionId1, getSenderReceiverClients, ClientType, - purge + purge, + checkWithTimeout } from "./testUtils"; import { Sender } from "../lib/sender"; import { SessionReceiver } from "../lib/receiver"; @@ -129,12 +130,9 @@ describe("Streaming Receiver - Misc Tests(with sessions)", function(): void { return Promise.resolve(); }, unExpectedErrorHandler); - for (let i = 0; i < 5; i++) { - await delay(1000); - if (receivedMsgs.length === 1) { - break; - } - } + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); + should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); await testPeekMsgsLength(receiverClient, 0); @@ -203,18 +201,15 @@ describe("Streaming Receiver - Misc Tests(with sessions)", function(): void { { autoComplete: false } ); - for (let i = 0; i < 5; i++) { - await delay(1000); - if (receivedMsgs.length === 1) { - break; - } - } + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); await testPeekMsgsLength(receiverClient, 1); await receivedMsgs[0].complete(); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); + should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); } it("Disabled autoComplete, no manual complete retains the message in Partitioned Queue(with sessions)", async function(): Promise< @@ -286,14 +281,11 @@ describe("Streaming Receiver - Complete message(with sessions)", function(): voi { autoComplete } ); - for (let i = 0; i < 5; i++) { - await delay(1000); - if (receivedMsgs.length === 1) { - break; - } - } + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); + should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); await testPeekMsgsLength(receiverClient, 0); } @@ -385,10 +377,11 @@ describe("Streaming Receiver - Abandon message(with sessions)", function(): void async function testAbandon(autoComplete: boolean): Promise { await sender.send(testMessagesWithSessions); - + let abandonFlag = 0; await sessionReceiver.receive( (msg: ServiceBusMessage) => { return msg.abandon().then(() => { + abandonFlag = 1; if (sessionReceiver.isOpen()) { return sessionReceiver.close(); } @@ -398,7 +391,9 @@ describe("Streaming Receiver - Abandon message(with sessions)", function(): void unExpectedErrorHandler, { autoComplete } ); - await delay(4000); + + const msgAbandonCheck = await checkWithTimeout(() => abandonFlag === 1); + should.equal(msgAbandonCheck, true, "Abandoning the message results in a failure"); if (sessionReceiver.isOpen()) { await sessionReceiver.close(); @@ -518,7 +513,12 @@ describe("Streaming Receiver - Defer message(with sessions)", function(): void { { autoComplete } ); - await delay(4000); + const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0); + should.equal( + sequenceNumCheck, + true, + "Either the message is not received or observed an unexpected SequenceNumber." + ); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); @@ -540,7 +540,6 @@ describe("Streaming Receiver - Defer message(with sessions)", function(): void { should.equal(deferredMsg.deliveryCount, 1, "DeliveryCount is different than expected"); await deferredMsg.complete(); - await testPeekMsgsLength(receiverClient, 0); } it("Partitioned Queue: defer() moves message to deferred queue(with sessions)", async function(): Promise< @@ -632,17 +631,21 @@ describe("Streaming Receiver - Deadletter message(with sessions)", function(): v async function testDeadletter(autoComplete: boolean): Promise { await sender.send(testMessagesWithSessions); + let msgCount = 0; await sessionReceiver.receive( (msg: ServiceBusMessage) => { + msgCount++; return msg.deadLetter(); }, unExpectedErrorHandler, { autoComplete } ); - await delay(4000); - should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); + const msgsCheck = await checkWithTimeout(() => msgCount === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); + should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); + should.equal(msgCount, 1, "Unexpected number of messages"); await testPeekMsgsLength(receiverClient, 0); const deadLetterMsgs = await deadLetterClient.getReceiver().receiveBatch(1); @@ -833,7 +836,8 @@ describe("Streaming Receiver - Settle an already Settled message throws error(wi return Promise.resolve(); }, unExpectedErrorHandler); - await delay(5000); + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); diff --git a/packages/@azure/servicebus/data-plane/test/testUtils.ts b/packages/@azure/servicebus/data-plane/test/testUtils.ts index 709ca62905bc..e893f9ccd662 100644 --- a/packages/@azure/servicebus/data-plane/test/testUtils.ts +++ b/packages/@azure/servicebus/data-plane/test/testUtils.ts @@ -6,7 +6,8 @@ import { QueueClient, TopicClient, Namespace, - SubscriptionClient + SubscriptionClient, + delay } from "../lib"; import * as msRestNodeAuth from "@azure/ms-rest-nodeauth"; import { ServiceBusManagementClient } from "@azure/arm-servicebus"; @@ -383,3 +384,20 @@ export async function purge( } } } + +/** + * Maximum wait duration for the expected event to happen = `10000 ms`(default value is 10 seconds)(= maxWaitTimeInMilliseconds) + * Keep checking whether the predicate is true after every `1000 ms`(default value is 1 second) (= delayBetweenRetriesInMilliseconds) + */ +export async function checkWithTimeout( + predicate: () => boolean, + delayBetweenRetriesInMilliseconds: number = 1000, + maxWaitTimeInMilliseconds: number = 10000 +): Promise { + const maxTime = Date.now() + maxWaitTimeInMilliseconds; + while (Date.now() < maxTime) { + if (predicate()) return true; + await delay(delayBetweenRetriesInMilliseconds); + } + return false; +} diff --git a/packages/@azure/servicebus/data-plane/test/topicFilters.spec.ts b/packages/@azure/servicebus/data-plane/test/topicFilters.spec.ts index 00a111034fad..a31b3852cd50 100644 --- a/packages/@azure/servicebus/data-plane/test/topicFilters.spec.ts +++ b/packages/@azure/servicebus/data-plane/test/topicFilters.spec.ts @@ -13,10 +13,9 @@ import { ServiceBusMessage, TopicClient, SendableMessageInfo, - CorrelationFilter, - delay + CorrelationFilter } from "../lib"; -import { getSenderReceiverClients, ClientType, purge } from "./testUtils"; +import { getSenderReceiverClients, ClientType, purge, checkWithTimeout } from "./testUtils"; // We need to remove rules before adding one because otherwise the existing default rule will let in all messages. async function removeAllRules(client: SubscriptionClient): Promise { @@ -134,9 +133,8 @@ async function receiveOrders( } ); - for (let i = 0; i < 10 && receivedMsgs.length < expectedMessageCount; i++) { - await delay(1000); - } + const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === expectedMessageCount); + should.equal(msgsCheck, true, "Could not receive the messages in expected time."); await receiver.close(); should.equal( @@ -144,6 +142,7 @@ async function receiveOrders( undefined, errorFromErrorHandler && errorFromErrorHandler.message ); + should.equal(receivedMsgs.length, expectedMessageCount, "Unexpected number of messages"); return receivedMsgs; }