Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove CancellationTokenSource from DispatcherChannelBase #1606

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
Expand All @@ -14,11 +13,11 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
{
}

protected override async Task ProcessChannelAsync(CancellationToken token)
protected override async Task ProcessChannelAsync()
{
try
{
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
while (await _reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_reader.TryRead(out WorkStruct work))
{
Expand Down Expand Up @@ -54,7 +53,7 @@ protected override async Task ProcessChannelAsync(CancellationToken token)
}
catch (OperationCanceledException)
{
if (false == token.IsCancellationRequested)
if (false == _reader.Completion.IsCompleted)
{
throw;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
Expand All @@ -14,11 +13,11 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency)
{
}

protected override async Task ProcessChannelAsync(CancellationToken token)
protected override async Task ProcessChannelAsync()
{
try
{
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
while (await _reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_reader.TryRead(out WorkStruct work))
{
Expand Down Expand Up @@ -60,7 +59,7 @@ await consumer.HandleBasicDeliverAsync(
}
catch (OperationCanceledException)
{
if (false == token.IsCancellationRequested)
if (false == _reader.Completion.IsCompleted)
{
throw;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ namespace RabbitMQ.Client.ConsumerDispatching
#nullable enable
internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, IConsumerDispatcher
{
protected readonly CancellationTokenSource _consumerDispatcherCts = new CancellationTokenSource();
protected readonly CancellationToken _consumerDispatcherToken;

protected readonly ChannelBase _channel;
protected readonly ChannelReader<WorkStruct> _reader;
private readonly ChannelWriter<WorkStruct> _writer;
Expand All @@ -23,7 +20,6 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,

internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency)
{
_consumerDispatcherToken = _consumerDispatcherCts.Token;
_channel = channel;
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
{
Expand All @@ -34,18 +30,17 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency)
_reader = workChannel.Reader;
_writer = workChannel.Writer;

Func<Task> loopStart =
() => ProcessChannelAsync(_consumerDispatcherToken);
Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
_worker = Task.Run(loopStart, _consumerDispatcherToken);
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
{
tasks[i] = Task.Run(loopStart, _consumerDispatcherToken);
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
Expand Down Expand Up @@ -122,21 +117,6 @@ public void Quiesce()
_quiesce = true;
}

private bool IsCancellationRequested
{
get
{
try
{
return _consumerDispatcherCts.IsCancellationRequested;
}
catch (ObjectDisposedException)
{
return true;
}
}
}

public void WaitForShutdown()
{
if (_disposed)
Expand All @@ -146,40 +126,37 @@ public void WaitForShutdown()

if (_quiesce)
{
if (IsCancellationRequested)
try
{
try
if (false == _reader.Completion.Wait(TimeSpan.FromSeconds(2)))
{
if (false == _reader.Completion.Wait(TimeSpan.FromSeconds(2)))
{
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
}
if (false == _worker.Wait(TimeSpan.FromSeconds(2)))
{
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
}
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
}
catch (AggregateException aex)
if (false == _worker.Wait(TimeSpan.FromSeconds(2)))
{
AggregateException aexf = aex.Flatten();
bool foundUnexpectedException = false;
foreach (Exception innerAexf in aexf.InnerExceptions)
{
if (false == (innerAexf is OperationCanceledException))
{
foundUnexpectedException = true;
break;
}
}
if (foundUnexpectedException)
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
}
}
catch (AggregateException aex)
{
AggregateException aexf = aex.Flatten();
bool foundUnexpectedException = false;
foreach (Exception innerAexf in aexf.InnerExceptions)
{
if (false == (innerAexf is OperationCanceledException))
{
ESLog.Warn("consumer dispatcher task had unexpected exceptions");
foundUnexpectedException = true;
break;
}
}
catch (OperationCanceledException)
if (foundUnexpectedException)
{
ESLog.Warn("consumer dispatcher task had unexpected exceptions");
}
}
catch (OperationCanceledException)
{
}
}
else
{
Expand Down Expand Up @@ -238,17 +215,15 @@ protected sealed override void ShutdownConsumer(IBasicConsumer consumer, Shutdow
protected override void InternalShutdown()
{
_writer.Complete();
CancelConsumerDispatcherCts();
}

protected override Task InternalShutdownAsync()
{
_writer.Complete();
CancelConsumerDispatcherCts();
return _worker;
}

protected abstract Task ProcessChannelAsync(CancellationToken token);
protected abstract Task ProcessChannelAsync();

protected readonly struct WorkStruct : IDisposable
{
Expand Down Expand Up @@ -334,17 +309,6 @@ protected enum WorkType : byte
ConsumeOk
}

protected void CancelConsumerDispatcherCts()
{
try
{
_consumerDispatcherCts.Cancel();
}
catch (ObjectDisposedException)
{
}
}

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
Expand All @@ -354,8 +318,6 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
Quiesce();
CancelConsumerDispatcherCts();
_consumerDispatcherCts.Dispose();
}
}
catch
Expand Down