-
Notifications
You must be signed in to change notification settings - Fork 1.4k
[Event Hubs] Update send operation to include initialization #4319
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
260b0e6
c74f5e4
00016cb
4916ce3
af5d8c4
a66272a
a74824d
1607ddd
0631ceb
60fa5ae
3168339
b449cef
e0c8e92
85c2ffc
1212ae7
f28c566
66127ee
0e54d61
3ec9c9d
d9d38a3
f85e112
f5b54eb
ddb6195
6e6ef92
eaaf17e
78c0b4a
f6e7271
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,8 +21,7 @@ import { | |
| ErrorNameConditionMapper, | ||
| RetryConfig, | ||
| RetryOperationType, | ||
| Constants, | ||
| randomNumberFromInterval | ||
| Constants | ||
| } from "@azure/core-amqp"; | ||
| import { EventData, toAmqpMessage } from "./eventData"; | ||
| import { ConnectionContext } from "./connectionContext"; | ||
|
|
@@ -414,16 +413,6 @@ export class EventHubSender extends LinkEntity { | |
| throw error; | ||
| } | ||
|
|
||
| if (!this.isOpen()) { | ||
| log.sender( | ||
| "Acquiring lock %s for initializing the session, sender and " + | ||
| "possibly the connection.", | ||
| this.senderLock | ||
| ); | ||
| await defaultLock.acquire(this.senderLock, () => { | ||
| return this._init(); | ||
| }); | ||
| } | ||
| log.sender( | ||
| "[%s] Sender '%s', trying to send EventData[].", | ||
| this._context.connectionId, | ||
|
|
@@ -512,146 +501,171 @@ export class EventHubSender extends LinkEntity { | |
| ): Promise<void> { | ||
| const abortSignal: AbortSignalLike | undefined = options.abortSignal; | ||
| const sendEventPromise = () => | ||
| new Promise<void>((resolve, reject) => { | ||
| new Promise<void>(async (resolve, reject) => { | ||
| let waitTimer: any; | ||
|
|
||
| let onRejected: Func<EventContext, void>; | ||
| let onReleased: Func<EventContext, void>; | ||
| let onModified: Func<EventContext, void>; | ||
| let onAccepted: Func<EventContext, void>; | ||
| let onAborted: () => void; | ||
|
|
||
| const rejectOnAbort = () => { | ||
| const desc: string = | ||
| `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + | ||
| `address "${this.address}" has been cancelled by the user.`; | ||
| `[${this._context.connectionId}] The send operation on the Sender "${ | ||
| this.name | ||
| }" with ` + `address "${this.address}" has been cancelled by the user.`; | ||
| log.error(desc); | ||
| reject(new AbortError("The send operation has been cancelled by the user.")); | ||
| return reject(new AbortError("The send operation has been cancelled by the user.")); | ||
| }; | ||
|
|
||
| if (abortSignal && abortSignal.aborted) { | ||
| // operation has been cancelled, so exit quickly | ||
| return rejectOnAbort(); | ||
| } | ||
|
|
||
| let waitTimer: any; | ||
| log.sender( | ||
| "[%s] Sender '%s', credit: %d available: %d", | ||
| this._context.connectionId, | ||
| this.name, | ||
| this._sender!.credit, | ||
| this._sender!.session.outgoing.available() | ||
| ); | ||
| if (this._sender!.sendable()) { | ||
| onAborted = () => { | ||
| removeListeners(); | ||
| rejectOnAbort(); | ||
| }; | ||
|
|
||
| onAccepted = (context: EventContext) => { | ||
| // Since we will be adding listener for accepted and rejected event every time | ||
| // we send a message, we need to remove listener for both the events. | ||
| // This will ensure duplicate listeners are not added for the same event. | ||
| removeListeners(); | ||
| log.sender( | ||
| "[%s] Sender '%s', sending message with id '%s'.", | ||
| "[%s] Sender '%s', got event accepted.", | ||
| this._context.connectionId, | ||
| this.name | ||
| ); | ||
| let onRejected: Func<EventContext, void>; | ||
| let onReleased: Func<EventContext, void>; | ||
| let onModified: Func<EventContext, void>; | ||
| let onAccepted: Func<EventContext, void>; | ||
| let onAborted: () => void; | ||
| resolve(); | ||
| }; | ||
|
|
||
| const removeListeners = (): void => { | ||
| clearTimeout(waitTimer); | ||
| // When `removeListeners` is called on timeout, the sender might be closed and cleared | ||
| // So, check if it exists, before removing listeners from it. | ||
| if (abortSignal) { | ||
| abortSignal.removeEventListener("abort", onAborted); | ||
| } | ||
| if (this._sender) { | ||
| this._sender.removeListener(SenderEvents.rejected, onRejected); | ||
| this._sender.removeListener(SenderEvents.accepted, onAccepted); | ||
| this._sender.removeListener(SenderEvents.released, onReleased); | ||
| this._sender.removeListener(SenderEvents.modified, onModified); | ||
| } | ||
| }; | ||
| onRejected = (context: EventContext) => { | ||
| removeListeners(); | ||
| log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name); | ||
| const err = translate(context!.delivery!.remote_state!.error); | ||
| log.error(err); | ||
| reject(err); | ||
| }; | ||
|
|
||
| onAborted = () => { | ||
| removeListeners(); | ||
| rejectOnAbort(); | ||
| }; | ||
| onAccepted = (context: EventContext) => { | ||
| // Since we will be adding listener for accepted and rejected event every time | ||
| // we send a message, we need to remove listener for both the events. | ||
| // This will ensure duplicate listeners are not added for the same event. | ||
| removeListeners(); | ||
| log.sender( | ||
| "[%s] Sender '%s', got event accepted.", | ||
| this._context.connectionId, | ||
| this.name | ||
| onReleased = (context: EventContext) => { | ||
| removeListeners(); | ||
| log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name); | ||
| let err: Error; | ||
| if (context!.delivery!.remote_state!.error) { | ||
| err = translate(context!.delivery!.remote_state!.error); | ||
| } else { | ||
| err = new Error( | ||
| `[${this._context.connectionId}] Sender '${this.name}', ` + | ||
| `received a release disposition.Hence we are rejecting the promise.` | ||
| ); | ||
| resolve(); | ||
| }; | ||
| onRejected = (context: EventContext) => { | ||
| removeListeners(); | ||
| log.error( | ||
| "[%s] Sender '%s', got event rejected.", | ||
| this._context.connectionId, | ||
| this.name | ||
| } | ||
| log.error(err); | ||
| reject(err); | ||
| }; | ||
|
|
||
| onModified = (context: EventContext) => { | ||
| removeListeners(); | ||
| log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name); | ||
| let err: Error; | ||
| if (context!.delivery!.remote_state!.error) { | ||
| err = translate(context!.delivery!.remote_state!.error); | ||
| } else { | ||
| err = new Error( | ||
| `[${this._context.connectionId}] Sender "${this.name}", ` + | ||
| `received a modified disposition.Hence we are rejecting the promise.` | ||
| ); | ||
| const err = translate(context!.delivery!.remote_state!.error); | ||
| log.error(err); | ||
| reject(err); | ||
| } | ||
| log.error(err); | ||
| reject(err); | ||
| }; | ||
|
|
||
| const removeListeners = (): void => { | ||
| clearTimeout(waitTimer); | ||
| // When `removeListeners` is called on timeout, the sender might be closed and cleared | ||
| // So, check if it exists, before removing listeners from it. | ||
| if (abortSignal) { | ||
| abortSignal.removeEventListener("abort", onAborted); | ||
| } | ||
| if (this._sender) { | ||
| this._sender.removeListener(SenderEvents.rejected, onRejected); | ||
| this._sender.removeListener(SenderEvents.accepted, onAccepted); | ||
| this._sender.removeListener(SenderEvents.released, onReleased); | ||
| this._sender.removeListener(SenderEvents.modified, onModified); | ||
| } | ||
| }; | ||
|
|
||
| const actionAfterTimeout = () => { | ||
| removeListeners(); | ||
| const desc: string = | ||
| `[${this._context.connectionId}] Sender "${this.name}" with ` + | ||
| `address "${this.address}", was not able to send the message right now, due ` + | ||
| `to operation timeout.`; | ||
| log.error(desc); | ||
| const e: Error = { | ||
| name: "OperationTimeoutError", | ||
| message: desc | ||
| }; | ||
| onReleased = (context: EventContext) => { | ||
| removeListeners(); | ||
| log.error( | ||
| "[%s] Sender '%s', got event released.", | ||
| this._context.connectionId, | ||
| this.name | ||
| ); | ||
| let err: Error; | ||
| if (context!.delivery!.remote_state!.error) { | ||
| err = translate(context!.delivery!.remote_state!.error); | ||
| } else { | ||
| err = new Error( | ||
| `[${this._context.connectionId}] Sender '${this.name}', ` + | ||
| `received a release disposition.Hence we are rejecting the promise.` | ||
| ); | ||
| return reject(translate(e)); | ||
| }; | ||
|
|
||
| if (abortSignal) { | ||
| abortSignal.addEventListener("abort", onAborted); | ||
| } | ||
|
|
||
| waitTimer = setTimeout( | ||
|
ramya0820 marked this conversation as resolved.
|
||
| actionAfterTimeout, | ||
| getRetryAttemptTimeoutInMs(options.retryOptions) | ||
| ); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Due to the re-arrangement of the timer, if Please consider keeping the previous order of the callbacks/helper-functions and only adding the link creation part at the right place
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The ordering in the recent commit of eaaf17e is better, but still needs work. In the current setup, if the abort signal is fired when the async process of Please make the below changes
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #4322 (comment) thread specifically clarifies about abort during init(). This scenario if we want to address would then need to apply for the managementRequest as well correct? Or did we specifically exclude this because of complexity involved? (In not having access to removeListeners() on sender link from the SDK?) It just feels like UX is not consistent in both cases. Okay to implement the optimization and new corner cases from this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This scenario applies to managementRequest as well i.e. the scenario of abort signal being fired when the async process of init() is in progress. I have updated the comment thread for managementRequest. Please see https://github.com/Azure/azure-sdk-for-js/pull/4322/files#r305599979 |
||
|
|
||
| if (!this.isOpen()) { | ||
| log.sender( | ||
| "Acquiring lock %s for initializing the session, sender and " + | ||
| "possibly the connection.", | ||
| this.senderLock | ||
| ); | ||
|
|
||
| try { | ||
| await defaultLock.acquire(this.senderLock, () => { | ||
| return this._init(); | ||
| }); | ||
| } catch (err) { | ||
| if (abortSignal) { | ||
| abortSignal.removeEventListener("abort", onAborted); | ||
| } | ||
| log.error(err); | ||
| reject(err); | ||
| }; | ||
| onModified = (context: EventContext) => { | ||
| removeListeners(); | ||
| clearTimeout(waitTimer); | ||
| err = translate(err); | ||
| log.error( | ||
| "[%s] Sender '%s', got event modified.", | ||
| "[%s] An error occurred while creating the sender %s", | ||
| this._context.connectionId, | ||
| this.name | ||
| this.name, | ||
| err | ||
| ); | ||
| let err: Error; | ||
| if (context!.delivery!.remote_state!.error) { | ||
| err = translate(context!.delivery!.remote_state!.error); | ||
| } else { | ||
| err = new Error( | ||
| `[${this._context.connectionId}] Sender "${this.name}", ` + | ||
| `received a modified disposition.Hence we are rejecting the promise.` | ||
| ); | ||
| } | ||
| log.error(err); | ||
| reject(err); | ||
| }; | ||
| return reject(err); | ||
| } | ||
| } | ||
|
|
||
| const actionAfterTimeout = () => { | ||
| removeListeners(); | ||
| const desc: string = | ||
| `[${this._context.connectionId}] Sender "${this.name}" with ` + | ||
| `address "${this.address}", was not able to send the message right now, due ` + | ||
| `to operation timeout.`; | ||
| log.error(desc); | ||
| const e: AmqpError = { | ||
| condition: ErrorNameConditionMapper.ServiceUnavailableError, | ||
| description: desc | ||
| }; | ||
| return reject(translate(e)); | ||
| }; | ||
| log.sender( | ||
| "[%s] Sender '%s', credit: %d available: %d", | ||
| this._context.connectionId, | ||
| this.name, | ||
| this._sender!.credit, | ||
| this._sender!.session.outgoing.available() | ||
| ); | ||
| if (this._sender!.sendable()) { | ||
| log.sender( | ||
| "[%s] Sender '%s', sending message with id '%s'.", | ||
| this._context.connectionId, | ||
| this.name | ||
| ); | ||
|
|
||
| if (abortSignal) { | ||
| abortSignal.addEventListener("abort", onAborted); | ||
| } | ||
| this._sender!.on(SenderEvents.accepted, onAccepted); | ||
| this._sender!.on(SenderEvents.rejected, onRejected); | ||
| this._sender!.on(SenderEvents.modified, onModified); | ||
| this._sender!.on(SenderEvents.released, onReleased); | ||
| waitTimer = setTimeout( | ||
| actionAfterTimeout, | ||
| getRetryAttemptTimeoutInMs(options.retryOptions) | ||
| ); | ||
|
|
||
| const delivery = this._sender!.send(message, undefined, 0x80013700); | ||
| log.sender( | ||
| "[%s] Sender '%s', sent message with delivery id: %d", | ||
|
|
@@ -673,7 +687,6 @@ export class EventHubSender extends LinkEntity { | |
| } | ||
| }); | ||
|
|
||
| const jitterInSeconds = randomNumberFromInterval(1, 4); | ||
| const maxRetries = options.retryOptions && options.retryOptions.maxRetries; | ||
| const delayInSeconds = | ||
| options.retryOptions && | ||
|
|
@@ -686,7 +699,7 @@ export class EventHubSender extends LinkEntity { | |
| connectionId: this._context.connectionId, | ||
| operationType: RetryOperationType.sendMessage, | ||
| maxRetries: maxRetries, | ||
| delayInSeconds: delayInSeconds + jitterInSeconds | ||
| delayInSeconds: delayInSeconds | ||
| }; | ||
| return retry<void>(config); | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.