diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 49306a29e0ef..3bb96e38c0f7 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -21,8 +21,7 @@ import { ErrorNameConditionMapper, RetryConfig, RetryOperationType, - Constants, - randomNumberFromInterval + Constants } from "@azure/core-amqp"; import { EventData, toAmqpMessage } from "./eventData"; import { ConnectionContext } from "./connectionContext"; @@ -414,16 +413,6 @@ export class EventHubSender extends LinkEntity { throw error; } - if (!this.isOpen()) { - log.sender( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock - ); - await defaultLock.acquire(this.senderLock, () => { - return this._init(); - }); - } log.sender( "[%s] Sender '%s', trying to send EventData[].", this._context.connectionId, @@ -512,13 +501,22 @@ export class EventHubSender extends LinkEntity { ): Promise { const abortSignal: AbortSignalLike | undefined = options.abortSignal; const sendEventPromise = () => - new Promise((resolve, reject) => { + new Promise(async (resolve, reject) => { + let waitTimer: any; + + let onRejected: Func; + let onReleased: Func; + let onModified: Func; + let onAccepted: Func; + let onAborted: () => void; + const rejectOnAbort = () => { const desc: string = - `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + - `address "${this.address}" has been cancelled by the user.`; + `[${this._context.connectionId}] The send operation on the Sender "${ + this.name + }" with ` + `address "${this.address}" has been cancelled by the user.`; log.error(desc); - reject(new AbortError("The send operation has been cancelled by the user.")); + return reject(new AbortError("The send operation has been cancelled by the user.")); }; if (abortSignal && abortSignal.aborted) { @@ -526,132 +524,148 @@ export class EventHubSender extends LinkEntity { return rejectOnAbort(); } - let waitTimer: any; - log.sender( - "[%s] Sender '%s', credit: %d available: %d", - this._context.connectionId, - this.name, - this._sender!.credit, - this._sender!.session.outgoing.available() - ); - if (this._sender!.sendable()) { + onAborted = () => { + removeListeners(); + rejectOnAbort(); + }; + + onAccepted = (context: EventContext) => { + // Since we will be adding listener for accepted and rejected event every time + // we send a message, we need to remove listener for both the events. + // This will ensure duplicate listeners are not added for the same event. + removeListeners(); log.sender( - "[%s] Sender '%s', sending message with id '%s'.", + "[%s] Sender '%s', got event accepted.", this._context.connectionId, this.name ); - let onRejected: Func; - let onReleased: Func; - let onModified: Func; - let onAccepted: Func; - let onAborted: () => void; + resolve(); + }; - const removeListeners = (): void => { - clearTimeout(waitTimer); - // When `removeListeners` is called on timeout, the sender might be closed and cleared - // So, check if it exists, before removing listeners from it. - if (abortSignal) { - abortSignal.removeEventListener("abort", onAborted); - } - if (this._sender) { - this._sender.removeListener(SenderEvents.rejected, onRejected); - this._sender.removeListener(SenderEvents.accepted, onAccepted); - this._sender.removeListener(SenderEvents.released, onReleased); - this._sender.removeListener(SenderEvents.modified, onModified); - } - }; + onRejected = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); + const err = translate(context!.delivery!.remote_state!.error); + log.error(err); + reject(err); + }; - onAborted = () => { - removeListeners(); - rejectOnAbort(); - }; - onAccepted = (context: EventContext) => { - // Since we will be adding listener for accepted and rejected event every time - // we send a message, we need to remove listener for both the events. - // This will ensure duplicate listeners are not added for the same event. - removeListeners(); - log.sender( - "[%s] Sender '%s', got event accepted.", - this._context.connectionId, - this.name + onReleased = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); + } else { + err = new Error( + `[${this._context.connectionId}] Sender '${this.name}', ` + + `received a release disposition.Hence we are rejecting the promise.` ); - resolve(); - }; - onRejected = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event rejected.", - this._context.connectionId, - this.name + } + log.error(err); + reject(err); + }; + + onModified = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); + } else { + err = new Error( + `[${this._context.connectionId}] Sender "${this.name}", ` + + `received a modified disposition.Hence we are rejecting the promise.` ); - const err = translate(context!.delivery!.remote_state!.error); - log.error(err); - reject(err); + } + log.error(err); + reject(err); + }; + + const removeListeners = (): void => { + clearTimeout(waitTimer); + // When `removeListeners` is called on timeout, the sender might be closed and cleared + // So, check if it exists, before removing listeners from it. + if (abortSignal) { + abortSignal.removeEventListener("abort", onAborted); + } + if (this._sender) { + this._sender.removeListener(SenderEvents.rejected, onRejected); + this._sender.removeListener(SenderEvents.accepted, onAccepted); + this._sender.removeListener(SenderEvents.released, onReleased); + this._sender.removeListener(SenderEvents.modified, onModified); + } + }; + + const actionAfterTimeout = () => { + removeListeners(); + const desc: string = + `[${this._context.connectionId}] Sender "${this.name}" with ` + + `address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.`; + log.error(desc); + const e: Error = { + name: "OperationTimeoutError", + message: desc }; - onReleased = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event released.", - this._context.connectionId, - this.name - ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender '${this.name}', ` + - `received a release disposition.Hence we are rejecting the promise.` - ); + return reject(translate(e)); + }; + + if (abortSignal) { + abortSignal.addEventListener("abort", onAborted); + } + + waitTimer = setTimeout( + actionAfterTimeout, + getRetryAttemptTimeoutInMs(options.retryOptions) + ); + + if (!this.isOpen()) { + log.sender( + "Acquiring lock %s for initializing the session, sender and " + + "possibly the connection.", + this.senderLock + ); + + try { + await defaultLock.acquire(this.senderLock, () => { + return this._init(); + }); + } catch (err) { + if (abortSignal) { + abortSignal.removeEventListener("abort", onAborted); } - log.error(err); - reject(err); - }; - onModified = (context: EventContext) => { - removeListeners(); + clearTimeout(waitTimer); + err = translate(err); log.error( - "[%s] Sender '%s', got event modified.", + "[%s] An error occurred while creating the sender %s", this._context.connectionId, - this.name + this.name, + err ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender "${this.name}", ` + - `received a modified disposition.Hence we are rejecting the promise.` - ); - } - log.error(err); - reject(err); - }; + return reject(err); + } + } - const actionAfterTimeout = () => { - removeListeners(); - const desc: string = - `[${this._context.connectionId}] Sender "${this.name}" with ` + - `address "${this.address}", was not able to send the message right now, due ` + - `to operation timeout.`; - log.error(desc); - const e: AmqpError = { - condition: ErrorNameConditionMapper.ServiceUnavailableError, - description: desc - }; - return reject(translate(e)); - }; + log.sender( + "[%s] Sender '%s', credit: %d available: %d", + this._context.connectionId, + this.name, + this._sender!.credit, + this._sender!.session.outgoing.available() + ); + if (this._sender!.sendable()) { + log.sender( + "[%s] Sender '%s', sending message with id '%s'.", + this._context.connectionId, + this.name + ); - if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); - } this._sender!.on(SenderEvents.accepted, onAccepted); this._sender!.on(SenderEvents.rejected, onRejected); this._sender!.on(SenderEvents.modified, onModified); this._sender!.on(SenderEvents.released, onReleased); - waitTimer = setTimeout( - actionAfterTimeout, - getRetryAttemptTimeoutInMs(options.retryOptions) - ); + const delivery = this._sender!.send(message, undefined, 0x80013700); log.sender( "[%s] Sender '%s', sent message with delivery id: %d", @@ -673,7 +687,6 @@ export class EventHubSender extends LinkEntity { } }); - const jitterInSeconds = randomNumberFromInterval(1, 4); const maxRetries = options.retryOptions && options.retryOptions.maxRetries; const delayInSeconds = options.retryOptions && @@ -686,7 +699,7 @@ export class EventHubSender extends LinkEntity { connectionId: this._context.connectionId, operationType: RetryOperationType.sendMessage, maxRetries: maxRetries, - delayInSeconds: delayInSeconds + jitterInSeconds + delayInSeconds: delayInSeconds }; return retry(config); }