Skip to content

Commit a972cba

Browse files
committed
Merge branch '2.0' into add-messagepack
2 parents bde559a + e135279 commit a972cba

File tree

7 files changed

+98
-3
lines changed

7 files changed

+98
-3
lines changed

RawRabbit.sln

+7
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RawRabbit.Enrichers.Polly.T
7474
EndProject
7575
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RawRabbit.Enrichers.RetryLater", "src\RawRabbit.Enrichers.RetryLater\RawRabbit.Enrichers.RetryLater.csproj", "{E1816B3D-9C4B-4D08-9537-ACB6806AF690}"
7676
EndProject
77+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RawRabbit.Enrichers.ZeroFormatter", "src\RawRabbit.Enrichers.ZeroFormatter\RawRabbit.Enrichers.ZeroFormatter.csproj", "{B5B01F7F-4592-4F58-9E7B-3C689E76590A}"
78+
EndProject
7779
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RawRabbit.Enrichers.MessagePack", "src\RawRabbit.Enrichers.MessagePack\RawRabbit.Enrichers.MessagePack.csproj", "{56BD6C7D-5AE4-4E2F-BF9D-3DCBB54D2552}"
7880
EndProject
7981
Global
@@ -202,6 +204,10 @@ Global
202204
{E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Debug|Any CPU.Build.0 = Debug|Any CPU
203205
{E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Release|Any CPU.ActiveCfg = Release|Any CPU
204206
{E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Release|Any CPU.Build.0 = Release|Any CPU
207+
{B5B01F7F-4592-4F58-9E7B-3C689E76590A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
208+
{B5B01F7F-4592-4F58-9E7B-3C689E76590A}.Debug|Any CPU.Build.0 = Debug|Any CPU
209+
{B5B01F7F-4592-4F58-9E7B-3C689E76590A}.Release|Any CPU.ActiveCfg = Release|Any CPU
210+
{B5B01F7F-4592-4F58-9E7B-3C689E76590A}.Release|Any CPU.Build.0 = Release|Any CPU
205211
{56BD6C7D-5AE4-4E2F-BF9D-3DCBB54D2552}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
206212
{56BD6C7D-5AE4-4E2F-BF9D-3DCBB54D2552}.Debug|Any CPU.Build.0 = Debug|Any CPU
207213
{56BD6C7D-5AE4-4E2F-BF9D-3DCBB54D2552}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -241,6 +247,7 @@ Global
241247
{8D45F8AC-B65F-4A2B-9153-8A7F3D423575} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}
242248
{4B4C5936-D61E-4FD8-AEB7-154CEAF84E15} = {2F91E22A-AEBA-4BEF-9A03-C8232830F697}
243249
{E1816B3D-9C4B-4D08-9537-ACB6806AF690} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}
250+
{B5B01F7F-4592-4F58-9E7B-3C689E76590A} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}
244251
{56BD6C7D-5AE4-4E2F-BF9D-3DCBB54D2552} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}
245252
EndGlobalSection
246253
GlobalSection(ExtensibilityGlobals) = postSolution
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>netstandard1.6;net451</TargetFrameworks>
5+
<Authors>LordMike</Authors>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="ZeroFormatter" Version="1.6.4" />
10+
</ItemGroup>
11+
12+
<ItemGroup>
13+
<ProjectReference Include="..\RawRabbit\RawRabbit.csproj" />
14+
</ItemGroup>
15+
16+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using RawRabbit.Instantiation;
2+
using RawRabbit.Serialization;
3+
4+
namespace RawRabbit.Enrichers.ZeroFormatter
5+
{
6+
public static class ZeroFormatterPlugin
7+
{
8+
/// <summary>
9+
/// Replaces the default serializer with ZeroFormatter.
10+
/// </summary>
11+
public static IClientBuilder UseZeroFormatter(this IClientBuilder builder)
12+
{
13+
builder.Register(
14+
pipe: p => { },
15+
ioc: di => di.AddSingleton<ISerializer, ZeroFormatterSerializerWorker>());
16+
return builder;
17+
}
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using System;
2+
using System.Linq;
3+
using System.Reflection;
4+
using RawRabbit.Serialization;
5+
using ZeroFormatter;
6+
7+
namespace RawRabbit.Enrichers.ZeroFormatter
8+
{
9+
internal class ZeroFormatterSerializerWorker : ISerializer
10+
{
11+
public string ContentType => "application/x-zeroformatter";
12+
private readonly MethodInfo _deserializeType;
13+
private readonly MethodInfo _serializeType;
14+
15+
public ZeroFormatterSerializerWorker()
16+
{
17+
_deserializeType = typeof(ZeroFormatterSerializer)
18+
.GetMethod(nameof(ZeroFormatterSerializer.Deserialize), new[] { typeof(byte[]) });
19+
_serializeType = typeof(ZeroFormatterSerializer)
20+
.GetMethods()
21+
.FirstOrDefault(s => s.Name == nameof(ZeroFormatterSerializer.Serialize) && s.ReturnType == typeof(byte[]));
22+
}
23+
24+
public byte[] Serialize(object obj)
25+
{
26+
if (obj == null)
27+
throw new ArgumentNullException();
28+
29+
return (byte[])_serializeType
30+
.MakeGenericMethod(obj.GetType())
31+
.Invoke(null, new[] { obj });
32+
}
33+
34+
public object Deserialize(Type type, byte[] bytes)
35+
{
36+
return _deserializeType.MakeGenericMethod(type)
37+
.Invoke(null, new object[] { bytes });
38+
}
39+
40+
public TType Deserialize<TType>(byte[] bytes)
41+
{
42+
return ZeroFormatterSerializer.Deserialize<TType>(bytes);
43+
}
44+
}
45+
}

src/RawRabbit.Operations.Request/Middleware/ResponseConsumeMiddleware.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ public class ResponseConsumerOptions
2323

2424
public class ResponseConsumeMiddleware : Pipe.Middleware.Middleware
2525
{
26+
protected static readonly ConcurrentDictionary<IBasicConsumer, ConcurrentDictionary<string, TaskCompletionSource<BasicDeliverEventArgs>>> AllResponses =
27+
new ConcurrentDictionary<IBasicConsumer, ConcurrentDictionary<string, TaskCompletionSource<BasicDeliverEventArgs>>>();
28+
2629
protected readonly IConsumerFactory ConsumerFactory;
2730
protected readonly Pipe.Middleware.Middleware ResponsePipe;
2831
private readonly ILog _logger = LogProvider.For<ResponseConsumeMiddleware>();
29-
protected readonly ConcurrentDictionary<IBasicConsumer, ConcurrentDictionary<string, TaskCompletionSource<BasicDeliverEventArgs>>> AllResponses;
3032
protected Func<IPipeContext, ConsumerConfiguration> ResponseConfigFunc;
3133
protected Func<IPipeContext, string> CorrelationidFunc;
3234
protected Func<IPipeContext, bool> DedicatedConsumerFunc;
@@ -38,7 +40,6 @@ public ResponseConsumeMiddleware(IConsumerFactory consumerFactory, IPipeBuilderF
3840
DedicatedConsumerFunc = options?.UseDedicatedConsumer ?? (context => context.GetDedicatedResponseConsumer());
3941
ConsumerFactory = consumerFactory;
4042
ResponsePipe = factory.Create(options.ResponseRecieved);
41-
AllResponses = new ConcurrentDictionary<IBasicConsumer, ConcurrentDictionary<string, TaskCompletionSource<BasicDeliverEventArgs>>>();
4243
}
4344

4445
public override async Task InvokeAsync(IPipeContext context, CancellationToken token)

src/RawRabbit/Channel/ChannelFactory.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public ChannelFactory(IConnectionFactory connectionFactory, RawRabbitConfigurati
8282

8383
_logger.Debug("Connection is recoverable. Waiting for 'Recovery' event to be triggered. ");
8484
var recoverTcs = new TaskCompletionSource<IConnection>();
85-
token.Register(() => recoverTcs.SetCanceled());
85+
token.Register(() => recoverTcs.TrySetCanceled());
8686

8787
EventHandler<EventArgs> completeTask = null;
8888
completeTask = (sender, args) =>

src/RawRabbit/Channel/StaticChannelPool.cs

+7
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,17 @@ private void StartServeChannels()
5353
do
5454
{
5555
_current = _current?.Next ?? Pool.First;
56+
if (_current == null)
57+
{
58+
_logger.Debug("Unable to server channels. Pool empty.");
59+
Monitor.Exit(_workLock);
60+
return;
61+
}
5662
if (_current.Value.IsClosed)
5763
{
5864
Pool.Remove(_current);
5965
if (Pool.Count != 0) continue;
66+
Monitor.Exit(_workLock);
6067
if (Recoverables.Count == 0)
6168
{
6269
throw new ChannelAvailabilityException("No open channels in pool and no recoverable channels");

0 commit comments

Comments
 (0)