Skip to content

Commit c04f11e

Browse files
authored
[Event Hubs] Resilient management link creation (#46544)
The focus of these changes is to correctly handle the scenario where the management AMQP link is in the process of closing (such as for an idle timeout) as a new operation attempts to use it. This error should be detected as a special case and retried as it is with producer and consumer operations.
1 parent 86b7e38 commit c04f11e

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Querying runtime data and other management operations will now correctly guards against the race condition where an AMQP link is in the process of closing as the operation attempts to use it. These errors will now properly be classified as retriable as they are for producer and consumer operations.
12+
1113
### Other Changes
1214

1315
## 5.11.5 (2024-07-31)

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ protected AmqpClient(string host,
200200
clientOptions.CertificateValidationCallback);
201201

202202
ManagementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
203-
linkTimeout => ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None),
203+
linkTimeout => CreateManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None),
204204
link =>
205205
{
206206
link.Session?.SafeClose();
@@ -545,6 +545,35 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
545545
}
546546
}
547547

548+
/// <summary>
549+
/// Creates the AMQP link to be used for management operations and ensures
550+
/// that any corresponding state has been updated based on the link configuration.
551+
/// </summary>
552+
///
553+
/// <param name="operationTimeout">The timeout to apply to management operations using the link..</param>
554+
/// <param name="linkTimeout">The timeout to apply for creating the link.</param>
555+
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
556+
///
557+
/// <returns>The AMQP link to use for management operations.</returns>
558+
///
559+
private async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(TimeSpan operationTimeout,
560+
TimeSpan linkTimeout,
561+
CancellationToken cancellationToken)
562+
{
563+
var link = default(RequestResponseAmqpLink);
564+
565+
try
566+
{
567+
link = await ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None).ConfigureAwait(false);
568+
}
569+
catch (Exception ex)
570+
{
571+
ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw();
572+
}
573+
574+
return link;
575+
}
576+
548577
/// <summary>
549578
/// Acquires an access token for authorization with the Event Hubs service.
550579
/// </summary>

0 commit comments

Comments
 (0)