From 260b0e644d4ca9e41b5b428d83cb1e2b51a67186 Mon Sep 17 00:00:00 2001 From: ramya0820 <45977823+ramya0820@users.noreply.github.com> Date: Thu, 11 Jul 2019 15:56:24 -0700 Subject: [PATCH 01/22] [Event Hubs] Introduce timeoutInMs on RetryOptions (#4239) --- sdk/eventhub/event-hubs/src/eventHubClient.ts | 16 ++++++++++++++++ sdk/eventhub/event-hubs/src/eventHubSender.ts | 11 +++-------- sdk/eventhub/event-hubs/src/managementClient.ts | 10 +++------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubClient.ts b/sdk/eventhub/event-hubs/src/eventHubClient.ts index 6218d4fd287a..60b2828d01da 100644 --- a/sdk/eventhub/event-hubs/src/eventHubClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubClient.ts @@ -35,6 +35,11 @@ export interface RetryOptions { * Number of milliseconds to wait between attempts. */ retryInterval?: number; + /** + * Number of milliseconds to wait before declaring that current attempt has timed out which will trigger a retry + * A minimum value of 60 seconds will be used if a value not greater than this is provided. + */ + timeoutInMs?: number; // /** // * The maximum value the `retryInterval` gets incremented exponentially between retries. // * Not applicable, when `isExponential` is set to `false`. @@ -47,6 +52,17 @@ export interface RetryOptions { // isExponential?: boolean; } +export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number { + const timeoutInMs = + retryOptions == undefined || + typeof retryOptions.timeoutInMs !== "number" || + !isFinite(retryOptions.timeoutInMs) || + retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInSeconds * 1000 + ? Constants.defaultOperationTimeoutInSeconds * 1000 + : retryOptions.timeoutInMs; + return timeoutInMs; +} + /** * The set of options to configure the behavior of an `EventHubProducer`. * These can be specified when creating the producer via the `createProducer` method. diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 4b78bb798cd1..de34ad5db3e7 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -30,6 +30,7 @@ import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import { SendOptions, EventHubProducerOptions } from "./eventHubClient"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; +import { getRetryAttemptTimeoutInMs } from "./eventHubClient"; /** * @ignore @@ -464,12 +465,9 @@ export class EventHubSender extends LinkEntity { private _trySendBatch( message: AmqpMessage | Buffer, tag: any, - options?: SendOptions & EventHubProducerOptions, + options: SendOptions & EventHubProducerOptions = {}, format?: number ): Promise { - if (!options) { - options = {}; - } const abortSignal: AbortSignalLike | undefined = options.abortSignal; const sendEventPromise = () => @@ -610,10 +608,7 @@ export class EventHubSender extends LinkEntity { this._sender!.on(SenderEvents.rejected, onRejected); this._sender!.on(SenderEvents.modified, onModified); this._sender!.on(SenderEvents.released, onReleased); - waitTimer = setTimeout( - actionAfterTimeout, - Constants.defaultOperationTimeoutInSeconds * 1000 - ); + waitTimer = setTimeout(actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions)); const delivery = this._sender!.send(message, tag, 0x80013700); log.sender( "[%s] Sender '%s', sent message with delivery id: %d and tag: %s", diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index e0bbc4ea9758..26e615be262b 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -20,7 +20,7 @@ import { import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import * as log from "./log"; -import { RetryOptions } from "./eventHubClient"; +import { RetryOptions, getRetryAttemptTimeoutInMs } from "./eventHubClient"; import { AbortSignalLike } from "@azure/abort-controller"; /** * Describes the runtime information of an Event Hub. @@ -303,12 +303,7 @@ export class ManagementClient extends LinkEntity { */ private async _makeManagementRequest( request: Message, - options?: { - retryOptions?: RetryOptions; - timeout?: number; - abortSignal?: AbortSignalLike; - requestName?: string; - } + options?: { retryOptions?: RetryOptions; abortSignal?: AbortSignalLike; requestName?: string } ): Promise { try { log.mgmt( @@ -327,6 +322,7 @@ export class ManagementClient extends LinkEntity { maxRetries: options.retryOptions && options.retryOptions.maxRetries, abortSignal: options.abortSignal, requestName: options.requestName, + timeoutInSeconds: getRetryAttemptTimeoutInMs(options.retryOptions) / 1000, delayInSeconds: options.retryOptions && options.retryOptions.retryInterval && From 4916ce389f1cec1cd8dc910cd8a04242f237e59f Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Mon, 15 Jul 2019 14:02:07 -0700 Subject: [PATCH 02/22] Move init() to within sendOperation() --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 51 +++++++++++++++---- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index de34ad5db3e7..2acbd29ea331 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -369,16 +369,7 @@ export class EventHubSender extends LinkEntity { ); throw error; } - if (!this.isOpen()) { - log.sender( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock - ); - await defaultLock.acquire(this.senderLock, () => { - return this._init(); - }); - } + log.sender( "[%s] Sender '%s', trying to send EventData[].", this._context.connectionId, @@ -631,6 +622,44 @@ export class EventHubSender extends LinkEntity { } }); + const sendOperationPromise = () => + new Promise((resolve, reject) => { + const rejectOnSendError = (err: Error) => { + err = translate(err); + log.error( + "[%s] An error occurred while sending %s", + this._context.connectionId, + this.name, + err + ); + reject(err); + }; + + if (!this.isOpen()) { + log.sender( + "Acquiring lock %s for initializing the session, sender and " + + "possibly the connection.", + this.senderLock + ); + defaultLock + .acquire(this.senderLock, () => { + return this._init(); + }) + .then(() => { + sendEventPromise() + .then(() => { + resolve(); + }) + .catch((err: Error) => { + rejectOnSendError(err); + }); + }) + .catch((err: Error) => { + rejectOnSendError(err); + }); + } + }); + const jitterInSeconds = randomNumberFromInterval(1, 4); const maxRetries = options.retryOptions && options.retryOptions.maxRetries; const delayInSeconds = @@ -640,7 +669,7 @@ export class EventHubSender extends LinkEntity { ? options.retryOptions.retryInterval / 1000 : Constants.defaultDelayBetweenOperationRetriesInSeconds; const config: RetryConfig = { - operation: sendEventPromise, + operation: sendOperationPromise, connectionId: this._context.connectionId, operationType: RetryOperationType.sendMessage, maxRetries: maxRetries, From af5d8c4b9cfaf36ec1ac11f1794518ffd2b083fe Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Mon, 15 Jul 2019 14:12:24 -0700 Subject: [PATCH 03/22] Update operation timeout error in send() to throw the newly defined OperationTimeoutError --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 2acbd29ea331..3a3ba269ba54 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -585,9 +585,9 @@ export class EventHubSender extends LinkEntity { `address "${this.address}", was not able to send the message right now, due ` + `to operation timeout.`; log.error(desc); - const e: AmqpError = { - condition: ErrorNameConditionMapper.ServiceUnavailableError, - description: desc + const e: Error = { + name: "OperationTimeoutError", + message: desc }; return reject(translate(e)); }; From a66272aa551e67d4bb4151d3cef29a21c30e03ff Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Mon, 15 Jul 2019 14:15:30 -0700 Subject: [PATCH 04/22] Update log message --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 3a3ba269ba54..afc8cb763d9a 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -627,7 +627,7 @@ export class EventHubSender extends LinkEntity { const rejectOnSendError = (err: Error) => { err = translate(err); log.error( - "[%s] An error occurred while sending %s", + "[%s] An error occurred while performing send on %s", this._context.connectionId, this.name, err From a74824dd6396671681dc52507338261616f9621f Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Mon, 15 Jul 2019 16:27:40 -0700 Subject: [PATCH 05/22] Fix operation timeout handling --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 290 +++++++++--------- 1 file changed, 144 insertions(+), 146 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index afc8cb763d9a..a28d8729d494 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -459,171 +459,97 @@ export class EventHubSender extends LinkEntity { options: SendOptions & EventHubProducerOptions = {}, format?: number ): Promise { - const abortSignal: AbortSignalLike | undefined = options.abortSignal; - const sendEventPromise = () => + + const sendOperationPromise = () => new Promise((resolve, reject) => { + let waitTimer: any; + + let onRejected: Func; + let onReleased: Func; + let onModified: Func; + let onAccepted: Func; + let onAborted: () => void; + const rejectOnAbort = () => { const desc: string = - `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + - `address "${this.address}" has been cancelled by the user.`; + `[${this._context.connectionId}] The send operation on the Sender "${ + this.name + }" with ` + `address "${this.address}" has been cancelled by the user.`; log.error(desc); reject(new AbortError("The send operation has been cancelled by the user.")); }; - if (abortSignal && abortSignal.aborted) { - // operation has been cancelled, so exit quickly - return rejectOnAbort(); - } + onAborted = () => { + removeListeners(); + rejectOnAbort(); + }; - let waitTimer: any; - log.sender( - "[%s] Sender '%s', credit: %d available: %d", - this._context.connectionId, - this.name, - this._sender!.credit, - this._sender!.session.outgoing.available() - ); - if (this._sender!.sendable()) { + onAccepted = (context: EventContext) => { + // Since we will be adding listener for accepted and rejected event every time + // we send a message, we need to remove listener for both the events. + // This will ensure duplicate listeners are not added for the same event. + removeListeners(); log.sender( - "[%s] Sender '%s', sending message with id '%s'.", + "[%s] Sender '%s', got event accepted.", this._context.connectionId, - this.name, - (Buffer.isBuffer(message) ? tag : message.message_id) || tag || "" + this.name ); - let onRejected: Func; - let onReleased: Func; - let onModified: Func; - let onAccepted: Func; - let onAborted: () => void; - - const removeListeners = (): void => { - clearTimeout(waitTimer); - // When `removeListeners` is called on timeout, the sender might be closed and cleared - // So, check if it exists, before removing listeners from it. - if (abortSignal) { - abortSignal.removeEventListener("abort", onAborted); - } - if (this._sender) { - this._sender.removeListener(SenderEvents.rejected, onRejected); - this._sender.removeListener(SenderEvents.accepted, onAccepted); - this._sender.removeListener(SenderEvents.released, onReleased); - this._sender.removeListener(SenderEvents.modified, onModified); - } - }; + resolve(); + }; - onAborted = () => { - removeListeners(); - rejectOnAbort(); - }; - onAccepted = (context: EventContext) => { - // Since we will be adding listener for accepted and rejected event every time - // we send a message, we need to remove listener for both the events. - // This will ensure duplicate listeners are not added for the same event. - removeListeners(); - log.sender( - "[%s] Sender '%s', got event accepted.", - this._context.connectionId, - this.name - ); - resolve(); - }; - onRejected = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event rejected.", - this._context.connectionId, - this.name - ); - const err = translate(context!.delivery!.remote_state!.error); - log.error(err); - reject(err); - }; - onReleased = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event released.", - this._context.connectionId, - this.name + onRejected = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); + const err = translate(context!.delivery!.remote_state!.error); + log.error(err); + reject(err); + }; + onReleased = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); + } else { + err = new Error( + `[${this._context.connectionId}] Sender '${this.name}', ` + + `received a release disposition.Hence we are rejecting the promise.` ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender '${this.name}', ` + - `received a release disposition.Hence we are rejecting the promise.` - ); - } - log.error(err); - reject(err); - }; - onModified = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event modified.", - this._context.connectionId, - this.name + } + log.error(err); + reject(err); + }; + onModified = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); + } else { + err = new Error( + `[${this._context.connectionId}] Sender "${this.name}", ` + + `received a modified disposition.Hence we are rejecting the promise.` ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender "${this.name}", ` + - `received a modified disposition.Hence we are rejecting the promise.` - ); - } - log.error(err); - reject(err); - }; - - const actionAfterTimeout = () => { - removeListeners(); - const desc: string = - `[${this._context.connectionId}] Sender "${this.name}" with ` + - `address "${this.address}", was not able to send the message right now, due ` + - `to operation timeout.`; - log.error(desc); - const e: Error = { - name: "OperationTimeoutError", - message: desc - }; - return reject(translate(e)); - }; + } + log.error(err); + reject(err); + }; + const removeListeners = (): void => { + // When `removeListeners` is called on timeout, the sender might be closed and cleared + // So, check if it exists, before removing listeners from it. if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); + abortSignal.removeEventListener("abort", onAborted); } - this._sender!.on(SenderEvents.accepted, onAccepted); - this._sender!.on(SenderEvents.rejected, onRejected); - this._sender!.on(SenderEvents.modified, onModified); - this._sender!.on(SenderEvents.released, onReleased); - waitTimer = setTimeout(actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions)); - const delivery = this._sender!.send(message, tag, 0x80013700); - log.sender( - "[%s] Sender '%s', sent message with delivery id: %d and tag: %s", - this._context.connectionId, - this.name, - delivery.id, - delivery.tag.toString() - ); - } else { - // let us retry to send the message after some time. - const msg = - `[${this._context.connectionId}] Sender "${this.name}", ` + - `cannot send the message right now. Please try later.`; - log.error(msg); - const amqpError: AmqpError = { - condition: ErrorNameConditionMapper.SenderBusyError, - description: msg - }; - reject(translate(amqpError)); - } - }); + if (this._sender) { + this._sender.removeListener(SenderEvents.rejected, onRejected); + this._sender.removeListener(SenderEvents.accepted, onAccepted); + this._sender.removeListener(SenderEvents.released, onReleased); + this._sender.removeListener(SenderEvents.modified, onModified); + } + }; - const sendOperationPromise = () => - new Promise((resolve, reject) => { const rejectOnSendError = (err: Error) => { err = translate(err); log.error( @@ -635,6 +561,78 @@ export class EventHubSender extends LinkEntity { reject(err); }; + const actionAfterTimeout = () => { + clearTimeout(waitTimer); + removeListeners(); + const desc: string = + `[${this._context.connectionId}] Sender "${this.name}" with ` + + `address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.`; + log.error(desc); + const e: Error = { + name: "OperationTimeoutError", + message: desc + }; + return reject(translate(e)); + }; + + const sendEventPromise = () => + new Promise((resolve, reject) => { + if (abortSignal && abortSignal.aborted) { + // operation has been cancelled, so exit quickly + return rejectOnAbort(); + } + + log.sender( + "[%s] Sender '%s', credit: %d available: %d", + this._context.connectionId, + this.name, + this._sender!.credit, + this._sender!.session.outgoing.available() + ); + if (this._sender!.sendable()) { + log.sender( + "[%s] Sender '%s', sending message with id '%s'.", + this._context.connectionId, + this.name, + (Buffer.isBuffer(message) ? tag : message.message_id) || tag || "" + ); + + if (abortSignal) { + abortSignal.addEventListener("abort", onAborted); + } + this._sender!.on(SenderEvents.accepted, onAccepted); + this._sender!.on(SenderEvents.rejected, onRejected); + this._sender!.on(SenderEvents.modified, onModified); + this._sender!.on(SenderEvents.released, onReleased); + + const delivery = this._sender!.send(message, tag, 0x80013700); + log.sender( + "[%s] Sender '%s', sent message with delivery id: %d and tag: %s", + this._context.connectionId, + this.name, + delivery.id, + delivery.tag.toString() + ); + } else { + // let us retry to send the message after some time. + const msg = + `[${this._context.connectionId}] Sender "${this.name}", ` + + `cannot send the message right now. Please try later.`; + log.error(msg); + const amqpError: AmqpError = { + condition: ErrorNameConditionMapper.SenderBusyError, + description: msg + }; + reject(translate(amqpError)); + } + }); + + waitTimer = setTimeout( + actionAfterTimeout, + getRetryAttemptTimeoutInMs(options.retryOptions) + ); + if (!this.isOpen()) { log.sender( "Acquiring lock %s for initializing the session, sender and " + From 0631cebf109d53393bdc13aec27e98d3611ee582 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Mon, 15 Jul 2019 18:05:39 -0700 Subject: [PATCH 06/22] Fix timer clearance --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 1901bdac6a6b..ed32498d6f67 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -576,6 +576,7 @@ export class EventHubSender extends LinkEntity { }; const removeListeners = (): void => { + clearTimeout(waitTimer); // When `removeListeners` is called on timeout, the sender might be closed and cleared // So, check if it exists, before removing listeners from it. if (abortSignal) { @@ -601,7 +602,6 @@ export class EventHubSender extends LinkEntity { }; const actionAfterTimeout = () => { - clearTimeout(waitTimer); removeListeners(); const desc: string = `[${this._context.connectionId}] Sender "${this.name}" with ` + From 60fa5ae47f88c37c046e94627ad48dea62b0c58b Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Mon, 15 Jul 2019 18:17:25 -0700 Subject: [PATCH 07/22] Simplify promise handling using async await --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 35 ++++--------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index ed32498d6f67..64a47dbffaaa 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -501,7 +501,7 @@ export class EventHubSender extends LinkEntity { const abortSignal: AbortSignalLike | undefined = options.abortSignal; const sendOperationPromise = () => - new Promise((resolve, reject) => { + new Promise(async (resolve, reject) => { let waitTimer: any; let onRejected: Func; @@ -590,17 +590,6 @@ export class EventHubSender extends LinkEntity { } }; - const rejectOnSendError = (err: Error) => { - err = translate(err); - log.error( - "[%s] An error occurred while performing send on %s", - this._context.connectionId, - this.name, - err - ); - reject(err); - }; - const actionAfterTimeout = () => { removeListeners(); const desc: string = @@ -678,23 +667,13 @@ export class EventHubSender extends LinkEntity { "possibly the connection.", this.senderLock ); - defaultLock - .acquire(this.senderLock, () => { - return this._init(); - }) - .then(() => { - sendEventPromise() - .then(() => { - resolve(); - }) - .catch((err: Error) => { - rejectOnSendError(err); - }); - }) - .catch((err: Error) => { - rejectOnSendError(err); - }); + + await defaultLock.acquire(this.senderLock, () => { + return this._init(); + }); } + + await sendEventPromise(); }); const jitterInSeconds = randomNumberFromInterval(1, 4); From 3168339919bba32d5fd181bed111668851347e68 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Mon, 15 Jul 2019 18:40:44 -0700 Subject: [PATCH 08/22] Fix few merge errors --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 64a47dbffaaa..1155bcf466cd 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -403,7 +403,9 @@ export class EventHubSender extends LinkEntity { if (events instanceof EventDataBatch && options && options.partitionKey) { // throw an error if partition key is different than the one provided in the options. - const error = new Error("Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead."); + const error = new Error( + "Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead." + ); log.error( "[%s] Partition key is not supported when using createBatch(). %O", this._context.connectionId, @@ -622,8 +624,7 @@ export class EventHubSender extends LinkEntity { log.sender( "[%s] Sender '%s', sending message with id '%s'.", this._context.connectionId, - this.name, - (Buffer.isBuffer(message) ? tag : message.message_id) || tag || "" + this.name ); if (abortSignal) { @@ -634,13 +635,12 @@ export class EventHubSender extends LinkEntity { this._sender!.on(SenderEvents.modified, onModified); this._sender!.on(SenderEvents.released, onReleased); - const delivery = this._sender!.send(message, tag, 0x80013700); + const delivery = this._sender!.send(message, 0x80013700); log.sender( - "[%s] Sender '%s', sent message with delivery id: %d and tag: %s", + "[%s] Sender '%s', sent message with delivery id: %d", this._context.connectionId, this.name, - delivery.id, - delivery.tag.toString() + delivery.id ); } else { // let us retry to send the message after some time. From b449ceff097f1f830f6fcef1623978eacff87fc0 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Tue, 16 Jul 2019 11:11:55 -0700 Subject: [PATCH 09/22] Simplify changes --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 295 +++++++++--------- 1 file changed, 147 insertions(+), 148 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 1155bcf466cd..079316028339 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -500,180 +500,179 @@ export class EventHubSender extends LinkEntity { message: AmqpMessage | Buffer, options: SendOptions & EventHubProducerOptions = {} ): Promise { - const abortSignal: AbortSignalLike | undefined = options.abortSignal; - const sendOperationPromise = () => + const abortSignal: AbortSignalLike | undefined = options.abortSignal; + const sendEventPromise = () => new Promise(async (resolve, reject) => { - let waitTimer: any; - - let onRejected: Func; - let onReleased: Func; - let onModified: Func; - let onAccepted: Func; - let onAborted: () => void; - const rejectOnAbort = () => { const desc: string = - `[${this._context.connectionId}] The send operation on the Sender "${ - this.name - }" with ` + `address "${this.address}" has been cancelled by the user.`; + `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + + `address "${this.address}" has been cancelled by the user.`; log.error(desc); reject(new AbortError("The send operation has been cancelled by the user.")); }; - onAborted = () => { - removeListeners(); - rejectOnAbort(); - }; + if (abortSignal && abortSignal.aborted) { + // operation has been cancelled, so exit quickly + return rejectOnAbort(); + } - onAccepted = (context: EventContext) => { - // Since we will be adding listener for accepted and rejected event every time - // we send a message, we need to remove listener for both the events. - // This will ensure duplicate listeners are not added for the same event. - removeListeners(); + let waitTimer: any; + + if (!this.isOpen()) { log.sender( - "[%s] Sender '%s', got event accepted.", - this._context.connectionId, - this.name + "Acquiring lock %s for initializing the session, sender and " + + "possibly the connection.", + this.senderLock ); - resolve(); - }; - - onRejected = (context: EventContext) => { - removeListeners(); - log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); - const err = translate(context!.delivery!.remote_state!.error); - log.error(err); - reject(err); - }; - onReleased = (context: EventContext) => { - removeListeners(); - log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender '${this.name}', ` + - `received a release disposition.Hence we are rejecting the promise.` - ); - } - log.error(err); - reject(err); - }; - onModified = (context: EventContext) => { - removeListeners(); - log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender "${this.name}", ` + - `received a modified disposition.Hence we are rejecting the promise.` - ); - } - log.error(err); - reject(err); - }; - const removeListeners = (): void => { - clearTimeout(waitTimer); - // When `removeListeners` is called on timeout, the sender might be closed and cleared - // So, check if it exists, before removing listeners from it. - if (abortSignal) { - abortSignal.removeEventListener("abort", onAborted); - } - if (this._sender) { - this._sender.removeListener(SenderEvents.rejected, onRejected); - this._sender.removeListener(SenderEvents.accepted, onAccepted); - this._sender.removeListener(SenderEvents.released, onReleased); - this._sender.removeListener(SenderEvents.modified, onModified); - } - }; - - const actionAfterTimeout = () => { - removeListeners(); - const desc: string = - `[${this._context.connectionId}] Sender "${this.name}" with ` + - `address "${this.address}", was not able to send the message right now, due ` + - `to operation timeout.`; - log.error(desc); - const e: Error = { - name: "OperationTimeoutError", - message: desc - }; - return reject(translate(e)); - }; + await defaultLock.acquire(this.senderLock, () => { + return this._init(); + }); + } - const sendEventPromise = () => - new Promise((resolve, reject) => { - if (abortSignal && abortSignal.aborted) { - // operation has been cancelled, so exit quickly - return rejectOnAbort(); + log.sender( + "[%s] Sender '%s', credit: %d available: %d", + this._context.connectionId, + this.name, + this._sender!.credit, + this._sender!.session.outgoing.available() + ); + if (this._sender!.sendable()) { + log.sender( + "[%s] Sender '%s', sending message with id '%s'.", + this._context.connectionId, + this.name, + (Buffer.isBuffer(message) ? tag : message.message_id) || tag || "" + ); + let onRejected: Func; + let onReleased: Func; + let onModified: Func; + let onAccepted: Func; + let onAborted: () => void; + + const removeListeners = (): void => { + clearTimeout(waitTimer); + // When `removeListeners` is called on timeout, the sender might be closed and cleared + // So, check if it exists, before removing listeners from it. + if (abortSignal) { + abortSignal.removeEventListener("abort", onAborted); } + if (this._sender) { + this._sender.removeListener(SenderEvents.rejected, onRejected); + this._sender.removeListener(SenderEvents.accepted, onAccepted); + this._sender.removeListener(SenderEvents.released, onReleased); + this._sender.removeListener(SenderEvents.modified, onModified); + } + }; + onAborted = () => { + removeListeners(); + rejectOnAbort(); + }; + onAccepted = (context: EventContext) => { + // Since we will be adding listener for accepted and rejected event every time + // we send a message, we need to remove listener for both the events. + // This will ensure duplicate listeners are not added for the same event. + removeListeners(); log.sender( - "[%s] Sender '%s', credit: %d available: %d", + "[%s] Sender '%s', got event accepted.", this._context.connectionId, - this.name, - this._sender!.credit, - this._sender!.session.outgoing.available() + this.name ); - if (this._sender!.sendable()) { - log.sender( - "[%s] Sender '%s', sending message with id '%s'.", - this._context.connectionId, - this.name - ); - - if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); - } - this._sender!.on(SenderEvents.accepted, onAccepted); - this._sender!.on(SenderEvents.rejected, onRejected); - this._sender!.on(SenderEvents.modified, onModified); - this._sender!.on(SenderEvents.released, onReleased); - - const delivery = this._sender!.send(message, 0x80013700); - log.sender( - "[%s] Sender '%s', sent message with delivery id: %d", - this._context.connectionId, - this.name, - delivery.id + resolve(); + }; + onRejected = (context: EventContext) => { + removeListeners(); + log.error( + "[%s] Sender '%s', got event rejected.", + this._context.connectionId, + this.name + ); + const err = translate(context!.delivery!.remote_state!.error); + log.error(err); + reject(err); + }; + onReleased = (context: EventContext) => { + removeListeners(); + log.error( + "[%s] Sender '%s', got event released.", + this._context.connectionId, + this.name + ); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); + } else { + err = new Error( + `[${this._context.connectionId}] Sender '${this.name}', ` + + `received a release disposition.Hence we are rejecting the promise.` ); + } + log.error(err); + reject(err); + }; + onModified = (context: EventContext) => { + removeListeners(); + log.error( + "[%s] Sender '%s', got event modified.", + this._context.connectionId, + this.name + ); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); } else { - // let us retry to send the message after some time. - const msg = + err = new Error( `[${this._context.connectionId}] Sender "${this.name}", ` + - `cannot send the message right now. Please try later.`; - log.error(msg); - const amqpError: AmqpError = { - condition: ErrorNameConditionMapper.SenderBusyError, - description: msg - }; - reject(translate(amqpError)); + `received a modified disposition.Hence we are rejecting the promise.` + ); } - }); + log.error(err); + reject(err); + }; - waitTimer = setTimeout( - actionAfterTimeout, - getRetryAttemptTimeoutInMs(options.retryOptions) - ); + const actionAfterTimeout = () => { + removeListeners(); + const desc: string = + `[${this._context.connectionId}] Sender "${this.name}" with ` + + `address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.`; + log.error(desc); + const e: AmqpError = { + condition: ErrorNameConditionMapper.ServiceUnavailableError, + description: desc + }; + return reject(translate(e)); + }; - if (!this.isOpen()) { + if (abortSignal) { + abortSignal.addEventListener("abort", onAborted); + } + this._sender!.on(SenderEvents.accepted, onAccepted); + this._sender!.on(SenderEvents.rejected, onRejected); + this._sender!.on(SenderEvents.modified, onModified); + this._sender!.on(SenderEvents.released, onReleased); + waitTimer = setTimeout(actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions)); + const delivery = this._sender!.send(message, 0x80013700); log.sender( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock + "[%s] Sender '%s', sent message with delivery id: %d and tag: %s", + this._context.connectionId, + this.name, + delivery.id ); - - await defaultLock.acquire(this.senderLock, () => { - return this._init(); - }); + } else { + // let us retry to send the message after some time. + const msg = + `[${this._context.connectionId}] Sender "${this.name}", ` + + `cannot send the message right now. Please try later.`; + log.error(msg); + const amqpError: AmqpError = { + condition: ErrorNameConditionMapper.SenderBusyError, + description: msg + }; + reject(translate(amqpError)); } - - await sendEventPromise(); }); const jitterInSeconds = randomNumberFromInterval(1, 4); @@ -685,7 +684,7 @@ export class EventHubSender extends LinkEntity { ? options.retryOptions.retryInterval / 1000 : Constants.defaultDelayBetweenOperationRetriesInSeconds; const config: RetryConfig = { - operation: sendOperationPromise, + operation: sendEventPromise, connectionId: this._context.connectionId, operationType: RetryOperationType.sendMessage, maxRetries: maxRetries, From e0c8e926eed01109f5a319f6776df7a1b1e881bf Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Tue, 16 Jul 2019 11:23:52 -0700 Subject: [PATCH 10/22] Undo prettier changes, fix merge --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 079316028339..d97baaa83683 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -403,9 +403,7 @@ export class EventHubSender extends LinkEntity { if (events instanceof EventDataBatch && options && options.partitionKey) { // throw an error if partition key is different than the one provided in the options. - const error = new Error( - "Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead." - ); + const error = new Error("Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead."); log.error( "[%s] Partition key is not supported when using createBatch(). %O", this._context.connectionId, @@ -542,8 +540,7 @@ export class EventHubSender extends LinkEntity { log.sender( "[%s] Sender '%s', sending message with id '%s'.", this._context.connectionId, - this.name, - (Buffer.isBuffer(message) ? tag : message.message_id) || tag || "" + this.name ); let onRejected: Func; let onReleased: Func; @@ -606,7 +603,7 @@ export class EventHubSender extends LinkEntity { } else { err = new Error( `[${this._context.connectionId}] Sender '${this.name}', ` + - `received a release disposition.Hence we are rejecting the promise.` + `received a release disposition.Hence we are rejecting the promise.` ); } log.error(err); @@ -625,7 +622,7 @@ export class EventHubSender extends LinkEntity { } else { err = new Error( `[${this._context.connectionId}] Sender "${this.name}", ` + - `received a modified disposition.Hence we are rejecting the promise.` + `received a modified disposition.Hence we are rejecting the promise.` ); } log.error(err); @@ -654,9 +651,9 @@ export class EventHubSender extends LinkEntity { this._sender!.on(SenderEvents.modified, onModified); this._sender!.on(SenderEvents.released, onReleased); waitTimer = setTimeout(actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions)); - const delivery = this._sender!.send(message, 0x80013700); + const delivery = this._sender!.send(message, undefined, 0x80013700); log.sender( - "[%s] Sender '%s', sent message with delivery id: %d and tag: %s", + "[%s] Sender '%s', sent message with delivery id: %d", this._context.connectionId, this.name, delivery.id From 85c2ffcc2c12d17517e5eefb519468e0689fb0f5 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Tue, 16 Jul 2019 13:57:06 -0700 Subject: [PATCH 11/22] Nit: whitespaces --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index d97baaa83683..ad3a1310650e 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -603,7 +603,7 @@ export class EventHubSender extends LinkEntity { } else { err = new Error( `[${this._context.connectionId}] Sender '${this.name}', ` + - `received a release disposition.Hence we are rejecting the promise.` + `received a release disposition.Hence we are rejecting the promise.` ); } log.error(err); @@ -622,7 +622,7 @@ export class EventHubSender extends LinkEntity { } else { err = new Error( `[${this._context.connectionId}] Sender "${this.name}", ` + - `received a modified disposition.Hence we are rejecting the promise.` + `received a modified disposition.Hence we are rejecting the promise.` ); } log.error(err); From 1212ae76f8a5d632564f08e283bbaadd1fce614c Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Tue, 16 Jul 2019 15:28:14 -0700 Subject: [PATCH 12/22] Fix code move around and error name --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 197 +++++++++--------- 1 file changed, 95 insertions(+), 102 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index ad3a1310650e..bf5f18544c70 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -517,10 +517,104 @@ export class EventHubSender extends LinkEntity { let waitTimer: any; + let onRejected: Func; + let onReleased: Func; + let onModified: Func; + let onAccepted: Func; + let onAborted: () => void; + + onAborted = () => { + removeListeners(); + rejectOnAbort(); + }; + onAccepted = (context: EventContext) => { + // Since we will be adding listener for accepted and rejected event every time + // we send a message, we need to remove listener for both the events. + // This will ensure duplicate listeners are not added for the same event. + removeListeners(); + log.sender( + "[%s] Sender '%s', got event accepted.", + this._context.connectionId, + this.name + ); + resolve(); + }; + onRejected = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); + const err = translate(context!.delivery!.remote_state!.error); + log.error(err); + reject(err); + }; + onReleased = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); + } else { + err = new Error( + `[${this._context.connectionId}] Sender '${this.name}', ` + + `received a release disposition.Hence we are rejecting the promise.` + ); + } + log.error(err); + reject(err); + }; + onModified = (context: EventContext) => { + removeListeners(); + log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); + let err: Error; + if (context!.delivery!.remote_state!.error) { + err = translate(context!.delivery!.remote_state!.error); + } else { + err = new Error( + `[${this._context.connectionId}] Sender "${this.name}", ` + + `received a modified disposition.Hence we are rejecting the promise.` + ); + } + log.error(err); + reject(err); + }; + + const removeListeners = (): void => { + clearTimeout(waitTimer); + // When `removeListeners` is called on timeout, the sender might be closed and cleared + // So, check if it exists, before removing listeners from it. + if (abortSignal) { + abortSignal.removeEventListener("abort", onAborted); + } + if (this._sender) { + this._sender.removeListener(SenderEvents.rejected, onRejected); + this._sender.removeListener(SenderEvents.accepted, onAccepted); + this._sender.removeListener(SenderEvents.released, onReleased); + this._sender.removeListener(SenderEvents.modified, onModified); + } + }; + + const actionAfterTimeout = () => { + removeListeners(); + const desc: string = + `[${this._context.connectionId}] Sender "${this.name}" with ` + + `address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.`; + log.error(desc); + const e: Error = { + name: "OperationTimeoutError", + message: desc + }; + return reject(translate(e)); + }; + + waitTimer = setTimeout( + actionAfterTimeout, + getRetryAttemptTimeoutInMs(options.retryOptions) + ); + if (!this.isOpen()) { log.sender( "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", + "possibly the connection.", this.senderLock ); @@ -542,106 +636,6 @@ export class EventHubSender extends LinkEntity { this._context.connectionId, this.name ); - let onRejected: Func; - let onReleased: Func; - let onModified: Func; - let onAccepted: Func; - let onAborted: () => void; - - const removeListeners = (): void => { - clearTimeout(waitTimer); - // When `removeListeners` is called on timeout, the sender might be closed and cleared - // So, check if it exists, before removing listeners from it. - if (abortSignal) { - abortSignal.removeEventListener("abort", onAborted); - } - if (this._sender) { - this._sender.removeListener(SenderEvents.rejected, onRejected); - this._sender.removeListener(SenderEvents.accepted, onAccepted); - this._sender.removeListener(SenderEvents.released, onReleased); - this._sender.removeListener(SenderEvents.modified, onModified); - } - }; - - onAborted = () => { - removeListeners(); - rejectOnAbort(); - }; - onAccepted = (context: EventContext) => { - // Since we will be adding listener for accepted and rejected event every time - // we send a message, we need to remove listener for both the events. - // This will ensure duplicate listeners are not added for the same event. - removeListeners(); - log.sender( - "[%s] Sender '%s', got event accepted.", - this._context.connectionId, - this.name - ); - resolve(); - }; - onRejected = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event rejected.", - this._context.connectionId, - this.name - ); - const err = translate(context!.delivery!.remote_state!.error); - log.error(err); - reject(err); - }; - onReleased = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event released.", - this._context.connectionId, - this.name - ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender '${this.name}', ` + - `received a release disposition.Hence we are rejecting the promise.` - ); - } - log.error(err); - reject(err); - }; - onModified = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event modified.", - this._context.connectionId, - this.name - ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.connectionId}] Sender "${this.name}", ` + - `received a modified disposition.Hence we are rejecting the promise.` - ); - } - log.error(err); - reject(err); - }; - - const actionAfterTimeout = () => { - removeListeners(); - const desc: string = - `[${this._context.connectionId}] Sender "${this.name}" with ` + - `address "${this.address}", was not able to send the message right now, due ` + - `to operation timeout.`; - log.error(desc); - const e: AmqpError = { - condition: ErrorNameConditionMapper.ServiceUnavailableError, - description: desc - }; - return reject(translate(e)); - }; if (abortSignal) { abortSignal.addEventListener("abort", onAborted); @@ -650,7 +644,6 @@ export class EventHubSender extends LinkEntity { this._sender!.on(SenderEvents.rejected, onRejected); this._sender!.on(SenderEvents.modified, onModified); this._sender!.on(SenderEvents.released, onReleased); - waitTimer = setTimeout(actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions)); const delivery = this._sender!.send(message, undefined, 0x80013700); log.sender( "[%s] Sender '%s', sent message with delivery id: %d", From f28c5661342222567484ff47adfa4651bd4c53ce Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 10:59:53 -0700 Subject: [PATCH 13/22] Fix timer handling --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index bf5f18544c70..3186ec4d086d 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -618,9 +618,22 @@ export class EventHubSender extends LinkEntity { this.senderLock ); - await defaultLock.acquire(this.senderLock, () => { - return this._init(); - }); + try { + await defaultLock.acquire(this.senderLock, () => { + return this._init(); + }); + } catch (err) { + err = translate(err); + log.error( + "[%s] An error occurred while creating the sender %s", + this._context.connectionId, + this.name, + err + ); + reject(err); + } finally { + clearTimeout(waitTimer); + } } log.sender( From 66127ee88feae59f400dc7adb160248c9221ed21 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 11:06:44 -0700 Subject: [PATCH 14/22] Refactor to simplify callback definitions --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 71 +++++++++---------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 3186ec4d086d..20532dd3e2a6 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -403,7 +403,9 @@ export class EventHubSender extends LinkEntity { if (events instanceof EventDataBatch && options && options.partitionKey) { // throw an error if partition key is different than the one provided in the options. - const error = new Error("Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead."); + const error = new Error( + "Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead." + ); log.error( "[%s] Partition key is not supported when using createBatch(). %O", this._context.connectionId, @@ -498,36 +500,28 @@ export class EventHubSender extends LinkEntity { message: AmqpMessage | Buffer, options: SendOptions & EventHubProducerOptions = {} ): Promise { - const abortSignal: AbortSignalLike | undefined = options.abortSignal; const sendEventPromise = () => new Promise(async (resolve, reject) => { + // Callbacks used for being registered with events on sender const rejectOnAbort = () => { const desc: string = - `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + - `address "${this.address}" has been cancelled by the user.`; + `[${this._context.connectionId}] The send operation on the Sender "${ + this.name + }" with ` + `address "${this.address}" has been cancelled by the user.`; log.error(desc); reject(new AbortError("The send operation has been cancelled by the user.")); }; - if (abortSignal && abortSignal.aborted) { - // operation has been cancelled, so exit quickly - return rejectOnAbort(); - } - - let waitTimer: any; - - let onRejected: Func; - let onReleased: Func; - let onModified: Func; - let onAccepted: Func; - let onAborted: () => void; - - onAborted = () => { + const onRejected: Func = (context: EventContext) => { removeListeners(); - rejectOnAbort(); + log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); + const err = translate(context!.delivery!.remote_state!.error); + log.error(err); + reject(err); }; - onAccepted = (context: EventContext) => { + + const onAccepted: Func = (context: EventContext) => { // Since we will be adding listener for accepted and rejected event every time // we send a message, we need to remove listener for both the events. // This will ensure duplicate listeners are not added for the same event. @@ -539,14 +533,13 @@ export class EventHubSender extends LinkEntity { ); resolve(); }; - onRejected = (context: EventContext) => { + + const onAborted = () => { removeListeners(); - log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); - const err = translate(context!.delivery!.remote_state!.error); - log.error(err); - reject(err); + rejectOnAbort(); }; - onReleased = (context: EventContext) => { + + const onReleased = (context: EventContext) => { removeListeners(); log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name); let err: Error; @@ -561,7 +554,8 @@ export class EventHubSender extends LinkEntity { log.error(err); reject(err); }; - onModified = (context: EventContext) => { + + const onModified = (context: EventContext) => { removeListeners(); log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); let err: Error; @@ -606,11 +600,16 @@ export class EventHubSender extends LinkEntity { return reject(translate(e)); }; - waitTimer = setTimeout( + const waitTimer = setTimeout( actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions) ); + if (abortSignal && abortSignal.aborted) { + // operation has been cancelled, so exit quickly + return rejectOnAbort(); + } + if (!this.isOpen()) { log.sender( "Acquiring lock %s for initializing the session, sender and " + @@ -623,14 +622,14 @@ export class EventHubSender extends LinkEntity { return this._init(); }); } catch (err) { - err = translate(err); - log.error( - "[%s] An error occurred while creating the sender %s", - this._context.connectionId, - this.name, - err - ); - reject(err); + err = translate(err); + log.error( + "[%s] An error occurred while creating the sender %s", + this._context.connectionId, + this.name, + err + ); + reject(err); } finally { clearTimeout(waitTimer); } From 0e54d619fd859d3a3f2fcaab6d2ca544d4ae8fc5 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 11:34:53 -0700 Subject: [PATCH 15/22] Remove jitter --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 20532dd3e2a6..1ac38af14726 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -677,7 +677,6 @@ export class EventHubSender extends LinkEntity { } }); - const jitterInSeconds = randomNumberFromInterval(1, 4); const maxRetries = options.retryOptions && options.retryOptions.maxRetries; const delayInSeconds = options.retryOptions && @@ -690,7 +689,7 @@ export class EventHubSender extends LinkEntity { connectionId: this._context.connectionId, operationType: RetryOperationType.sendMessage, maxRetries: maxRetries, - delayInSeconds: delayInSeconds + jitterInSeconds + delayInSeconds: delayInSeconds }; return retry(config); } From 3ec9c9d524fd452e71ece40181c9d1b4ec004348 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 15:52:42 -0700 Subject: [PATCH 16/22] Update try catch scope --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 1ac38af14726..2588b7ca42d5 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -610,70 +610,70 @@ export class EventHubSender extends LinkEntity { return rejectOnAbort(); } - if (!this.isOpen()) { - log.sender( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock - ); + try { + if (!this.isOpen()) { + log.sender( + "Acquiring lock %s for initializing the session, sender and " + + "possibly the connection.", + this.senderLock + ); - try { await defaultLock.acquire(this.senderLock, () => { return this._init(); }); - } catch (err) { - err = translate(err); - log.error( - "[%s] An error occurred while creating the sender %s", - this._context.connectionId, - this.name, - err - ); - reject(err); - } finally { - clearTimeout(waitTimer); } - } - log.sender( - "[%s] Sender '%s', credit: %d available: %d", - this._context.connectionId, - this.name, - this._sender!.credit, - this._sender!.session.outgoing.available() - ); - if (this._sender!.sendable()) { log.sender( - "[%s] Sender '%s', sending message with id '%s'.", + "[%s] Sender '%s', credit: %d available: %d", this._context.connectionId, - this.name + this.name, + this._sender!.credit, + this._sender!.session.outgoing.available() ); + if (this._sender!.sendable()) { + log.sender( + "[%s] Sender '%s', sending message with id '%s'.", + this._context.connectionId, + this.name + ); - if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); + if (abortSignal) { + abortSignal.addEventListener("abort", onAborted); + } + this._sender!.on(SenderEvents.accepted, onAccepted); + this._sender!.on(SenderEvents.rejected, onRejected); + this._sender!.on(SenderEvents.modified, onModified); + this._sender!.on(SenderEvents.released, onReleased); + const delivery = this._sender!.send(message, undefined, 0x80013700); + log.sender( + "[%s] Sender '%s', sent message with delivery id: %d", + this._context.connectionId, + this.name, + delivery.id + ); + } else { + // let us retry to send the message after some time. + const msg = + `[${this._context.connectionId}] Sender "${this.name}", ` + + `cannot send the message right now. Please try later.`; + log.error(msg); + const amqpError: AmqpError = { + condition: ErrorNameConditionMapper.SenderBusyError, + description: msg + }; + reject(translate(amqpError)); } - this._sender!.on(SenderEvents.accepted, onAccepted); - this._sender!.on(SenderEvents.rejected, onRejected); - this._sender!.on(SenderEvents.modified, onModified); - this._sender!.on(SenderEvents.released, onReleased); - const delivery = this._sender!.send(message, undefined, 0x80013700); - log.sender( - "[%s] Sender '%s', sent message with delivery id: %d", + } catch (err) { + err = translate(err); + log.error( + "[%s] An error occurred while creating the sender %s", this._context.connectionId, this.name, - delivery.id + err ); - } else { - // let us retry to send the message after some time. - const msg = - `[${this._context.connectionId}] Sender "${this.name}", ` + - `cannot send the message right now. Please try later.`; - log.error(msg); - const amqpError: AmqpError = { - condition: ErrorNameConditionMapper.SenderBusyError, - description: msg - }; - reject(translate(amqpError)); + reject(err); + } finally { + clearTimeout(waitTimer); } }); From d9d38a3382c192c4e47bcdf957ca0f33064ba4d4 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 15:57:15 -0700 Subject: [PATCH 17/22] Fix timer clearance --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 101 +++++++++--------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 2588b7ca42d5..37f60c577596 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -610,70 +610,69 @@ export class EventHubSender extends LinkEntity { return rejectOnAbort(); } - try { - if (!this.isOpen()) { - log.sender( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock - ); + if (!this.isOpen()) { + log.sender( + "Acquiring lock %s for initializing the session, sender and " + + "possibly the connection.", + this.senderLock + ); + try { await defaultLock.acquire(this.senderLock, () => { return this._init(); }); - } + } catch (err) { + clearTimeout(waitTimer); + err = translate(err); + log.error( + "[%s] An error occurred while creating the sender %s", + this._context.connectionId, + this.name, + err + ); + reject(err); + } + } + log.sender( + "[%s] Sender '%s', credit: %d available: %d", + this._context.connectionId, + this.name, + this._sender!.credit, + this._sender!.session.outgoing.available() + ); + if (this._sender!.sendable()) { log.sender( - "[%s] Sender '%s', credit: %d available: %d", + "[%s] Sender '%s', sending message with id '%s'.", this._context.connectionId, - this.name, - this._sender!.credit, - this._sender!.session.outgoing.available() + this.name ); - if (this._sender!.sendable()) { - log.sender( - "[%s] Sender '%s', sending message with id '%s'.", - this._context.connectionId, - this.name - ); - if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); - } - this._sender!.on(SenderEvents.accepted, onAccepted); - this._sender!.on(SenderEvents.rejected, onRejected); - this._sender!.on(SenderEvents.modified, onModified); - this._sender!.on(SenderEvents.released, onReleased); - const delivery = this._sender!.send(message, undefined, 0x80013700); - log.sender( - "[%s] Sender '%s', sent message with delivery id: %d", - this._context.connectionId, - this.name, - delivery.id - ); - } else { - // let us retry to send the message after some time. - const msg = - `[${this._context.connectionId}] Sender "${this.name}", ` + - `cannot send the message right now. Please try later.`; - log.error(msg); - const amqpError: AmqpError = { - condition: ErrorNameConditionMapper.SenderBusyError, - description: msg - }; - reject(translate(amqpError)); + if (abortSignal) { + abortSignal.addEventListener("abort", onAborted); } - } catch (err) { - err = translate(err); - log.error( - "[%s] An error occurred while creating the sender %s", + this._sender!.on(SenderEvents.accepted, onAccepted); + this._sender!.on(SenderEvents.rejected, onRejected); + this._sender!.on(SenderEvents.modified, onModified); + this._sender!.on(SenderEvents.released, onReleased); + const delivery = this._sender!.send(message, undefined, 0x80013700); + log.sender( + "[%s] Sender '%s', sent message with delivery id: %d", this._context.connectionId, this.name, - err + delivery.id ); - reject(err); - } finally { - clearTimeout(waitTimer); + } else { + // let us retry to send the message after some time. + const msg = + `[${this._context.connectionId}] Sender "${this.name}", ` + + `cannot send the message right now. Please try later.`; + log.error(msg); + const amqpError: AmqpError = { + condition: ErrorNameConditionMapper.SenderBusyError, + description: msg + }; + reject(translate(amqpError)); } }); From f85e11295cf4258ecc899a349fa829268f53aeb4 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 15:58:31 -0700 Subject: [PATCH 18/22] Remove comment --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 37f60c577596..4bedb66b5435 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -503,7 +503,6 @@ export class EventHubSender extends LinkEntity { const abortSignal: AbortSignalLike | undefined = options.abortSignal; const sendEventPromise = () => new Promise(async (resolve, reject) => { - // Callbacks used for being registered with events on sender const rejectOnAbort = () => { const desc: string = `[${this._context.connectionId}] The send operation on the Sender "${ @@ -631,7 +630,7 @@ export class EventHubSender extends LinkEntity { err ); reject(err); - } + } } log.sender( From f5b54eba8bc2274b2e6582ac825e0ed303944fa7 Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 15:59:20 -0700 Subject: [PATCH 19/22] Return reject --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 4bedb66b5435..3b14d74ab450 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -629,7 +629,7 @@ export class EventHubSender extends LinkEntity { this.name, err ); - reject(err); + return reject(err); } } From ddb6195d4c221f31af77aa25e16d1ca50bf240da Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Wed, 17 Jul 2019 16:54:05 -0700 Subject: [PATCH 20/22] Fix build error --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 3b14d74ab450..2a7971177363 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -21,8 +21,7 @@ import { ErrorNameConditionMapper, RetryConfig, RetryOperationType, - Constants, - randomNumberFromInterval + Constants } from "@azure/core-amqp"; import { EventData, toAmqpMessage } from "./eventData"; import { ConnectionContext } from "./connectionContext"; From eaaf17e92e7532f69c79fe8d74fd30f3db6ae78b Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Fri, 19 Jul 2019 11:02:19 -0700 Subject: [PATCH 21/22] Revert refactoring --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 2a7971177363..e847fdeda329 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -502,6 +502,14 @@ export class EventHubSender extends LinkEntity { const abortSignal: AbortSignalLike | undefined = options.abortSignal; const sendEventPromise = () => new Promise(async (resolve, reject) => { + let waitTimer: any; + + let onRejected: Func; + let onReleased: Func; + let onModified: Func; + let onAccepted: Func; + let onAborted: () => void; + const rejectOnAbort = () => { const desc: string = `[${this._context.connectionId}] The send operation on the Sender "${ @@ -511,15 +519,17 @@ export class EventHubSender extends LinkEntity { reject(new AbortError("The send operation has been cancelled by the user.")); }; - const onRejected: Func = (context: EventContext) => { + if (abortSignal && abortSignal.aborted) { + // operation has been cancelled, so exit quickly + return rejectOnAbort(); + } + + onAborted = () => { removeListeners(); - log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); - const err = translate(context!.delivery!.remote_state!.error); - log.error(err); - reject(err); + rejectOnAbort(); }; - const onAccepted: Func = (context: EventContext) => { + onAccepted = (context: EventContext) => { // Since we will be adding listener for accepted and rejected event every time // we send a message, we need to remove listener for both the events. // This will ensure duplicate listeners are not added for the same event. @@ -532,12 +542,15 @@ export class EventHubSender extends LinkEntity { resolve(); }; - const onAborted = () => { + onRejected = (context: EventContext) => { removeListeners(); - rejectOnAbort(); + log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); + const err = translate(context!.delivery!.remote_state!.error); + log.error(err); + reject(err); }; - const onReleased = (context: EventContext) => { + onReleased = (context: EventContext) => { removeListeners(); log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name); let err: Error; @@ -552,8 +565,8 @@ export class EventHubSender extends LinkEntity { log.error(err); reject(err); }; - - const onModified = (context: EventContext) => { + + onModified = (context: EventContext) => { removeListeners(); log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); let err: Error; @@ -598,16 +611,11 @@ export class EventHubSender extends LinkEntity { return reject(translate(e)); }; - const waitTimer = setTimeout( + waitTimer = setTimeout( actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions) ); - if (abortSignal && abortSignal.aborted) { - // operation has been cancelled, so exit quickly - return rejectOnAbort(); - } - if (!this.isOpen()) { log.sender( "Acquiring lock %s for initializing the session, sender and " + From f6e72713e1fcb5c93c69b0de0ff91df39d2157de Mon Sep 17 00:00:00 2001 From: ramya0820 Date: Fri, 19 Jul 2019 15:13:54 -0700 Subject: [PATCH 22/22] Handle abort around init --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 2c46a569e4bf..3bb96e38c0f7 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -516,7 +516,7 @@ export class EventHubSender extends LinkEntity { this.name }" with ` + `address "${this.address}" has been cancelled by the user.`; log.error(desc); - reject(new AbortError("The send operation has been cancelled by the user.")); + return reject(new AbortError("The send operation has been cancelled by the user.")); }; if (abortSignal && abortSignal.aborted) { @@ -565,7 +565,7 @@ export class EventHubSender extends LinkEntity { log.error(err); reject(err); }; - + onModified = (context: EventContext) => { removeListeners(); log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); @@ -611,6 +611,10 @@ export class EventHubSender extends LinkEntity { return reject(translate(e)); }; + if (abortSignal) { + abortSignal.addEventListener("abort", onAborted); + } + waitTimer = setTimeout( actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions) @@ -628,6 +632,9 @@ export class EventHubSender extends LinkEntity { return this._init(); }); } catch (err) { + if (abortSignal) { + abortSignal.removeEventListener("abort", onAborted); + } clearTimeout(waitTimer); err = translate(err); log.error( @@ -654,14 +661,11 @@ export class EventHubSender extends LinkEntity { this.name ); - if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); - } this._sender!.on(SenderEvents.accepted, onAccepted); this._sender!.on(SenderEvents.rejected, onRejected); this._sender!.on(SenderEvents.modified, onModified); this._sender!.on(SenderEvents.released, onReleased); - + const delivery = this._sender!.send(message, undefined, 0x80013700); log.sender( "[%s] Sender '%s', sent message with delivery id: %d",