Skip to content

Commit af3b945

Browse files
committed
事件总线异步处理支持取消令牌,避免长时间阻塞操作无法被取消
1 parent 64a66b5 commit af3b945

File tree

3 files changed

+43
-32
lines changed

3 files changed

+43
-32
lines changed

NewLife.Core/Caching/QueueEventBus.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ protected virtual void Init()
3232
/// <summary>发布消息到消息队列</summary>
3333
/// <param name="event">事件</param>
3434
/// <param name="context">上下文</param>
35-
public override Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null)
35+
/// <param name="cancellationToken">取消令牌</param>
36+
public override Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null, CancellationToken cancellationToken = default)
3637
{
3738
Init();
3839
var rs = _queue.Add(@event);
@@ -73,7 +74,7 @@ protected virtual async Task ConsumeMessage(CancellationTokenSource source)
7374
if (msg != null)
7475
{
7576
// 发布到事件总线
76-
await base.PublishAsync(msg).ConfigureAwait(false);
77+
await DispatchAsync(msg, null, cancellationToken).ConfigureAwait(false);
7778
}
7879
else
7980
{

NewLife.Core/Messaging/IEventBus.cs

+39-29
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
#if !NET45
2-
using TaskEx = System.Threading.Tasks.Task;
3-
#endif
4-
5-
using System.Collections.Concurrent;
1+
using System.Collections.Concurrent;
62
using NewLife.Collections;
73
using NewLife.Data;
4+
#if !NET45
5+
using TaskEx = System.Threading.Tasks.Task;
6+
#endif
87

98
namespace NewLife.Messaging;
109

@@ -24,13 +23,14 @@ namespace NewLife.Messaging;
2423
/// <typeparam name="TEvent"></typeparam>
2524
public interface IEventBus<TEvent>
2625
{
27-
/// <summary>发布消息</summary>
26+
/// <summary>发布事件</summary>
2827
/// <param name="event">事件</param>
2928
/// <param name="context">上下文</param>
30-
Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null);
29+
/// <param name="cancellationToken">取消令牌</param>
30+
Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null, CancellationToken cancellationToken = default);
3131

32-
/// <summary>订阅消息</summary>
33-
/// <param name="handler">处理器</param>
32+
/// <summary>订阅事件</summary>
33+
/// <param name="handler">事件处理器</param>
3434
/// <param name="clientId">客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅</param>
3535
Boolean Subscribe(IEventHandler<TEvent> handler, String clientId = "");
3636

@@ -44,21 +44,30 @@ public interface IEventBus<TEvent>
4444
public interface IEventHandler<TEvent>
4545
{
4646
/// <summary>处理事件</summary>
47-
/// <param name="event"></param>
48-
/// <param name="context"></param>
47+
/// <param name="event">事件</param>
48+
/// <param name="context">上下文</param>
49+
/// <param name="cancellationToken">取消令牌</param>
4950
/// <returns></returns>
50-
Task HandleAsync(TEvent @event, IEventContext<TEvent>? context);
51+
Task HandleAsync(TEvent @event, IEventContext<TEvent>? context, CancellationToken cancellationToken);
5152
}
5253

5354
/// <summary>事件总线</summary>
5455
public class EventBus<TEvent> : DisposeBase, IEventBus<TEvent>
5556
{
56-
private ConcurrentDictionary<String, IEventHandler<TEvent>> _handlers = [];
57+
private readonly ConcurrentDictionary<String, IEventHandler<TEvent>> _handlers = [];
5758

58-
/// <summary>发布消息</summary>
59+
/// <summary>发布事件</summary>
5960
/// <param name="event">事件</param>
6061
/// <param name="context">上下文</param>
61-
public virtual async Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null)
62+
/// <param name="cancellationToken">取消令牌</param>
63+
public virtual Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null, CancellationToken cancellationToken = default) => DispatchAsync(@event, context, cancellationToken);
64+
65+
/// <summary>分发事件给各个处理器。进程内分发</summary>
66+
/// <param name="event"></param>
67+
/// <param name="context"></param>
68+
/// <param name="cancellationToken"></param>
69+
/// <returns></returns>
70+
protected virtual async Task<Int32> DispatchAsync(TEvent @event, IEventContext<TEvent>? context, CancellationToken cancellationToken)
6271
{
6372
var rs = 0;
6473

@@ -67,7 +76,7 @@ public virtual async Task<Int32> PublishAsync(TEvent @event, IEventContext<TEven
6776
foreach (var item in _handlers)
6877
{
6978
var handler = item.Value;
70-
await handler.HandleAsync(@event, context).ConfigureAwait(false);
79+
await handler.HandleAsync(@event, context, cancellationToken).ConfigureAwait(false);
7180
rs++;
7281
}
7382

@@ -94,31 +103,31 @@ public static class EventBusExtensions
94103
{
95104
/// <summary>订阅事件</summary>
96105
/// <typeparam name="TEvent"></typeparam>
97-
/// <param name="bus"></param>
98-
/// <param name="action"></param>
106+
/// <param name="bus">事件总线</param>
107+
/// <param name="action">事件处理方法</param>
99108
/// <param name="clientId">客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅</param>
100109
public static void Subscribe<TEvent>(this IEventBus<TEvent> bus, Action<TEvent> action, String clientId = "") => bus.Subscribe(new DelegateEventHandler<TEvent>(action), clientId);
101110

102111
/// <summary>订阅事件</summary>
103112
/// <typeparam name="TEvent"></typeparam>
104-
/// <param name="bus"></param>
105-
/// <param name="action"></param>
113+
/// <param name="bus">事件总线</param>
114+
/// <param name="action">事件处理方法</param>
106115
/// <param name="clientId">客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅</param>
107116
public static void Subscribe<TEvent>(this IEventBus<TEvent> bus, Action<TEvent, IEventContext<TEvent>> action, String clientId = "") => bus.Subscribe(new DelegateEventHandler<TEvent>(action), clientId);
108117

109118
/// <summary>订阅事件</summary>
110119
/// <typeparam name="TEvent"></typeparam>
111-
/// <param name="bus"></param>
112-
/// <param name="action"></param>
120+
/// <param name="bus">事件总线</param>
121+
/// <param name="action">事件处理方法</param>
113122
/// <param name="clientId">客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅</param>
114123
public static void Subscribe<TEvent>(this IEventBus<TEvent> bus, Func<TEvent, Task> action, String clientId = "") => bus.Subscribe(new DelegateEventHandler<TEvent>(action), clientId);
115124

116125
/// <summary>订阅事件</summary>
117126
/// <typeparam name="TEvent"></typeparam>
118-
/// <param name="bus"></param>
119-
/// <param name="action"></param>
127+
/// <param name="bus">事件总线</param>
128+
/// <param name="action">事件处理方法</param>
120129
/// <param name="clientId">客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅</param>
121-
public static void Subscribe<TEvent>(this IEventBus<TEvent> bus, Func<TEvent, IEventContext<TEvent>, Task> action, String clientId = "") => bus.Subscribe(new DelegateEventHandler<TEvent>(action), clientId);
130+
public static void Subscribe<TEvent>(this IEventBus<TEvent> bus, Func<TEvent, IEventContext<TEvent>, CancellationToken, Task> action, String clientId = "") => bus.Subscribe(new DelegateEventHandler<TEvent>(action), clientId);
122131
}
123132

124133
/// <summary>事件上下文接口</summary>
@@ -151,14 +160,15 @@ public class EventContext<TEvent>(IEventBus<TEvent> bus) : IEventContext<TEvent>
151160
public class DelegateEventHandler<TEvent>(Delegate method) : IEventHandler<TEvent>
152161
{
153162
/// <summary>处理事件</summary>
154-
/// <param name="event"></param>
155-
/// <param name="context"></param>
163+
/// <param name="event">事件</param>
164+
/// <param name="context">上下文</param>
165+
/// <param name="cancellationToken">取消令牌</param>
156166
/// <returns></returns>
157167
/// <exception cref="NotSupportedException"></exception>
158-
public Task HandleAsync(TEvent @event, IEventContext<TEvent>? context)
168+
public Task HandleAsync(TEvent @event, IEventContext<TEvent>? context, CancellationToken cancellationToken = default)
159169
{
160170
if (method is Func<TEvent, Task> func) return func(@event);
161-
if (method is Func<TEvent, IEventContext<TEvent>?, Task> func2) return func2(@event, context);
171+
if (method is Func<TEvent, IEventContext<TEvent>?, CancellationToken, Task> func2) return func2(@event, context, cancellationToken);
162172

163173
if (method is Action<TEvent> act)
164174
act(@event);

XUnitTest.Core/Messaging/EventBusTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ private class TestEventHandler : IEventHandler<TestEvent>
1111
{
1212
public String HandledMessage { get; private set; } = String.Empty;
1313

14-
public Task HandleAsync(TestEvent @event, IEventContext<TestEvent> context)
14+
public Task HandleAsync(TestEvent @event, IEventContext<TestEvent> context, CancellationToken cancellationToken)
1515
{
1616
HandledMessage = @event.Message;
1717
return Task.CompletedTask;

0 commit comments

Comments
 (0)