Skip to content

Commit

Permalink
introduce Connection Config class
Browse files Browse the repository at this point in the history
Move ConnectionConfig class to API, update approval test
  • Loading branch information
bollhals authored and lukebakken committed Mar 10, 2022
1 parent 4e18f60 commit fd8c750
Show file tree
Hide file tree
Showing 20 changed files with 299 additions and 162 deletions.
15 changes: 15 additions & 0 deletions projects/RabbitMQ.Client/FrameworkExtension/StringExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;

namespace RabbitMQ
{
#nullable enable
#if NETSTANDARD
internal static class StringExtension
{
public static bool Contains(this string toSearch, string value, StringComparison comparisonType)
{
return toSearch.IndexOf(value, comparisonType) > 0;
}
}
#endif
}
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
<Deterministic>true</Deterministic>
</PropertyGroup>

<!-- disable nullable warnings for .NET Standard 2.0 -->
<PropertyGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<NoWarn>$(NoWarn);nullable</NoWarn>
</PropertyGroup>

<Target Name="SetVersionFromConcourseData" AfterTargets="MinVer" Condition="'$(CONCOURSE_PULL_REQUEST_NUMBER)' != ''">
<PropertyGroup>
<PackageVersion>$(MinVerMajor).$(MinVerMinor).$(MinVerPatch)-$(MinVerPreRelease)-pr.$(CONCOURSE_PULL_REQUEST_NUMBER)</PackageVersion>
Expand Down
154 changes: 154 additions & 0 deletions projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;

using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
#nullable enable
/// <summary>
/// The configuration of a connection.
/// </summary>
public sealed class ConnectionConfig
{
/// <summary>
/// Virtual host to access during this connection.
/// </summary>
public string VirtualHost { get; }

/// <summary>
/// Username to use when authenticating to the server.
/// </summary>
public string UserName { get; }

/// <summary>
/// Password to use when authenticating to the server.
/// </summary>
public string Password { get; internal set; }

/// <summary>
/// SASL auth mechanisms to use.
/// </summary>
public IList<IAuthMechanismFactory> AuthMechanisms { get; }

/// <summary>
/// Dictionary of client properties to be sent to the server.
/// </summary>
public IDictionary<string, object?> ClientProperties { get; }

/// <summary>
/// Default client provided name to be used for connections.
/// </summary>
public string? ClientProvidedName { get; }

/// <summary>
/// Maximum channel number to ask for.
/// </summary>
public ushort MaxChannelCount { get; }
/// <summary>
/// Frame-max parameter to ask for (in bytes).
/// </summary>
public uint MaxFrameSize { get; }

/// <summary>
/// Set to false to make automatic connection recovery not recover topology (exchanges, queues, bindings, etc).
/// </summary>
public bool TopologyRecoveryEnabled { get; }

/// <summary>
/// Amount of time client will wait for before re-trying to recover connection.
/// </summary>
public TimeSpan NetworkRecoveryInterval { get; }

/// <summary>
/// Heartbeat timeout to use when negotiating with the server.
/// </summary>
public TimeSpan HeartbeatInterval { get; }

/// <summary>
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before timing out.
/// </summary>
public TimeSpan ContinuationTimeout { get; }

/// <summary>
/// Amount of time protocol handshake operations are allowed to take before timing out.
/// </summary>

public TimeSpan HandshakeContinuationTimeout { get; }
/// <summary>
/// Timeout setting for connection attempts.
/// </summary>
public TimeSpan RequestedConnectionTimeout { get; }

/// <summary>
/// Set to true will enable an asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/>.
/// </summary>
public bool DispatchConsumersAsync { get; }

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
/// </summary>
public int DispatchConsumerConcurrency { get; }

internal Func<AmqpTcpEndpoint, IFrameHandler> FrameHandlerFactory { get; }

internal ConnectionConfig(string virtualHost, string userName, string password, IList<IAuthMechanismFactory> authMechanisms,
IDictionary<string, object?> clientProperties, string? clientProvidedName,
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled,
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
Func<AmqpTcpEndpoint, IFrameHandler> frameHandlerFactory)
{
VirtualHost = virtualHost;
UserName = userName;
Password = password;
AuthMechanisms = authMechanisms;
ClientProperties = clientProperties;
ClientProvidedName = clientProvidedName;
MaxChannelCount = maxChannelCount;
MaxFrameSize = maxFrameSize;
TopologyRecoveryEnabled = topologyRecoveryEnabled;
NetworkRecoveryInterval = networkRecoveryInterval;
HeartbeatInterval = heartbeatInterval;
ContinuationTimeout = continuationTimeout;
HandshakeContinuationTimeout = handshakeContinuationTimeout;
RequestedConnectionTimeout = requestedConnectionTimeout;
DispatchConsumersAsync = dispatchConsumersAsync;
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
FrameHandlerFactory = frameHandlerFactory;
}
}
}
39 changes: 27 additions & 12 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public TimeSpan HandshakeContinuationTimeout
}

/// <summary>
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
/// timing out.
/// </summary>
public TimeSpan ContinuationTimeout
Expand Down Expand Up @@ -492,32 +492,47 @@ public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, string cli
/// </exception>
public IConnection CreateConnection(IEndpointResolver endpointResolver, string clientProvidedName)
{
IConnection conn;
ConnectionConfig config = CreateConfig(clientProvidedName);
try
{
if (AutomaticRecoveryEnabled)
{
var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
autorecoveringConnection.Init(endpointResolver);
conn = autorecoveringConnection;
}
else
{
conn = ((ProtocolBase)Protocols.AMQP_0_9_1).CreateConnection(this, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
return new AutorecoveringConnection(config, endpointResolver);
}

return new Connection(config, endpointResolver.SelectOne(CreateFrameHandler));
}
catch (Exception e)
{
throw new BrokerUnreachableException(e);
}
}

return conn;
private ConnectionConfig CreateConfig(string clientProvidedName)
{
return new ConnectionConfig(
VirtualHost,
UserName,
Password,
AuthMechanisms,
ClientProperties,
clientProvidedName,
RequestedChannelMax,
RequestedFrameMax,
TopologyRecoveryEnabled,
NetworkRecoveryInterval,
RequestedHeartbeat,
ContinuationTimeout,
HandshakeContinuationTimeout,
RequestedConnectionTimeout,
DispatchConsumersAsync,
ConsumerDispatchConcurrency,
CreateFrameHandler);
}

internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
{
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, SocketFactory,
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
IFrameHandler fh = new SocketFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
return ConfigureFrameHandler(fh);
}

Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/ExternalMechanism.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ExternalMechanism : IAuthMechanism
/// <summary>
/// Handle one round of challenge-response.
/// </summary>
public byte[] handleChallenge(byte[] challenge, IConnectionFactory factory)
public byte[] handleChallenge(byte[] challenge, ConnectionConfig config)
{
return Array.Empty<byte>();
}
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IAuthMechanism.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ public interface IAuthMechanism
/// <summary>
/// Handle one round of challenge-response.
/// </summary>
byte[] handleChallenge(byte[] challenge, IConnectionFactory factory);
byte[] handleChallenge(byte[] challenge, ConnectionConfig config);
}
}
2 changes: 0 additions & 2 deletions projects/RabbitMQ.Client/client/api/IProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/PlainMechanism.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ namespace RabbitMQ.Client
{
public class PlainMechanism : IAuthMechanism
{
public byte[] handleChallenge(byte[] challenge, IConnectionFactory factory)
public byte[] handleChallenge(byte[] challenge, ConnectionConfig config)
{
return Encoding.UTF8.GetBytes($"\0{factory.UserName}\0{factory.Password}");
return Encoding.UTF8.GetBytes($"\0{config.UserName}\0{config.Password}");
}
}
}
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/framing/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@

using System.Collections.Generic;
using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.client.impl;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client.Framing.Impl
{
internal class Model: ModelBase
{
public Model(bool dispatchAsync, int concurrency, ISession session) : base(dispatchAsync, concurrency, session)
public Model(ConnectionConfig config, ISession session) : base(config, session)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ internal void DeleteRecordedBinding(in RecordedBinding rb)

internal void RecordConsumer(in RecordedConsumer consumer)
{
if (!_factory.TopologyRecoveryEnabled)
if (!_config.TopologyRecoveryEnabled)
{
return;
}
Expand All @@ -189,7 +189,7 @@ internal void RecordConsumer(in RecordedConsumer consumer)

internal void DeleteRecordedConsumer(string consumerTag)
{
if (!_factory.TopologyRecoveryEnabled)
if (!_config.TopologyRecoveryEnabled)
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private async Task RecoverConnectionAsync()
bool success;
do
{
await Task.Delay(_factory.NetworkRecoveryInterval, token).ConfigureAwait(false);
await Task.Delay(_config.NetworkRecoveryInterval, token).ConfigureAwait(false);
success = TryPerformAutomaticRecovery();
} while (!success && !token.IsCancellationRequested);
}
Expand Down Expand Up @@ -104,7 +104,7 @@ private void StopRecoveryLoop()
}
RecoveryCancellationTokenSource.Cancel();

Task timeout = Task.Delay(_factory.RequestedConnectionTimeout);
Task timeout = Task.Delay(_config.RequestedConnectionTimeout);
if (Task.WhenAny(task, timeout).Result == timeout)
{
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
Expand Down Expand Up @@ -135,7 +135,7 @@ private bool TryPerformAutomaticRecovery()
lock (_recordedEntitiesLock)
{
ThrowIfDisposed();
if (_factory.TopologyRecoveryEnabled)
if (_config.TopologyRecoveryEnabled)
{
// The recovery sequence is the following:
//
Expand Down Expand Up @@ -176,7 +176,7 @@ private bool TryPerformAutomaticRecovery()
*/
if (_innerConnection?.IsOpen == true)
{
_innerConnection.Abort(Constants.InternalError, "FailedAutoRecovery", _factory.RequestedConnectionTimeout);
_innerConnection.Abort(Constants.InternalError, "FailedAutoRecovery", _config.RequestedConnectionTimeout);
}
}
catch (Exception e2)
Expand All @@ -193,8 +193,8 @@ private bool TryRecoverConnectionDelegate()
try
{
var defunctConnection = _innerConnection;
IFrameHandler fh = _endpoints.SelectOne(_factory.CreateFrameHandler);
_innerConnection = new Connection(_factory, fh, ClientProvidedName);
IFrameHandler fh = _endpoints.SelectOne(_config.FrameHandlerFactory);
_innerConnection = new Connection(_config, fh);
_innerConnection.TakeOver(defunctConnection);
return true;
}
Expand Down Expand Up @@ -312,7 +312,7 @@ private void RecoverModelsAndItsConsumers()
{
foreach (AutorecoveringModel m in _models)
{
m.AutomaticallyRecover(this, _factory.TopologyRecoveryEnabled);
m.AutomaticallyRecover(this, _config.TopologyRecoveryEnabled);
}
}
}
Expand Down
Loading

0 comments on commit fd8c750

Please sign in to comment.