Skip to content

Commit 6630763

Browse files
[service-bus] init() refactor to propagate abortSignal support. (Azure#10578)
Uniting all the "link" related entities (mgmt link, receiver, sender). This PR unites all the links so they use the same code to open and close the underlying link. As part of this a few nice refactors came in: 1. All link related classes, including the mgmt link, now use the same init() code (which also means they can, if passed, handle abortSignal's when running) 2. open/close state has been moved into LinkEntity. 3. Boundaries between LinkEntity and the child classes is more clear.
1 parent 6f51415 commit 6630763

18 files changed

+875
-561
lines changed

sdk/servicebus/service-bus/src/core/batchingReceiver.ts

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,7 @@ import {
1313
Session
1414
} from "rhea-promise";
1515
import { InternalReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
16-
import {
17-
MessageReceiver,
18-
OnAmqpEventAsPromise,
19-
ReceiveOptions,
20-
ReceiverType
21-
} from "./messageReceiver";
16+
import { MessageReceiver, OnAmqpEventAsPromise, ReceiveOptions } from "./messageReceiver";
2217
import { ConnectionContext } from "../connectionContext";
2318
import { throwErrorIfConnectionClosed } from "../util/errors";
2419
import { AbortSignalLike } from "@azure/abort-controller";
@@ -41,7 +36,7 @@ export class BatchingReceiver extends MessageReceiver {
4136
* @param {ReceiveOptions} [options] Options for how you'd like to connect.
4237
*/
4338
constructor(context: ConnectionContext, protected _entityPath: string, options?: ReceiveOptions) {
44-
super(context, _entityPath, ReceiverType.batching, options);
39+
super(context, _entityPath, "br", options);
4540

4641
this._batchingReceiverLite = new BatchingReceiverLite(
4742
context,
@@ -69,7 +64,7 @@ export class BatchingReceiver extends MessageReceiver {
6964
throw lastError;
7065
}
7166

72-
return this._receiver;
67+
return this.link;
7368
},
7469
this.receiveMode
7570
);
@@ -87,8 +82,7 @@ export class BatchingReceiver extends MessageReceiver {
8782
* @returns {Promise<void>} Promise<void>.
8883
*/
8984
async onDetached(connectionError?: AmqpError | Error): Promise<void> {
90-
// Clears the token renewal timer. Closes the link and its session if they are open.
91-
await this._closeLink(this._receiver);
85+
await this.closeLink("linkonly");
9286

9387
if (connectionError == null) {
9488
connectionError = new Error(

0 commit comments

Comments
 (0)