From f42cfb8e1473d006d19ef5c91d4454405b33445d Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Thu, 21 Jan 2021 18:14:05 -0800 Subject: [PATCH 01/11] Test for 1000 iterations --- .../service-bus/test/batchReceiver.spec.ts | 479 +++++++++--------- 1 file changed, 242 insertions(+), 237 deletions(-) diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index 537f0b80fc08..4db30831f495 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -26,7 +26,7 @@ const should = chai.should(); chai.use(chaiAsPromised); const assert = chai.assert; -const noSessionTestClientType = getRandomTestClientTypeWithNoSessions(); +let noSessionTestClientType = getRandomTestClientTypeWithNoSessions(); const withSessionTestClientType = getRandomTestClientTypeWithSessions(); const anyRandomTestClientType = getRandomTestClientType(); @@ -865,288 +865,293 @@ describe("Batching Receiver", () => { ); }); - describe(noSessionTestClientType + ": Batch Receiver - disconnects", function(): void { - let serviceBusClient: ServiceBusClientForTests; - let sender: ServiceBusSender; - let receiver: ServiceBusReceiver; + for (let index = 0; index < 1000; index++) { + noSessionTestClientType = getRandomTestClientTypeWithNoSessions(); + describe(`${index}. ${noSessionTestClientType}: Batch Receiver - disconnects`, function(): void { + let serviceBusClient: ServiceBusClientForTests; + let sender: ServiceBusSender; + let receiver: ServiceBusReceiver; + + async function beforeEachTest( + receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" + ): Promise { + serviceBusClient = createServiceBusClientForTests(); + const entityNames = await serviceBusClient.test.createTestEntities(noSessionTestClientType); + if (receiveMode == "receiveAndDelete") { + receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames); + } else { + receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames); + } - async function beforeEachTest( - receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" - ): Promise { - serviceBusClient = createServiceBusClientForTests(); - const entityNames = await serviceBusClient.test.createTestEntities(noSessionTestClientType); - if (receiveMode == "receiveAndDelete") { - receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames); - } else { - receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames); + sender = serviceBusClient.test.addToCleanup( + serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + ); } - sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) - ); - } + afterEach(async () => { + if (serviceBusClient) { + await serviceBusClient.test.afterEach(); + await serviceBusClient.test.after(); + } + }); - afterEach(async () => { - if (serviceBusClient) { - await serviceBusClient.test.afterEach(); - await serviceBusClient.test.after(); - } - }); + it.only(`can receive and settle messages after a disconnect`, async function(): Promise< + void + > { + // Create the sender and receiver. + await beforeEachTest(); - it("can receive and settle messages after a disconnect", async function(): Promise { - // Create the sender and receiver. - await beforeEachTest(); + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(TestMessage.getSample()); - // Send a message so we can be sure when the receiver is open and active. - await sender.sendMessages(TestMessage.getSample()); + let settledMessageCount = 0; - let settledMessageCount = 0; + const messages1 = await receiver.receiveMessages(1); + for (const message of messages1) { + await receiver.completeMessage(message); + settledMessageCount++; + } - const messages1 = await receiver.receiveMessages(1); - for (const message of messages1) { - await receiver.completeMessage(message); - settledMessageCount++; - } + settledMessageCount.should.equal(1, "Unexpected number of settled messages."); - settledMessageCount.should.equal(1, "Unexpected number of settled messages."); + const connectionContext = (receiver as any)["_context"]; + const refreshConnection = connectionContext.refreshConnection; + let refreshConnectionCalled = 0; + connectionContext.refreshConnection = function(...args: any) { + refreshConnectionCalled++; + refreshConnection.apply(this, args); + }; - const connectionContext = (receiver as any)["_context"]; - const refreshConnection = connectionContext.refreshConnection; - let refreshConnectionCalled = 0; - connectionContext.refreshConnection = function(...args: any) { - refreshConnectionCalled++; - refreshConnection.apply(this, args); - }; + // Simulate a disconnect being called with a non-retryable error. + (receiver as ServiceBusReceiverImpl)["_context"].connection["_connection"].idle(); - // Simulate a disconnect being called with a non-retryable error. - (receiver as ServiceBusReceiverImpl)["_context"].connection["_connection"].idle(); + // send a second message to trigger the message handler again. + await sender.sendMessages(TestMessage.getSample()); - // send a second message to trigger the message handler again. - await sender.sendMessages(TestMessage.getSample()); + // wait for the 2nd message to be received. + const messages2 = await (receiver as ServiceBusReceiver).receiveMessages(1); + for (const message of messages2) { + await receiver.completeMessage(message); + settledMessageCount++; + } + settledMessageCount.should.equal(2, "Unexpected number of settled messages."); + refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); + }); - // wait for the 2nd message to be received. - const messages2 = await (receiver as ServiceBusReceiver).receiveMessages(1); - for (const message of messages2) { - await receiver.completeMessage(message); - settledMessageCount++; - } - settledMessageCount.should.equal(2, "Unexpected number of settled messages."); - refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); - }); + it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< + void + > { + // Create the sender and receiver. + await beforeEachTest("receiveAndDelete"); + + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + + if (!batchingReceiver || !batchingReceiver.isOpen()) { + throw new Error(`Unable to initialize receiver link.`); + } - it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< - void - > { - // Create the sender and receiver. - await beforeEachTest("receiveAndDelete"); - - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; - - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); - } + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSample()); - // Send a message so we have something to receive. - await sender.sendMessages(TestMessage.getSample()); - - // Since the receiver has already been initialized, - // the `receiver_drained` handler is attached as soon - // as receiveMessages is invoked. - // We remove the `receiver_drained` timeout after `receiveMessages` - // does it's initial setup by wrapping it in a `setTimeout`. - // This triggers the `receiver_drained` handler removal on the next - // tick of the event loop; after the handler has been attached. - setTimeout(() => { - // remove `receiver_drained` event - batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); - }, 0); - - // We want to simulate a disconnect once the batching receiver is draining. - // We can detect when the receiver enters a draining state when `addCredit` is - // called while `drain` is set to true. - let didRequestDrain = false; - const addCredit = batchingReceiver["link"]!.addCredit; - batchingReceiver["link"]!.addCredit = function(credits) { - addCredit.call(this, credits); - if (batchingReceiver["link"]!.drain) { - didRequestDrain = true; - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); - } - }; + // Since the receiver has already been initialized, + // the `receiver_drained` handler is attached as soon + // as receiveMessages is invoked. + // We remove the `receiver_drained` timeout after `receiveMessages` + // does it's initial setup by wrapping it in a `setTimeout`. + // This triggers the `receiver_drained` handler removal on the next + // tick of the event loop; after the handler has been attached. + setTimeout(() => { + // remove `receiver_drained` event + batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); + }, 0); - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + // We want to simulate a disconnect once the batching receiver is draining. + // We can detect when the receiver enters a draining state when `addCredit` is + // called while `drain` is set to true. + let didRequestDrain = false; + const addCredit = batchingReceiver["link"]!.addCredit; + batchingReceiver["link"]!.addCredit = function(credits) { + addCredit.call(this, credits); + if (batchingReceiver["link"]!.drain) { + didRequestDrain = true; + // Simulate a disconnect being called with a non-retryable error. + receiverContext.connection["_connection"].idle(); + } + }; - didRequestDrain.should.equal(true, "Drain was not requested."); - messages1.length.should.equal(1, "Unexpected number of messages received."); + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + didRequestDrain.should.equal(true, "Drain was not requested."); + messages1.length.should.equal(1, "Unexpected number of messages received."); - // wait for the 2nd message to be received. - const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); - messages2.length.should.equal(1, "Unexpected number of messages received."); - }); + // wait for the 2nd message to be received. + const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); - it("throws an error if drain is in progress (peekLock)", async function(): Promise { - // Create the sender and receiver. - await beforeEachTest(); + messages2.length.should.equal(1, "Unexpected number of messages received."); + }); - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + it("throws an error if drain is in progress (peekLock)", async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(); - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); - } + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; - // Send a message so we have something to receive. - await sender.sendMessages(TestMessage.getSample()); - - // Since the receiver has already been initialized, - // the `receiver_drained` handler is attached as soon - // as receiveMessages is invoked. - // We remove the `receiver_drained` timeout after `receiveMessages` - // does it's initial setup by wrapping it in a `setTimeout`. - // This triggers the `receiver_drained` handler removal on the next - // tick of the event loop; after the handler has been attached. - setTimeout(() => { - // remove `receiver_drained` event - batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); - }, 0); - - // We want to simulate a disconnect once the batching receiver is draining. - // We can detect when the receiver enters a draining state when `addCredit` is - // called while `drain` is set to true. - let didRequestDrain = false; - const addCredit = batchingReceiver["link"]!.addCredit; - batchingReceiver["link"]!.addCredit = function(credits) { - didRequestDrain = true; - addCredit.call(this, credits); - if (batchingReceiver["link"]!.drain) { - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); + if (!batchingReceiver || !batchingReceiver.isOpen()) { + throw new Error(`Unable to initialize receiver link.`); } - }; - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const testFailureMessage = "Test failure"; - try { - await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); - throw new Error(testFailureMessage); - } catch (err) { - err.message && err.message.should.not.equal(testFailureMessage); - } + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSample()); - didRequestDrain.should.equal(true, "Drain was not requested."); + // Since the receiver has already been initialized, + // the `receiver_drained` handler is attached as soon + // as receiveMessages is invoked. + // We remove the `receiver_drained` timeout after `receiveMessages` + // does it's initial setup by wrapping it in a `setTimeout`. + // This triggers the `receiver_drained` handler removal on the next + // tick of the event loop; after the handler has been attached. + setTimeout(() => { + // remove `receiver_drained` event + batchingReceiver["link"]!.removeAllListeners(ReceiverEvents.receiverDrained); + }, 0); - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + // We want to simulate a disconnect once the batching receiver is draining. + // We can detect when the receiver enters a draining state when `addCredit` is + // called while `drain` is set to true. + let didRequestDrain = false; + const addCredit = batchingReceiver["link"]!.addCredit; + batchingReceiver["link"]!.addCredit = function(credits) { + didRequestDrain = true; + addCredit.call(this, credits); + if (batchingReceiver["link"]!.drain) { + // Simulate a disconnect being called with a non-retryable error. + receiverContext.connection["_connection"].idle(); + } + }; - // wait for the 2nd message to be received. - const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const testFailureMessage = "Test failure"; + try { + await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); + throw new Error(testFailureMessage); + } catch (err) { + err.message && err.message.should.not.equal(testFailureMessage); + } - messages.length.should.equal(1, "Unexpected number of messages received."); - }); + didRequestDrain.should.equal(true, "Drain was not requested."); - it("returns messages if receive in progress (receiveAndDelete)", async function(): Promise< - void - > { - // Create the sender and receiver. - await beforeEachTest("receiveAndDelete"); - - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; - - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); - } + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); - // Send a message so we have something to receive. - await sender.sendMessages(TestMessage.getSample()); + // wait for the 2nd message to be received. + const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); - // Simulate a disconnect after a message has been received. - batchingReceiver["link"]!.once("message", function() { - setTimeout(() => { - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); - }, 0); + messages.length.should.equal(1, "Unexpected number of messages received."); }); - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); + it("returns messages if receive in progress (receiveAndDelete)", async function(): Promise< + void + > { + // Create the sender and receiver. + await beforeEachTest("receiveAndDelete"); + + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + + if (!batchingReceiver || !batchingReceiver.isOpen()) { + throw new Error(`Unable to initialize receiver link.`); + } - messages1.length.should.equal(1, "Unexpected number of messages received."); + // Send a message so we have something to receive. + await sender.sendMessages(TestMessage.getSample()); - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + // Simulate a disconnect after a message has been received. + batchingReceiver["link"]!.once("message", function() { + setTimeout(() => { + // Simulate a disconnect being called with a non-retryable error. + receiverContext.connection["_connection"].idle(); + }, 0); + }); - // wait for the 2nd message to be received. - const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); - messages2.length.should.equal(1, "Unexpected number of messages received."); - }); + messages1.length.should.equal(1, "Unexpected number of messages received."); - it("throws an error if receive is in progress (peekLock)", async function(): Promise { - // Create the sender and receiver. - await beforeEachTest(); + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); - // The first time `receiveMessages` is called the receiver link is created. - // The `receiver_drained` handler is only added after the link is created, - // which is a non-blocking task. - await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); - const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; - const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + // wait for the 2nd message to be received. + const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); - if (!batchingReceiver || !batchingReceiver.isOpen()) { - throw new Error(`Unable to initialize receiver link.`); - } + messages2.length.should.equal(1, "Unexpected number of messages received."); + }); - // Simulate a disconnect - setTimeout(() => { - // Simulate a disconnect being called with a non-retryable error. - receiverContext.connection["_connection"].idle(); - }, 0); + it("throws an error if receive is in progress (peekLock)", async function(): Promise { + // Create the sender and receiver. + await beforeEachTest(); - // Purposefully request more messages than what's available - // so that the receiver will have to drain. - const testFailureMessage = "Test failure"; - try { - const msgs = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); - console.log(msgs.length); - throw new Error(testFailureMessage); - } catch (err) { - err.message && err.message.should.not.equal(testFailureMessage); - } + // The first time `receiveMessages` is called the receiver link is created. + // The `receiver_drained` handler is only added after the link is created, + // which is a non-blocking task. + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); + const receiverContext = (receiver as ServiceBusReceiverImpl)["_context"]; + const batchingReceiver = (receiver as ServiceBusReceiverImpl)["_batchingReceiver"]; + + if (!batchingReceiver || !batchingReceiver.isOpen()) { + throw new Error(`Unable to initialize receiver link.`); + } + + // Simulate a disconnect + setTimeout(() => { + // Simulate a disconnect being called with a non-retryable error. + receiverContext.connection["_connection"].idle(); + }, 0); - // Make sure that a 2nd receiveMessages call still works - // by sending and receiving a single message again. - await sender.sendMessages(TestMessage.getSample()); + // Purposefully request more messages than what's available + // so that the receiver will have to drain. + const testFailureMessage = "Test failure"; + try { + const msgs = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); + console.log(msgs.length); + throw new Error(testFailureMessage); + } catch (err) { + err.message && err.message.should.not.equal(testFailureMessage); + } + + // Make sure that a 2nd receiveMessages call still works + // by sending and receiving a single message again. + await sender.sendMessages(TestMessage.getSample()); - // wait for the 2nd message to be received. - const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + // wait for the 2nd message to be received. + const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); - messages.length.should.equal(1, "Unexpected number of messages received."); + messages.length.should.equal(1, "Unexpected number of messages received."); + }); }); - }); + } }); From f14c5e46bde0593d0f2ff158379b1f82c2fbea73 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Thu, 21 Jan 2021 18:22:09 -0800 Subject: [PATCH 02/11] drop the whole thing in try-catch --- .../service-bus/test/batchReceiver.spec.ts | 63 ++++++++++--------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index 4db30831f495..f44cd53c4d38 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -898,44 +898,49 @@ describe("Batching Receiver", () => { it.only(`can receive and settle messages after a disconnect`, async function(): Promise< void > { - // Create the sender and receiver. - await beforeEachTest(); + try { + // Create the sender and receiver. + await beforeEachTest(); - // Send a message so we can be sure when the receiver is open and active. - await sender.sendMessages(TestMessage.getSample()); + // Send a message so we can be sure when the receiver is open and active. + await sender.sendMessages(TestMessage.getSample()); - let settledMessageCount = 0; + let settledMessageCount = 0; - const messages1 = await receiver.receiveMessages(1); - for (const message of messages1) { - await receiver.completeMessage(message); - settledMessageCount++; - } + const messages1 = await receiver.receiveMessages(1); + for (const message of messages1) { + await receiver.completeMessage(message); + settledMessageCount++; + } - settledMessageCount.should.equal(1, "Unexpected number of settled messages."); + settledMessageCount.should.equal(1, "Unexpected number of settled messages."); - const connectionContext = (receiver as any)["_context"]; - const refreshConnection = connectionContext.refreshConnection; - let refreshConnectionCalled = 0; - connectionContext.refreshConnection = function(...args: any) { - refreshConnectionCalled++; - refreshConnection.apply(this, args); - }; + const connectionContext = (receiver as any)["_context"]; + const refreshConnection = connectionContext.refreshConnection; + let refreshConnectionCalled = 0; + connectionContext.refreshConnection = function(...args: any) { + refreshConnectionCalled++; + refreshConnection.apply(this, args); + }; - // Simulate a disconnect being called with a non-retryable error. - (receiver as ServiceBusReceiverImpl)["_context"].connection["_connection"].idle(); + // Simulate a disconnect being called with a non-retryable error. + (receiver as ServiceBusReceiverImpl)["_context"].connection["_connection"].idle(); - // send a second message to trigger the message handler again. - await sender.sendMessages(TestMessage.getSample()); + // send a second message to trigger the message handler again. + await sender.sendMessages(TestMessage.getSample()); - // wait for the 2nd message to be received. - const messages2 = await (receiver as ServiceBusReceiver).receiveMessages(1); - for (const message of messages2) { - await receiver.completeMessage(message); - settledMessageCount++; + // wait for the 2nd message to be received. + const messages2 = await (receiver as ServiceBusReceiver).receiveMessages(1); + for (const message of messages2) { + await receiver.completeMessage(message); + settledMessageCount++; + } + settledMessageCount.should.equal(2, "Unexpected number of settled messages."); + refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); + } catch (error) { + console.log(error); + throw error; } - settledMessageCount.should.equal(2, "Unexpected number of settled messages."); - refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); }); it("returns messages if drain is in progress (receiveAndDelete)", async function(): Promise< From add3f0d87c2b28f1b9b02b52d72b9c3497dbfcfd Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 11:22:49 -0800 Subject: [PATCH 03/11] package.json with debug variable --- sdk/servicebus/service-bus/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index 34e0a6437d4f..aac1863ceb0f 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -62,7 +62,7 @@ "extract-api": "tsc -p . && api-extractor run --local", "format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"samples/**/*.{ts,js}\" \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", "integration-test:browser": "karma start --single-run", - "integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/*.spec.js dist-esm/test/**/*.spec.js", + "integration-test:node": "cross-env DEBUG=azure*,-azure:service-bus:administration*,-azure:service-bus:messages*,-azure:core-http*,-azure:service-bus:verbose,-azure:core-amqp*,-azure:service-bus:sender*,rhea:events,rhea:frames nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/*.spec.js dist-esm/test/**/*.spec.js", "integration-test": "npm run integration-test:node && npm run integration-test:browser", "lint:fix": "eslint package.json api-extractor.json src test --ext .ts --fix --fix-type [problem,suggestion]", "lint": "eslint package.json api-extractor.json src test --ext .ts -f html -o service-bus-lintReport.html || exit 0", From b498b63012f42dadbbe39b8ef829b3db6508be80 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 11:24:12 -0800 Subject: [PATCH 04/11] Add debug logs for disconnect test --- .../service-bus/test/batchReceiver.spec.ts | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index f44cd53c4d38..34957e08786c 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -21,6 +21,7 @@ import { import { ServiceBusReceivedMessage } from "../src/serviceBusMessage"; import { AbortController } from "@azure/abort-controller"; import { ReceiverEvents } from "rhea-promise"; +import debug from "debug"; const should = chai.should(); chai.use(chaiAsPromised); @@ -865,7 +866,14 @@ describe("Batching Receiver", () => { ); }); - for (let index = 0; index < 1000; index++) { + afterEach(async () => { + if (serviceBusClient) { + await serviceBusClient.test.afterEach(); + await serviceBusClient.test.after(); + } + }); + + for (let index = 0; index < 100; index++) { noSessionTestClientType = getRandomTestClientTypeWithNoSessions(); describe(`${index}. ${noSessionTestClientType}: Batch Receiver - disconnects`, function(): void { let serviceBusClient: ServiceBusClientForTests; @@ -888,13 +896,6 @@ describe("Batching Receiver", () => { ); } - afterEach(async () => { - if (serviceBusClient) { - await serviceBusClient.test.afterEach(); - await serviceBusClient.test.after(); - } - }); - it.only(`can receive and settle messages after a disconnect`, async function(): Promise< void > { @@ -921,6 +922,7 @@ describe("Batching Receiver", () => { connectionContext.refreshConnection = function(...args: any) { refreshConnectionCalled++; refreshConnection.apply(this, args); + debug.log("inside refreshConnection"); }; // Simulate a disconnect being called with a non-retryable error. @@ -928,11 +930,15 @@ describe("Batching Receiver", () => { // send a second message to trigger the message handler again. await sender.sendMessages(TestMessage.getSample()); + debug.log("after sendMessages"); + process.stderr.write("after sendMessages\n"); // wait for the 2nd message to be received. const messages2 = await (receiver as ServiceBusReceiver).receiveMessages(1); + debug.log("after receiveMessages"); for (const message of messages2) { await receiver.completeMessage(message); + debug.log("after completeMessage - 2"); settledMessageCount++; } settledMessageCount.should.equal(2, "Unexpected number of settled messages."); From 1c0c6100dfc654c67cca648af10c5ca3776da00a Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 12:06:17 -0800 Subject: [PATCH 05/11] remove process.stderr since it fails in the browser --- sdk/servicebus/service-bus/test/batchReceiver.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index 34957e08786c..9e252ad05d1b 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -931,7 +931,6 @@ describe("Batching Receiver", () => { // send a second message to trigger the message handler again. await sender.sendMessages(TestMessage.getSample()); debug.log("after sendMessages"); - process.stderr.write("after sendMessages\n"); // wait for the 2nd message to be received. const messages2 = await (receiver as ServiceBusReceiver).receiveMessages(1); From d6f7a2af66574cb4ceaa41ff660548f75376c13d Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 14:43:25 -0800 Subject: [PATCH 06/11] remove local declarations --- sdk/servicebus/service-bus/test/batchReceiver.spec.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index 9e252ad05d1b..325e421095f8 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -876,10 +876,6 @@ describe("Batching Receiver", () => { for (let index = 0; index < 100; index++) { noSessionTestClientType = getRandomTestClientTypeWithNoSessions(); describe(`${index}. ${noSessionTestClientType}: Batch Receiver - disconnects`, function(): void { - let serviceBusClient: ServiceBusClientForTests; - let sender: ServiceBusSender; - let receiver: ServiceBusReceiver; - async function beforeEachTest( receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" ): Promise { From 908dacab7abcc2d73fbd1617d90bf3eaa396e660 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 16:40:54 -0800 Subject: [PATCH 07/11] refresh after onDetached --- .../service-bus/src/connectionContext.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sdk/servicebus/service-bus/src/connectionContext.ts b/sdk/servicebus/service-bus/src/connectionContext.ts index 7657d8f941e1..9e819e542947 100644 --- a/sdk/servicebus/service-bus/src/connectionContext.ts +++ b/sdk/servicebus/service-bus/src/connectionContext.ts @@ -325,7 +325,6 @@ export namespace ConnectionContext { // by cleaning up the timers and closing the links. // We don't call onDetached for sender after `refreshConnection()` // because any new send calls that potentially initialize links would also get affected if called later. - // TODO: do the same for batching receiver logger.verbose( `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` + `senders. We should not reconnect.` @@ -354,12 +353,9 @@ export namespace ConnectionContext { await Promise.all(detachCalls); } - await refreshConnection(connectionContext); - waitForConnectionRefreshResolve(); - waitForConnectionRefreshPromise = undefined; - // The connection should always be brought back up if the sdk did not call connection.close() - // and there was at least one receiver link on the connection before it went down. - logger.verbose("[%s] state: %O", connectionContext.connectionId, state); + // Calling onDetached on receiver for the same reasons as sender + // Batching receiver will be closed + // Streaming receiver will recover based on the logic at its own onDetached if (!state.wasConnectionCloseCalled && state.numReceivers) { logger.verbose( `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` + @@ -396,6 +392,13 @@ export namespace ConnectionContext { await Promise.all(detachCalls); } + + await refreshConnection(connectionContext); + waitForConnectionRefreshResolve(); + waitForConnectionRefreshPromise = undefined; + // The connection should always be brought back up if the sdk did not call connection.close() + // and there was at least one receiver link on the connection before it went down. + logger.verbose("[%s] state: %O", connectionContext.connectionId, state); }; const protocolError: OnAmqpEvent = async (context: EventContext) => { From 08738a1861939b663579ab1c633bcd8419408de7 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 16:42:51 -0800 Subject: [PATCH 08/11] leftover await --- sdk/servicebus/service-bus/src/core/batchingReceiver.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index f3203ec0c78f..45bbfee55aaf 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -93,7 +93,7 @@ export class BatchingReceiver extends MessageReceiver { ); } - await this._batchingReceiverLite.close(connectionError); + this._batchingReceiverLite.close(connectionError); } /** From ed1af4a73268177b1436d0603264f05d4ddcbc37 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 17:05:41 -0800 Subject: [PATCH 09/11] retain streaming receiver logic --- .../service-bus/src/connectionContext.ts | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/service-bus/src/connectionContext.ts b/sdk/servicebus/service-bus/src/connectionContext.ts index 9e819e542947..ff98bc8335a1 100644 --- a/sdk/servicebus/service-bus/src/connectionContext.ts +++ b/sdk/servicebus/service-bus/src/connectionContext.ts @@ -353,9 +353,7 @@ export namespace ConnectionContext { await Promise.all(detachCalls); } - // Calling onDetached on receiver for the same reasons as sender - // Batching receiver will be closed - // Streaming receiver will recover based on the logic at its own onDetached + // Calling onDetached on batching receiver for the same reasons as sender if (!state.wasConnectionCloseCalled && state.numReceivers) { logger.verbose( `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` + @@ -366,10 +364,12 @@ export namespace ConnectionContext { const detachCalls: Promise[] = []; // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation - // and streaming receivers can decide whether to reconnect or not. for (const receiverName of Object.keys(connectionContext.messageReceivers)) { const receiver = connectionContext.messageReceivers[receiverName]; - if (receiver) { + if ( + receiver && + (receiver.receiverType == "batching" || receiver.receiverType == "session") + ) { logger.verbose( "[%s] calling detached on %s receiver '%s'.", connectionContext.connection.id, @@ -399,6 +399,43 @@ export namespace ConnectionContext { // The connection should always be brought back up if the sdk did not call connection.close() // and there was at least one receiver link on the connection before it went down. logger.verbose("[%s] state: %O", connectionContext.connectionId, state); + + // Calling onDetached on streaming receiver + if (!state.wasConnectionCloseCalled && state.numReceivers) { + logger.verbose( + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` + + `receivers. We should reconnect.` + ); + await delay(Constants.connectionReconnectDelay); + + const detachCalls: Promise[] = []; + + // Call onDetached() on streaming receivers can recover + for (const receiverName of Object.keys(connectionContext.messageReceivers)) { + const receiver = connectionContext.messageReceivers[receiverName]; + if (receiver && receiver.receiverType == "streaming") { + logger.verbose( + "[%s] calling detached on %s receiver '%s'.", + connectionContext.connection.id, + receiver.receiverType, + receiver.name + ); + detachCalls.push( + receiver.onDetached(connectionError || contextError).catch((err) => { + logger.logError( + err, + "[%s] An error occurred while calling onDetached() on the %s receiver '%s'", + connectionContext.connection.id, + receiver.receiverType, + receiver.name + ); + }) + ); + } + } + + await Promise.all(detachCalls); + } }; const protocolError: OnAmqpEvent = async (context: EventContext) => { From a27c34decde0822be366e7fd7dfb5212a6911cd9 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 17:06:17 -0800 Subject: [PATCH 10/11] integration-test:node:no-debug --- sdk/servicebus/service-bus/package.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index aac1863ceb0f..83dad1a25971 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -63,6 +63,7 @@ "format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"samples/**/*.{ts,js}\" \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", "integration-test:browser": "karma start --single-run", "integration-test:node": "cross-env DEBUG=azure*,-azure:service-bus:administration*,-azure:service-bus:messages*,-azure:core-http*,-azure:service-bus:verbose,-azure:core-amqp*,-azure:service-bus:sender*,rhea:events,rhea:frames nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/*.spec.js dist-esm/test/**/*.spec.js", + "integration-test:node:no-debug": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/*.spec.js dist-esm/test/**/*.spec.js", "integration-test": "npm run integration-test:node && npm run integration-test:browser", "lint:fix": "eslint package.json api-extractor.json src test --ext .ts --fix --fix-type [problem,suggestion]", "lint": "eslint package.json api-extractor.json src test --ext .ts -f html -o service-bus-lintReport.html || exit 0", @@ -70,6 +71,7 @@ "prebuild": "npm run clean", "test:browser": "npm run clean && npm run build:test:browser && npm run integration-test:browser", "test:node": "npm run clean && npm run build:test:node && npm run integration-test:node", + "test:node:no-debug": "npm run clean && npm run build:test:node && npm run integration-test:node:no-debug", "test": "npm run test:node && npm run test:browser", "unit-test:browser": "echo skipped", "unit-test:node": "npm run build:test:node && nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/internal/**/*.spec.js dist-esm/test/node/*.spec.js", From 27ae86a4e7d1a699235cda1dae596d996f5dbba3 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 22 Jan 2021 17:06:26 -0800 Subject: [PATCH 11/11] for sanity --- sdk/servicebus/service-bus/test/streamingReceiver.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index b70defec35d7..75feec490fc8 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -862,7 +862,7 @@ describe("Streaming Receiver Tests", () => { ); }); -describe(testClientType + ": Streaming - disconnects", function(): void { +describe.only(testClientType + ": Streaming - disconnects", function(): void { let serviceBusClient: ServiceBusClientForTests; let sender: ServiceBusSender; let receiver: ServiceBusReceiver;