Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.TestFramework",
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "External", "External", "{797FF941-76FD-45FD-AC17-A73DFE2BA621}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{87A3ED70-190D-4E6B-A568-40DF5B9F3939}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -29,12 +31,17 @@ Global
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.Build.0 = Release|Any CPU
{87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Debug|Any CPU.Build.0 = Debug|Any CPU
{87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Release|Any CPU.ActiveCfg = Release|Any CPU
{87A3ED70-190D-4E6B-A568-40DF5B9F3939}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{7DFF0E65-DC9A-410D-9A11-AD6A06860FE1} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
{87A3ED70-190D-4E6B-A568-40DF5B9F3939} = {797FF941-76FD-45FD-AC17-A73DFE2BA621}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {44BD3BD5-61DF-464D-8627-E00B0BC4B3A3}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

<ItemGroup>
<PackageReference Include="Azure.Core" />
<PackageReference Include="Azure.Messaging.EventHubs" />

<!-- TEMP :: Change back to package reference before releasing -->
<!-- <PackageReference Include="Azure.Messaging.EventHubs" /> -->
<ProjectReference Include="../../Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj" />
<!-- TEMP :: Remove project reference before releasing-->

<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public static EventHubConnectionOptions Clone(this EventHubConnectionOptions ins
new EventHubConnectionOptions
{
TransportType = instance.TransportType,
ConnectionIdleTimeout = instance.ConnectionIdleTimeout,
Proxy = instance.Proxy,
CustomEndpointAddress = instance.CustomEndpointAddress,
SendBufferSizeInBytes = instance.SendBufferSizeInBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Security;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using Azure.Messaging.EventHubs.Core;
using Moq;
Expand All @@ -19,6 +21,37 @@ namespace Azure.Messaging.EventHubs.Tests
[TestFixture]
public class EventHubConnectionOptionsExtensionsTests
{
/// <summary>
/// Verifies functionality of the <see cref="EventHubConnectionOptions.Clone" />
/// method.
/// </summary>
[Test]
public void CloneKnowsAllMembersOfEventHubConnectionOptions()
{
// This approach is inelegant and brute force, but allows us to detect
// additional members added to the options that we're not currently
// cloning and avoid drift.

var knownMembers = new HashSet<string>(StringComparer.OrdinalIgnoreCase)
{
"TransportType",
"Properties",
"ConnectionIdleTimeout",
"CustomEndpointAddress",
"SendBufferSizeInBytes",
"ReceiveBufferSizeInBytes",
"CertificateValidationCallback"
};

var getterSetterProperties = typeof(EventHubConnectionOptions)
.GetProperties(BindingFlags.Public | BindingFlags.GetProperty | BindingFlags.SetProperty);

foreach (var property in getterSetterProperties)
{
Assert.That(knownMembers.Contains(property.Name), $"The property: { property.Name } of { nameof(EventHubConnectionOptions) } is not being cloned.");
}
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubConnectionOptions.Clone" />
/// method.
Expand All @@ -30,6 +63,7 @@ public void CloneProducesACopy()
var options = new EventHubConnectionOptions
{
TransportType = EventHubsTransportType.AmqpWebSockets,
ConnectionIdleTimeout = TimeSpan.FromHours(3),
Proxy = Mock.Of<IWebProxy>(),
CustomEndpointAddress = new Uri("https://fake.servciebus.net"),
SendBufferSizeInBytes = 65,
Expand All @@ -40,6 +74,7 @@ public void CloneProducesACopy()
EventHubConnectionOptions clone = options.Clone();
Assert.That(clone, Is.Not.Null, "The clone should not be null.");
Assert.That(clone.TransportType, Is.EqualTo(options.TransportType), "The connection type of the clone should match.");
Assert.That(clone.ConnectionIdleTimeout, Is.EqualTo(options.ConnectionIdleTimeout), "The connection idle timeout of the clone should match.");
Assert.That(clone.Proxy, Is.EqualTo(options.Proxy), "The proxy of the clone should match.");
Assert.That(clone.CustomEndpointAddress, Is.EqualTo(options.CustomEndpointAddress), "The custom endpoint address clone should match.");
Assert.That(clone.SendBufferSizeInBytes, Is.EqualTo(options.SendBufferSizeInBytes), "The send buffer size clone should match.");
Expand Down
2 changes: 2 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

### Other Changes

- Added the ability to adjust the connection idle timeout using the `EventHubConnectionOptions` available within the options for each client type.

## 5.5.0 (2021-07-07)

### Acknowledgments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public partial class EventHubConnectionOptions
{
public EventHubConnectionOptions() { }
public System.Net.Security.RemoteCertificateValidationCallback CertificateValidationCallback { get { throw null; } set { } }
public System.TimeSpan ConnectionIdleTimeout { get { throw null; } set { } }
public System.Uri CustomEndpointAddress { get { throw null; } set { } }
public System.Net.IWebProxy Proxy { get { throw null; } set { } }
public int ReceiveBufferSizeInBytes { get { throw null; } set { } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ protected AmqpClient(string host,
credential,
clientOptions.TransportType,
clientOptions.Proxy,
clientOptions.ConnectionIdleTimeout,
null,
clientOptions.SendBufferSizeInBytes,
clientOptions.ReceiveBufferSizeInBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@ internal class AmqpConnectionScope : IDisposable
///
private static Version AmqpVersion { get; } = new Version(1, 0, 0, 0);

/// <summary>
/// The amount of time to allow an AMQP connection to be idle before considering
/// it to be timed out.
/// </summary>
///
private static TimeSpan ConnectionIdleTimeout { get; } = TimeSpan.FromMinutes(1);

/// <summary>
/// The amount of buffer to apply to account for clock skew when
/// refreshing authorization. Authorization will be refreshed earlier
Expand Down Expand Up @@ -102,6 +95,12 @@ internal class AmqpConnectionScope : IDisposable
///
public TimeSpan SessionTimeout { get; } = TimeSpan.FromSeconds(30);

/// <summary>
/// The amount of time to allow a connection to have no observed traffic before considering it idle.
/// </summary>
///
public uint ConnectionIdleTimeoutMilliseconds { get; }

/// <summary>
/// Indicates whether this <see cref="AmqpConnectionScope"/> has been disposed.
/// </summary>
Expand Down Expand Up @@ -204,6 +203,7 @@ public bool IsDisposed
/// <param name="credential">The credential to use for authorization with the Event Hubs service.</param>
/// <param name="transport">The transport to use for communication.</param>
/// <param name="proxy">The proxy, if any, to use for communication.</param>
/// <param name="idleTimeout">The amount of time to allow a connection to have no observed traffic before considering it idle.</param>
/// <param name="identifier">The identifier to assign this scope; if not provided, one will be generated.</param>
/// <param name="sendBufferSizeBytes">The size, in bytes, of the buffer to use for sending via the transport.</param>
/// <param name="receiveBufferSizeBytes">The size, in bytes, of the buffer to use for receiving from the transport.</param>
Expand All @@ -215,6 +215,7 @@ public AmqpConnectionScope(Uri serviceEndpoint,
EventHubTokenCredential credential,
EventHubsTransportType transport,
IWebProxy proxy,
TimeSpan idleTimeout,
string identifier = default,
int sendBufferSizeBytes = AmqpConstants.TransportBufferSize,
int receiveBufferSizeBytes = AmqpConstants.TransportBufferSize,
Expand All @@ -224,13 +225,15 @@ public AmqpConnectionScope(Uri serviceEndpoint,
Argument.AssertNotNull(connectionEndpoint, nameof(connectionEndpoint));
Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
Argument.AssertNotNull(credential, nameof(credential));
Argument.AssertNotNegative(idleTimeout, nameof(idleTimeout));
ValidateTransport(transport);

ServiceEndpoint = serviceEndpoint;
ConnectionEndpoint = connectionEndpoint;
EventHubName = eventHubName;
Transport = transport;
Proxy = proxy;
ConnectionIdleTimeoutMilliseconds = (uint)idleTimeout.TotalMilliseconds;
SendBufferSizeInBytes = sendBufferSizeBytes;
ReceiveBufferSizeInBytes = receiveBufferSizeBytes;
CertificateValidationCallback = certificateValidationCallback;
Expand Down Expand Up @@ -494,7 +497,7 @@ protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(Versio
try
{
var amqpSettings = CreateAmpqSettings(AmqpVersion);
var connectionSetings = CreateAmqpConnectionSettings(serviceEndpoint.Host, scopeIdentifier);
var connectionSetings = CreateAmqpConnectionSettings(serviceEndpoint.Host, scopeIdentifier, ConnectionIdleTimeoutMilliseconds);

var transportSettings = transportType.IsWebSocketTransport()
? CreateTransportSettingsForWebSockets(connectionEndpoint.Host, proxy, sendBufferSizeBytes, receiveBufferSizeBytes)
Expand Down Expand Up @@ -1186,15 +1189,17 @@ private static TransportSettings CreateTransportSettingsForWebSockets(string hos
///
/// <param name="hostName">The host name of the Event Hubs service endpoint.</param>
/// <param name="identifier">unique identifier of the current Event Hubs scope.</param>
/// <param name="idleTimeoutMilliseconds">The amount of time, in milliseconds, to allow a connection to have no observed traffic before considering it idle.</param>
///
/// <returns>The settings to apply to the connection.</returns>
///
private static AmqpConnectionSettings CreateAmqpConnectionSettings(string hostName,
string identifier)
string identifier,
uint idleTimeoutMilliseconds)
{
var connectionSettings = new AmqpConnectionSettings
{
IdleTimeOut = (uint)ConnectionIdleTimeout.TotalMilliseconds,
IdleTimeOut = idleTimeoutMilliseconds,
MaxFrameSize = AmqpConstants.DefaultMaxFrameSize,
ContainerId = identifier,
HostName = hostName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using Azure.Core;

namespace Azure.Messaging.EventHubs
{
Expand All @@ -16,6 +17,9 @@ namespace Azure.Messaging.EventHubs
///
public class EventHubConnectionOptions
{
// <summary>The amount of time to allow a connection to have no observed traffic before considering it idle.</summary>
private TimeSpan _connectionIdleTimeout = TimeSpan.FromMinutes(1);

/// <summary>
/// The type of protocol and transport that will be used for communicating with the Event Hubs
/// service.
Expand All @@ -25,6 +29,32 @@ public class EventHubConnectionOptions
///
public EventHubsTransportType TransportType { get; set; } = EventHubsTransportType.AmqpTcp;

/// <summary>
/// The amount of time to allow a connection to have no observed traffic before considering
/// it idle and eligible to close.
/// </summary>
///
/// <value>The default idle timeout is 60 seconds. The timeout must be a positive value.</value>
///
/// <remarks>
/// If a connection is closed due to being idle, the Event Hubs clients will automatically
/// reopen the connection when it is needed for a network operation. An idle connection
/// being closed does not cause client errors or interfere with normal operation.
///
/// It is recommended to use the default value unless your application has special needs and
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this line break need a <para tag, or will it be automatically preserved by the docs processing?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some really awesome human fixed the doc transform tools so that a blank line between two lines of text will automatically break into paragraphs. 😄

/// you've tested the impact of changing the idle timeout.
/// </remarks>
///
public TimeSpan ConnectionIdleTimeout
{
get => _connectionIdleTimeout;
set
{
Argument.AssertNotNegative(value, nameof(ConnectionIdleTimeout));
_connectionIdleTimeout = value;
}
}

/// <summary>
/// The size of the buffer used for sending information via the active transport.
/// </summary>
Expand Down
Loading