Skip to content
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@
"extract-api": "tsc -p . && api-extractor run --local",
"format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"samples/**/*.{ts,js}\" \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"",
"integration-test:browser": "karma start --single-run",
"integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/*.spec.js dist-esm/test/**/*.spec.js",
"integration-test:node": "cross-env DEBUG=azure*,-azure:service-bus:administration*,-azure:service-bus:messages*,-azure:core-http*,-azure:service-bus:verbose,-azure:core-amqp*,-azure:service-bus:sender*,rhea:events,rhea:frames nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/*.spec.js dist-esm/test/**/*.spec.js",
"integration-test:node:no-debug": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/*.spec.js dist-esm/test/**/*.spec.js",
"integration-test": "npm run integration-test:node && npm run integration-test:browser",
"lint:fix": "eslint package.json api-extractor.json src test --ext .ts --fix --fix-type [problem,suggestion]",
"lint": "eslint package.json api-extractor.json src test --ext .ts -f html -o service-bus-lintReport.html || exit 0",
"pack": "npm pack 2>&1",
"prebuild": "npm run clean",
"test:browser": "npm run clean && npm run build:test:browser && npm run integration-test:browser",
"test:node": "npm run clean && npm run build:test:node && npm run integration-test:node",
"test:node:no-debug": "npm run clean && npm run build:test:node && npm run integration-test:node:no-debug",
"test": "npm run test:node && npm run test:browser",
"unit-test:browser": "echo skipped",
"unit-test:node": "npm run build:test:node && nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace dist-esm/test/internal/**/*.spec.js dist-esm/test/node/*.spec.js",
Expand Down
48 changes: 44 additions & 4 deletions sdk/servicebus/service-bus/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ export namespace ConnectionContext {
// by cleaning up the timers and closing the links.
// We don't call onDetached for sender after `refreshConnection()`
// because any new send calls that potentially initialize links would also get affected if called later.
// TODO: do the same for batching receiver
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` +
`senders. We should not reconnect.`
Expand Down Expand Up @@ -354,12 +353,54 @@ export namespace ConnectionContext {
await Promise.all(detachCalls);
}

// Calling onDetached on batching receiver for the same reasons as sender
if (!state.wasConnectionCloseCalled && state.numReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` +
`receivers. We should reconnect.`
);
await delay(Constants.connectionReconnectDelay);

const detachCalls: Promise<void>[] = [];

// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
for (const receiverName of Object.keys(connectionContext.messageReceivers)) {
const receiver = connectionContext.messageReceivers[receiverName];
if (
receiver &&
(receiver.receiverType == "batching" || receiver.receiverType == "session")
) {
logger.verbose(
"[%s] calling detached on %s receiver '%s'.",
connectionContext.connection.id,
receiver.receiverType,
receiver.name
);
detachCalls.push(
receiver.onDetached(connectionError || contextError).catch((err) => {
logger.logError(
err,
"[%s] An error occurred while calling onDetached() on the %s receiver '%s'",
connectionContext.connection.id,
receiver.receiverType,
receiver.name
);
})
);
}
}

await Promise.all(detachCalls);
}

await refreshConnection(connectionContext);
waitForConnectionRefreshResolve();
waitForConnectionRefreshPromise = undefined;
// The connection should always be brought back up if the sdk did not call connection.close()
// and there was at least one receiver link on the connection before it went down.
logger.verbose("[%s] state: %O", connectionContext.connectionId, state);

// Calling onDetached on streaming receiver
if (!state.wasConnectionCloseCalled && state.numReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` +
Expand All @@ -369,11 +410,10 @@ export namespace ConnectionContext {

const detachCalls: Promise<void>[] = [];

// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
// and streaming receivers can decide whether to reconnect or not.
// Call onDetached() on streaming receivers can recover
for (const receiverName of Object.keys(connectionContext.messageReceivers)) {
const receiver = connectionContext.messageReceivers[receiverName];
if (receiver) {
if (receiver && receiver.receiverType == "streaming") {
logger.verbose(
"[%s] calling detached on %s receiver '%s'.",
connectionContext.connection.id,
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class BatchingReceiver extends MessageReceiver {
);
}

await this._batchingReceiverLite.close(connectionError);
this._batchingReceiverLite.close(connectionError);
}

/**
Expand Down
Loading