Skip to content

feat(events): 添加增强事件总线支持过滤器和统计功能#83

Merged
GeWuYou merged 3 commits into
mainfrom
feat/events-enhanced-bus
Mar 6, 2026
Merged

feat(events): 添加增强事件总线支持过滤器和统计功能#83
GeWuYou merged 3 commits into
mainfrom
feat/events-enhanced-bus

Conversation

@GeWuYou

@GeWuYou GeWuYou commented Mar 6, 2026

Copy link
Copy Markdown
Owner
  • 实现 EnhancedEventBus 支持过滤器、弱引用事件和统计功能
  • 添加 FilterableEvent 和 WeakEvent 支持事件过滤和弱引用订阅
  • 实现 PredicateEventFilter 和 SamplingEventFilter 事件过滤器
  • 添加 EventStatistics 统计事件发布、处理和失败次数
  • 实现完整的单元测试验证过滤器和统计功能

Summary by Sourcery

引入增强版事件总线,支持可过滤事件与弱事件,并可选地对事件发布与处理进行统计。

新特性:

  • 新增 EnhancedEventBus,支持标准、优先级、可过滤和弱事件的发布与订阅,并提供可选的统计功能。
  • 引入可过滤事件,支持可插拔的 IEventFilter 实现,包括基于谓词的过滤器和采样过滤器。
  • 新增事件统计抽象及实现,用于跟踪发布/处理/失败计数和监听器指标,并支持生成统计报告。

测试:

  • 新增全面的单元测试覆盖弱事件行为,包括与 GC 的交互、清理、注销以及与统计功能的集成。
  • 新增基于谓词和采样事件过滤器的测试,包括多个过滤器、移除/清空行为以及对无效配置的校验。
  • 新增事件统计测试,用于验证计数器、按事件类型的指标、重置行为、报告输出以及线程安全性。
Original summary in English

Summary by Sourcery

Introduce an enhanced event bus with filterable and weak events plus optional statistics for event publishing and handling.

New Features:

  • Add EnhancedEventBus supporting standard, priority, filterable, and weak event publishing and subscription with optional statistics.
  • Introduce filterable events with pluggable IEventFilter implementations, including predicate-based and sampling filters.
  • Add event statistics abstraction and implementation to track publish/handle/failure counts and listener metrics, with report generation.

Tests:

  • Add comprehensive unit tests covering weak event behavior, including GC interaction, cleanup, unregistering, and statistics integration.
  • Add tests for predicate and sampling event filters, including multiple filters, removal/clearing behavior, and validation of invalid configuration.
  • Add tests for event statistics to verify counters, per-type metrics, reset behavior, reporting output, and thread safety.

- 实现 EnhancedEventBus 支持过滤器、弱引用事件和统计功能
- 添加 FilterableEvent 和 WeakEvent 支持事件过滤和弱引用订阅
- 实现 PredicateEventFilter 和 SamplingEventFilter 事件过滤器
- 添加 EventStatistics 统计事件发布、处理和失败次数
- 实现完整的单元测试验证过滤器和统计功能
@deepsource-io

deepsource-io Bot commented Mar 6, 2026

Copy link
Copy Markdown

DeepSource Code Review

We reviewed changes in 1b92bda...c5bd08a on this pull request. Below is the summary for the review, and you can see the individual issues we found as inline review comments.

See full review on DeepSource ↗

PR Report Card

Overall Grade   Security  

Reliability  

Complexity  

Hygiene  

Code Review Summary

Analyzer Status Updated (UTC) Details
C# Mar 6, 2026 8:58a.m. Review ↗
Secrets Mar 6, 2026 8:58a.m. Review ↗

@sourcery-ai

sourcery-ai Bot commented Mar 6, 2026

Copy link
Copy Markdown

评审者指南

引入一个 EnhancedEventBus,在现有的 EasyEvents/PriorityEvent 基础设施之上分层支持可过滤事件和弱事件,增加可插拔事件过滤器,并集中进行事件统计跟踪,同时为弱事件、过滤器和统计提供全面的单元测试。

发送带谓词过滤器与采样过滤器并带统计信息的可过滤事件的序列图

sequenceDiagram
    participant Client
    participant EnhancedEventBus
    participant FilterableEvent_T_
    participant PredicateEventFilter_T_
    participant SamplingEventFilter_T_
    participant EventStatistics
    participant Listener

    Client->>EnhancedEventBus: RegisterFilterable_T_(onEvent)
    EnhancedEventBus->>FilterableEvent_T_: GetOrAdd TestEvent
    FilterableEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)
    FilterableEvent_T_-->>EnhancedEventBus: IUnRegister
    EnhancedEventBus-->>Client: IUnRegister

    Client->>EnhancedEventBus: AddFilter_T_(PredicateEventFilter_T_)
    EnhancedEventBus->>FilterableEvent_T_: AddFilter(predicateFilter)
    Client->>EnhancedEventBus: AddFilter_T_(SamplingEventFilter_T_)
    EnhancedEventBus->>FilterableEvent_T_: AddFilter(samplingFilter)

    Client->>EnhancedEventBus: SendFilterable_T_(data)
    EnhancedEventBus->>FilterableEvent_T_: Trigger(data)
    FilterableEvent_T_->>EventStatistics: RecordPublish(TestEvent)

    %% Apply filters
    FilterableEvent_T_->>PredicateEventFilter_T_: ShouldFilter(data)
    alt PredicateFilter blocks
        PredicateEventFilter_T_-->>FilterableEvent_T_: true
        FilterableEvent_T_-->>EnhancedEventBus: return (event filtered)
    else PredicateFilter passes
        PredicateEventFilter_T_-->>FilterableEvent_T_: false
        FilterableEvent_T_->>SamplingEventFilter_T_: ShouldFilter(data)
        alt SamplingFilter blocks
            SamplingEventFilter_T_-->>FilterableEvent_T_: true
            FilterableEvent_T_-->>EnhancedEventBus: return (event sampled out)
        else SamplingFilter passes
            SamplingEventFilter_T_-->>FilterableEvent_T_: false
            FilterableEvent_T_->>Listener: onEvent(data)
            Listener-->>FilterableEvent_T_: handled
            FilterableEvent_T_->>EventStatistics: RecordHandle()
        end
    end
Loading

发送带自动清理与统计信息的弱事件的序列图

sequenceDiagram
    participant Client
    participant EnhancedEventBus
    participant WeakEvent_T_
    participant EventStatistics
    participant AliveListener
    participant DeadListener

    Client->>EnhancedEventBus: RegisterWeak_T_(DeadListener.OnEvent)
    EnhancedEventBus->>WeakEvent_T_: Register(DeadListener)
    WeakEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)

    Client->>EnhancedEventBus: RegisterWeak_T_(AliveListener.OnEvent)
    EnhancedEventBus->>WeakEvent_T_: Register(AliveListener)
    WeakEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)

    Note over DeadListener: DeadListener 变为不可达
    Client->>Client: GC.Collect()

    Client->>EnhancedEventBus: SendWeak_T_(data)
    EnhancedEventBus->>WeakEvent_T_: Trigger(data)
    WeakEvent_T_->>EventStatistics: RecordPublish(TestEvent)

    WeakEvent_T_->>WeakEvent_T_: Collect alive handlers
    WeakEvent_T_->>DeadListener: TryGetTarget()
    DeadListener-->>WeakEvent_T_: collected (removed)
    WeakEvent_T_->>AliveListener: TryGetTarget()
    AliveListener-->>WeakEvent_T_: alive

    WeakEvent_T_->>AliveListener: OnEvent(data)
    AliveListener-->>WeakEvent_T_: handled
    WeakEvent_T_->>EventStatistics: RecordHandle()
    WeakEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)
Loading

带过滤器、弱事件和统计信息的 EnhancedEventBus 类图

classDiagram
    class EnhancedEventBus {
        -EasyEvents _mEvents
        -ConcurrentDictionary~Type, object~ _mFilterableEvents
        -EasyEvents _mPriorityEvents
        -ConcurrentDictionary~Type, object~ _mWeakEvents
        -EventStatistics _statistics
        +EnhancedEventBus(enableStatistics bool)
        +IEventStatistics Statistics
        +void Send_T_()
        +void Send_T_(e T)
        +void Send_T_(e T, propagation EventPropagation)
        +IUnRegister Register_T_(onEvent Action_T_)
        +IUnRegister Register_T_(onEvent Action_T_, priority int)
        +void UnRegister_T_(onEvent Action_T_)
        +IUnRegister RegisterWithContext_T_(onEvent Action_EventContext_T__)
        +IUnRegister RegisterWithContext_T_(onEvent Action_EventContext_T__, priority int)
        +void SendFilterable_T_(e T)
        +IUnRegister RegisterFilterable_T_(onEvent Action_T_)
        +void AddFilter_T_(filter IEventFilter_T_)
        +void RemoveFilter_T_(filter IEventFilter_T_)
        +void ClearFilters_T_()
        +void SendWeak_T_(e T)
        +IUnRegister RegisterWeak_T_(onEvent Action_T_)
        +void CleanupWeak_T_()
    }

    class FilterableEvent_T_ {
        -List~IEventFilter_T_~ _filters
        -object _lock
        -EventStatistics _statistics
        -Action_T_ _onEvent
        +FilterableEvent_T_(statistics EventStatistics)
        +IUnRegister Register(onEvent Action_T_)
        +void UnRegister(onEvent Action_T_)
        +void Trigger(data T)
        +void AddFilter(filter IEventFilter_T_)
        +void RemoveFilter(filter IEventFilter_T_)
        +void ClearFilters()
        +int GetListenerCount()
    }

    class WeakEvent_T_ {
        -object _lock
        -EventStatistics _statistics
        -List~WeakReference~Action_T_~~ _weakHandlers
        +WeakEvent_T_(statistics EventStatistics)
        +IUnRegister Register(onEvent Action_T_)
        +void UnRegister(onEvent Action_T_)
        +void Trigger(data T)
        +void Cleanup()
        +int GetListenerCount()
    }

    class EventStatistics {
        -Dictionary~string, int~ _listenerCountByType
        -Dictionary~string, long~ _publishCountByType
        -object _lock
        -int _activeEventTypes
        -int _activeListeners
        -long _totalFailed
        -long _totalHandled
        -long _totalPublished
        +long TotalPublished
        +long TotalHandled
        +long TotalFailed
        +int ActiveEventTypes
        +int ActiveListeners
        +long GetPublishCount(eventType string)
        +int GetListenerCount(eventType string)
        +void Reset()
        +string GenerateReport()
        +void RecordPublish(eventType string)
        +void RecordHandle()
        +void RecordFailure()
        +void UpdateListenerCount(eventType string, count int)
    }

    class IEventStatistics {
        <<interface>>
        +long TotalPublished
        +long TotalHandled
        +long TotalFailed
        +int ActiveEventTypes
        +int ActiveListeners
        +long GetPublishCount(eventType string)
        +int GetListenerCount(eventType string)
        +void Reset()
        +string GenerateReport()
    }

    class IEventFilter_T_ {
        <<interface>>
        +bool ShouldFilter(eventData T)
    }

    class PredicateEventFilter_T_ {
        -Func_T_, bool_ _predicate
        +PredicateEventFilter_T_(predicate Func_T_, bool_)
        +bool ShouldFilter(eventData T)
    }

    class SamplingEventFilter_T_ {
        -double _samplingRate
        -long _counter
        +SamplingEventFilter_T_(samplingRate double)
        +bool ShouldFilter(eventData T)
    }

    class IUnRegister {
        <<interface>>
        +void UnRegister()
    }

    class DefaultUnRegister {
        -Action _onUnRegister
        +DefaultUnRegister(onUnRegister Action)
        +void UnRegister()
    }

    class IEventBus {
        <<interface>>
    }

    class EasyEvents
    class Event_T_
    class PriorityEvent_T_
    class EventContext_T_

    EnhancedEventBus ..|> IEventBus
    EventStatistics ..|> IEventStatistics

    EnhancedEventBus --> EventStatistics : uses
    EnhancedEventBus --> FilterableEvent_T_ : manages
    EnhancedEventBus --> WeakEvent_T_ : manages
    EnhancedEventBus --> EasyEvents : uses
    EnhancedEventBus --> PriorityEvent_T_ : uses

    FilterableEvent_T_ --> IEventFilter_T_ : aggregates
    FilterableEvent_T_ --> EventStatistics : updates
    WeakEvent_T_ --> EventStatistics : updates

    PredicateEventFilter_T_ ..|> IEventFilter_T_
    SamplingEventFilter_T_ ..|> IEventFilter_T_

    FilterableEvent_T_ --> IUnRegister : returns
    WeakEvent_T_ --> IUnRegister : returns
    DefaultUnRegister ..|> IUnRegister

    EasyEvents --> Event_T_ : manages
    EasyEvents --> PriorityEvent_T_ : manages
    PriorityEvent_T_ --> EventContext_T_
Loading

文件级变更

Change Details Files
添加 EnhancedEventBus 实现,将现有事件基础设施与新的可过滤 / 弱事件能力以及统计功能组合在一起。
  • 使用新的密封类 EnhancedEventBus 封装基于 EasyEvents/PriorityEvent 的现有 API(Send、Register 以及优先级/上下文变体),并实现 IEventBus。
  • 为可过滤事件和弱事件维护基于 ConcurrentDictionary 的注册表,以事件类型为键,确保线程安全的 GetOrAdd 语义。
  • 通过构造函数参数暴露可选的 IEventStatistics,并将统计实例传递给可过滤事件和弱事件实例。
  • 为可过滤事件提供专用 API(SendFilterable/RegisterFilterable/AddFilter/RemoveFilter/ClearFilters),为弱事件提供专用 API(SendWeak/RegisterWeak/CleanupWeak)。
GFramework.Core/events/EnhancedEventBus.cs
引入 EventStatistics,以线程安全的方式跟踪发布/处理/失败计数以及监听者度量,并通过 IEventStatistics 暴露出来。
  • 定义 IEventStatistics 接口,暴露总计数、活动数量、按类型统计、重置以及报告生成方法。
  • 使用 Interlocked 实现 EventStatistics 的原子计数器,并通过受锁保护的字典维护每种事件类型的发布次数和监听者数量。
  • 添加 RecordPublish/RecordHandle/RecordFailure/UpdateListenerCount 辅助方法供事件类型调用,以及用于诊断的格式化 GenerateReport。
  • 确保 Reset 将所有计数清零并清空按类型字典,同时将 ActiveEventTypes/ActiveListeners 作为原子属性暴露(当前仅作为数值持有者)。
GFramework.Core.Abstractions/events/IEventStatistics.cs
GFramework.Core/events/EventStatistics.cs
添加与统计集成的泛型 FilterableEvent 包装器,以支持事件过滤器。
  • 维护一个多播委托形式的处理程序集合,以及一个 IEventFilter<T> 列表,并通过锁来保证并发修改的安全性。
  • 在 Trigger 时,先记录一次发布事件,然后依次评估所有过滤器(若有任一 ShouldFilter 返回 true 则短路),再在锁外调用处理程序快照。
  • 在每次处理程序调用时,通过 EventStatistics 记录处理/失败事件,并将处理程序抛出的异常重新抛给调用方。
  • 在添加或移除处理程序时,更新 EventStatistics 中该类型的监听者数量,并通过 GetListenerCount 暴露当前监听者数以供外部查看。
  • 为事件实例提供 AddFilter/RemoveFilter/ClearFilters API 用于管理过滤器。
GFramework.Core/events/FilterableEvent.cs
GFramework.Core.Abstractions/events/IEventFilter.cs
添加泛型 WeakEvent 包装器,以弱引用方式存储处理程序并与统计系统集成。
  • 使用 List<WeakReference<Action<T>>> 存储处理程序,并通过锁实现修改与清理的同步。
  • 在 Register/UnRegister 时,添加/移除弱引用,并更新 EventStatistics 中该类型的监听者数量。
  • 在 Trigger 时,记录一次发布事件,收集存活处理程序并移除已失效引用,在锁外调用这些处理程序,同时跟踪处理/失败统计并重新抛出异常。
  • 提供 Cleanup 方法以主动移除已回收的引用并保持统计同步,并通过 GetListenerCount 查询当前存活处理程序数量。
GFramework.Core/events/WeakEvent.cs
添加基于谓词和采样的具体事件过滤器实现。
  • 实现 PredicateEventFilter<T>,封装 Func<T,bool> 谓词,并将返回 true 解释为“过滤掉”,同时校验谓词非空。
  • 实现 SamplingEventFilter<T>,使用采样率(0.0–1.0)通过基于 Interlocked 的计数器确定性地放行每第 N 个事件;校验采样率边界,并在 0/1 边界情况下处理为永远过滤/永远放行。
GFramework.Core/events/filters/PredicateEventFilter.cs
GFramework.Core/events/filters/SamplingEventFilter.cs
添加覆盖弱事件、事件过滤器和统计行为(包括边界情况与并发)的单元测试。
  • WeakEventTests 验证基本投递、基于 GC 的监听者回收、显式 Cleanup、手动 UnRegister、多监听者扇出、与统计的集成、失败跟踪,以及在 Trigger 期间的自动清理。
  • EventFilterTests 验证基于谓词的过滤、在不同采样率(0、0.5、1)下的采样行为、多过滤器的交互、过滤器的动态移除/清空,以及对于无效谓词/采样率的构造函数校验。
  • EventStatisticsTests 覆盖启用/禁用统计、已发布/已处理/失败事件的基础计数、按类型发布/监听者计数、Reset 语义、报告格式,以及在并发发布场景下的线程安全性。
GFramework.Core.Tests/events/WeakEventTests.cs
GFramework.Core.Tests/events/EventFilterTests.cs
GFramework.Core.Tests/events/EventStatisticsTests.cs

使用提示与命令

与 Sourcery 交互

  • 触发新评审: 在 Pull Request 中评论 @sourcery-ai review
  • 继续讨论: 直接回复 Sourcery 的评审评论。
  • 从评审评论生成 GitHub Issue: 在评审评论下回复,请求 Sourcery 从该评论创建一个 issue。你也可以直接在评审评论下回复 @sourcery-ai issue 来从中创建 issue。
  • 生成 Pull Request 标题: 在 Pull Request 标题的任意位置写入 @sourcery-ai 即可随时生成标题。你也可以在 Pull Request 中评论 @sourcery-ai title 以(重新)生成标题。
  • 生成 Pull Request 摘要: 在 Pull Request 正文任意位置写入 @sourcery-ai summary,即可在该位置生成 PR 摘要。你也可以在 Pull Request 中评论 @sourcery-ai summary 以(重新)生成摘要。
  • 生成评审者指南: 在 Pull Request 中评论 @sourcery-ai guide,即可随时(重新)生成评审者指南。
  • 一键解决所有 Sourcery 评论: 在 Pull Request 中评论 @sourcery-ai resolve,即可将所有 Sourcery 评论标记为已解决。如果你已经处理完所有评论且不希望再看到它们,这会很有用。
  • 撤销所有 Sourcery 评审: 在 Pull Request 中评论 @sourcery-ai dismiss,即可撤销所有现有的 Sourcery 评审。若你希望从一个全新的评审开始,这尤其有用——别忘了随后再评论 @sourcery-ai review 以触发新的评审!

自定义你的使用体验

访问你的 Dashboard 以:

  • 启用或禁用某些评审特性,例如 Sourcery 自动生成的 Pull Request 摘要、评审者指南等。
  • 修改评审语言。
  • 添加、移除或编辑自定义评审说明。
  • 调整其他评审相关设置。

获取帮助

Original review guide in English

Reviewer's Guide

Introduce an EnhancedEventBus that layers filterable and weak events on top of the existing EasyEvents/PriorityEvent infrastructure, adds pluggable event filters, and centralizes event statistics tracking, with comprehensive unit tests for weak events, filters, and statistics.

Sequence diagram for sending a filterable event with predicate and sampling filters plus statistics

sequenceDiagram
    participant Client
    participant EnhancedEventBus
    participant FilterableEvent_T_
    participant PredicateEventFilter_T_
    participant SamplingEventFilter_T_
    participant EventStatistics
    participant Listener

    Client->>EnhancedEventBus: RegisterFilterable_T_(onEvent)
    EnhancedEventBus->>FilterableEvent_T_: GetOrAdd TestEvent
    FilterableEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)
    FilterableEvent_T_-->>EnhancedEventBus: IUnRegister
    EnhancedEventBus-->>Client: IUnRegister

    Client->>EnhancedEventBus: AddFilter_T_(PredicateEventFilter_T_)
    EnhancedEventBus->>FilterableEvent_T_: AddFilter(predicateFilter)
    Client->>EnhancedEventBus: AddFilter_T_(SamplingEventFilter_T_)
    EnhancedEventBus->>FilterableEvent_T_: AddFilter(samplingFilter)

    Client->>EnhancedEventBus: SendFilterable_T_(data)
    EnhancedEventBus->>FilterableEvent_T_: Trigger(data)
    FilterableEvent_T_->>EventStatistics: RecordPublish(TestEvent)

    %% Apply filters
    FilterableEvent_T_->>PredicateEventFilter_T_: ShouldFilter(data)
    alt PredicateFilter blocks
        PredicateEventFilter_T_-->>FilterableEvent_T_: true
        FilterableEvent_T_-->>EnhancedEventBus: return (event filtered)
    else PredicateFilter passes
        PredicateEventFilter_T_-->>FilterableEvent_T_: false
        FilterableEvent_T_->>SamplingEventFilter_T_: ShouldFilter(data)
        alt SamplingFilter blocks
            SamplingEventFilter_T_-->>FilterableEvent_T_: true
            FilterableEvent_T_-->>EnhancedEventBus: return (event sampled out)
        else SamplingFilter passes
            SamplingEventFilter_T_-->>FilterableEvent_T_: false
            FilterableEvent_T_->>Listener: onEvent(data)
            Listener-->>FilterableEvent_T_: handled
            FilterableEvent_T_->>EventStatistics: RecordHandle()
        end
    end
Loading

Sequence diagram for sending a weak event with auto cleanup and statistics

sequenceDiagram
    participant Client
    participant EnhancedEventBus
    participant WeakEvent_T_
    participant EventStatistics
    participant AliveListener
    participant DeadListener

    Client->>EnhancedEventBus: RegisterWeak_T_(DeadListener.OnEvent)
    EnhancedEventBus->>WeakEvent_T_: Register(DeadListener)
    WeakEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)

    Client->>EnhancedEventBus: RegisterWeak_T_(AliveListener.OnEvent)
    EnhancedEventBus->>WeakEvent_T_: Register(AliveListener)
    WeakEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)

    Note over DeadListener: DeadListener becomes unreachable
    Client->>Client: GC.Collect()

    Client->>EnhancedEventBus: SendWeak_T_(data)
    EnhancedEventBus->>WeakEvent_T_: Trigger(data)
    WeakEvent_T_->>EventStatistics: RecordPublish(TestEvent)

    WeakEvent_T_->>WeakEvent_T_: Collect alive handlers
    WeakEvent_T_->>DeadListener: TryGetTarget()
    DeadListener-->>WeakEvent_T_: collected (removed)
    WeakEvent_T_->>AliveListener: TryGetTarget()
    AliveListener-->>WeakEvent_T_: alive

    WeakEvent_T_->>AliveListener: OnEvent(data)
    AliveListener-->>WeakEvent_T_: handled
    WeakEvent_T_->>EventStatistics: RecordHandle()
    WeakEvent_T_->>EventStatistics: UpdateListenerCount(TestEvent)
Loading

Class diagram for EnhancedEventBus with filters, weak events, and statistics

classDiagram
    class EnhancedEventBus {
        -EasyEvents _mEvents
        -ConcurrentDictionary~Type, object~ _mFilterableEvents
        -EasyEvents _mPriorityEvents
        -ConcurrentDictionary~Type, object~ _mWeakEvents
        -EventStatistics _statistics
        +EnhancedEventBus(enableStatistics bool)
        +IEventStatistics Statistics
        +void Send_T_()
        +void Send_T_(e T)
        +void Send_T_(e T, propagation EventPropagation)
        +IUnRegister Register_T_(onEvent Action_T_)
        +IUnRegister Register_T_(onEvent Action_T_, priority int)
        +void UnRegister_T_(onEvent Action_T_)
        +IUnRegister RegisterWithContext_T_(onEvent Action_EventContext_T__)
        +IUnRegister RegisterWithContext_T_(onEvent Action_EventContext_T__, priority int)
        +void SendFilterable_T_(e T)
        +IUnRegister RegisterFilterable_T_(onEvent Action_T_)
        +void AddFilter_T_(filter IEventFilter_T_)
        +void RemoveFilter_T_(filter IEventFilter_T_)
        +void ClearFilters_T_()
        +void SendWeak_T_(e T)
        +IUnRegister RegisterWeak_T_(onEvent Action_T_)
        +void CleanupWeak_T_()
    }

    class FilterableEvent_T_ {
        -List~IEventFilter_T_~ _filters
        -object _lock
        -EventStatistics _statistics
        -Action_T_ _onEvent
        +FilterableEvent_T_(statistics EventStatistics)
        +IUnRegister Register(onEvent Action_T_)
        +void UnRegister(onEvent Action_T_)
        +void Trigger(data T)
        +void AddFilter(filter IEventFilter_T_)
        +void RemoveFilter(filter IEventFilter_T_)
        +void ClearFilters()
        +int GetListenerCount()
    }

    class WeakEvent_T_ {
        -object _lock
        -EventStatistics _statistics
        -List~WeakReference~Action_T_~~ _weakHandlers
        +WeakEvent_T_(statistics EventStatistics)
        +IUnRegister Register(onEvent Action_T_)
        +void UnRegister(onEvent Action_T_)
        +void Trigger(data T)
        +void Cleanup()
        +int GetListenerCount()
    }

    class EventStatistics {
        -Dictionary~string, int~ _listenerCountByType
        -Dictionary~string, long~ _publishCountByType
        -object _lock
        -int _activeEventTypes
        -int _activeListeners
        -long _totalFailed
        -long _totalHandled
        -long _totalPublished
        +long TotalPublished
        +long TotalHandled
        +long TotalFailed
        +int ActiveEventTypes
        +int ActiveListeners
        +long GetPublishCount(eventType string)
        +int GetListenerCount(eventType string)
        +void Reset()
        +string GenerateReport()
        +void RecordPublish(eventType string)
        +void RecordHandle()
        +void RecordFailure()
        +void UpdateListenerCount(eventType string, count int)
    }

    class IEventStatistics {
        <<interface>>
        +long TotalPublished
        +long TotalHandled
        +long TotalFailed
        +int ActiveEventTypes
        +int ActiveListeners
        +long GetPublishCount(eventType string)
        +int GetListenerCount(eventType string)
        +void Reset()
        +string GenerateReport()
    }

    class IEventFilter_T_ {
        <<interface>>
        +bool ShouldFilter(eventData T)
    }

    class PredicateEventFilter_T_ {
        -Func_T_, bool_ _predicate
        +PredicateEventFilter_T_(predicate Func_T_, bool_)
        +bool ShouldFilter(eventData T)
    }

    class SamplingEventFilter_T_ {
        -double _samplingRate
        -long _counter
        +SamplingEventFilter_T_(samplingRate double)
        +bool ShouldFilter(eventData T)
    }

    class IUnRegister {
        <<interface>>
        +void UnRegister()
    }

    class DefaultUnRegister {
        -Action _onUnRegister
        +DefaultUnRegister(onUnRegister Action)
        +void UnRegister()
    }

    class IEventBus {
        <<interface>>
    }

    class EasyEvents
    class Event_T_
    class PriorityEvent_T_
    class EventContext_T_

    EnhancedEventBus ..|> IEventBus
    EventStatistics ..|> IEventStatistics

    EnhancedEventBus --> EventStatistics : uses
    EnhancedEventBus --> FilterableEvent_T_ : manages
    EnhancedEventBus --> WeakEvent_T_ : manages
    EnhancedEventBus --> EasyEvents : uses
    EnhancedEventBus --> PriorityEvent_T_ : uses

    FilterableEvent_T_ --> IEventFilter_T_ : aggregates
    FilterableEvent_T_ --> EventStatistics : updates
    WeakEvent_T_ --> EventStatistics : updates

    PredicateEventFilter_T_ ..|> IEventFilter_T_
    SamplingEventFilter_T_ ..|> IEventFilter_T_

    FilterableEvent_T_ --> IUnRegister : returns
    WeakEvent_T_ --> IUnRegister : returns
    DefaultUnRegister ..|> IUnRegister

    EasyEvents --> Event_T_ : manages
    EasyEvents --> PriorityEvent_T_ : manages
    PriorityEvent_T_ --> EventContext_T_
Loading

File-Level Changes

Change Details Files
Add EnhancedEventBus implementation that composes existing event infrastructure with new filterable/weak-event and statistics capabilities.
  • Wrap existing EasyEvents/PriorityEvent-based APIs (Send, Register, priority/context variants) into a new sealed EnhancedEventBus implementing IEventBus.
  • Maintain ConcurrentDictionary-backed registries for filterable and weak events keyed by event type, ensuring thread-safe GetOrAdd semantics.
  • Expose optional IEventStatistics via constructor flag and pass the statistics instance into filterable and weak event instances.
  • Provide dedicated APIs for filterable events (SendFilterable/RegisterFilterable/AddFilter/RemoveFilter/ClearFilters) and weak events (SendWeak/RegisterWeak/CleanupWeak).
GFramework.Core/events/EnhancedEventBus.cs
Introduce EventStatistics to track publish/handle/failure counts and listener metrics in a thread-safe way, surfaced via IEventStatistics.
  • Define IEventStatistics interface exposing totals, active counts, per-type metrics, reset, and report generation methods.
  • Implement EventStatistics using Interlocked for atomic counters and lock-protected dictionaries for per-type publish and listener counts.
  • Add helper methods RecordPublish/RecordHandle/RecordFailure/UpdateListenerCount used by event types, and a formatted GenerateReport for diagnostics.
  • Ensure Reset zeros all counters and clears per-type dictionaries, and expose ActiveEventTypes/ActiveListeners as atomic properties (currently only value holders).
GFramework.Core.Abstractions/events/IEventStatistics.cs
GFramework.Core/events/EventStatistics.cs
Add FilterableEvent generic wrapper that supports event filters and integrates with statistics.
  • Maintain a multicast delegate of handlers and a list of IEventFilter guarded by a lock for safe concurrent modifications.
  • On Trigger, record a publish event, evaluate all filters (short-circuit if any ShouldFilter returns true), then invoke a snapshot of handlers outside the lock.
  • On each handler invocation, record handle/failure events with EventStatistics, rethrowing handler exceptions to callers.
  • Update per-type listener counts in EventStatistics whenever handlers are added or removed, and expose GetListenerCount for external inspection.
  • Provide AddFilter/RemoveFilter/ClearFilters APIs for managing filters on the event instance.
GFramework.Core/events/FilterableEvent.cs
GFramework.Core.Abstractions/events/IEventFilter.cs
Add WeakEvent generic wrapper that stores handlers as weak references and integrates with statistics.
  • Store handlers as List<WeakReference<Action>> with lock-based synchronization for modifications and cleanup.
  • On Register/UnRegister, add/remove weak references and update per-type listener counts in EventStatistics.
  • On Trigger, record a publish event, collect live handlers while removing dead references, invoke them outside the lock, tracking handle/failure stats and rethrowing exceptions.
  • Provide Cleanup to proactively remove collected references and keep statistics in sync, and GetListenerCount to query the current live handler count.
GFramework.Core/events/WeakEvent.cs
Add concrete event filters for predicate-based and sampling-based filtering.
  • Implement PredicateEventFilter that wraps a Func<T,bool> predicate and treats true as "filter out"; validate predicate is non-null.
  • Implement SamplingEventFilter that uses a samplingRate (0.0–1.0) to deterministically allow every Nth event via an Interlocked-based counter; validate rate bounds and handle 0/1 edge cases as always-filter/always-pass.
GFramework.Core/events/filters/PredicateEventFilter.cs
GFramework.Core/events/filters/SamplingEventFilter.cs
Add unit tests covering weak events, event filters, and statistics behavior, including edge cases and concurrency.
  • WeakEventTests verify basic delivery, GC-based listener collection, explicit Cleanup, manual UnRegister, multi-listener fan-out, statistics integration, failure tracking, and auto-cleanup during Trigger.
  • EventFilterTests validate predicate-based filtering, sampling behavior at various rates (0, 0.5, 1), interaction of multiple filters, dynamic removal/clearing of filters, and constructor validation for invalid predicates/sampling rates.
  • EventStatisticsTests cover enabling/disabling statistics, basic counters for published/handled/failed events, per-type publish/listener counts, Reset semantics, report formatting, and thread safety under concurrent publishing.
GFramework.Core.Tests/events/WeakEventTests.cs
GFramework.Core.Tests/events/EventFilterTests.cs
GFramework.Core.Tests/events/EventStatisticsTests.cs

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了两个问题,并给出了一些整体性的反馈:

  • 目前 EventStatisticsActiveEventTypesActiveListeners 从未被更新(具体实现类上的 setter 是 public 的但完全没有使用),因此这些属性将始终保持其默认值;建议在 RecordPublish / UpdateListenerCount 内更新它们,并将 setter 设为 private,以保证统计数据的一致性。
  • 新增的统计目前只追踪 FilterableEventWeakEvent 的流程;如果你的目标是提供全局的总线统计信息,你可能还需要在现有使用 Event<T>PriorityEvent<T>Send / Register 重载中集成 EventStatistics,以避免统计数据被低估。
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Currently `EventStatistics`'s `ActiveEventTypes` and `ActiveListeners` are never updated (and the setters on the concrete class are public but unused), so these properties will always stay at their default values; consider updating them inside `RecordPublish`/`UpdateListenerCount` and making the setters private to keep the statistics consistent.
- The new statistics only track `FilterableEvent` and `WeakEvent` flows; if the intent is to provide global bus statistics, you may want to also integrate `EventStatistics` into the existing `Send`/`Register` overloads that use `Event<T>` and `PriorityEvent<T>` to avoid under-reporting.

## Individual Comments

### Comment 1
<location path="GFramework.Core/events/EventStatistics.cs" line_range="15-24" />
<code_context>
+    private int _activeEventTypes;
</code_context>
<issue_to_address>
**issue (bug_risk):** ActiveEventTypes/ActiveListeners fields are never updated based on per-type stats, making them misleading.

These fields are only backed by `_activeEventTypes` and `_activeListeners`, which are never incremented/decremented anywhere in this class (including `UpdateListenerCount`); only `Reset` clears them. That means these properties remain at their default values unless set externally, so they don’t reflect the real aggregate state.

Given you already track `_listenerCountByType` and `_publishCountByType`, consider either computing these values on demand in the property getters from those dictionaries, or updating `_activeEventTypes`/`_activeListeners` inside `UpdateListenerCount` whenever per-type counts change. If neither is needed, it may be clearer to remove these fields and treat the metrics as computed, read-only values.
</issue_to_address>

### Comment 2
<location path="GFramework.Core/events/FilterableEvent.cs" line_range="59-33" />
<code_context>
+    ///     触发事件
+    /// </summary>
+    /// <param name="data">事件数据</param>
+    public void Trigger(T data)
+    {
+        // 记录发布统计
+        _statistics?.RecordPublish(typeof(T).Name);
+
+        // 应用过滤器
+        lock (_lock)
+        {
+            // 事件被过滤,不触发监听器
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Trigger performs multiple locks and runs filter predicates under the lock, which can increase contention and risk reentrancy issues.

`Trigger` currently:
1. Holds `_lock` while executing `_filters.Any(filter => filter.ShouldFilter(data))`, and
2. Reacquires `_lock` to snapshot `_onEvent`.

Executing arbitrary filter logic while holding the lock risks deadlocks/reentrancy if filters call back into this class, and the double locking increases contention.

Instead, under a single lock, snapshot `_filters` and `_onEvent` (e.g. `var filters = _filters.ToArray(); var handlers = _onEvent;`), then release the lock and run filters/handlers on those snapshots. This minimizes the critical section and avoids running user code under the lock while preserving behavior.

Suggested implementation:

```csharp
    /// <summary>
    ///     触发事件
    /// </summary>
    /// <param name="data">事件数据</param>
    public void Trigger(T data)
    {
        // 记录发布统计
        _statistics?.RecordPublish(typeof(T).Name);

        Action<T>? handlers;
        var filtersSnapshot = Array.Empty<IFilter<T>>();

        // 在单个锁中快照过滤器和监听器
        lock (_lock)
        {
            if (_filters.Count > 0)
            {
                filtersSnapshot = _filters.ToArray();
            }

            handlers = _onEvent;
        }

        // 在锁外执行过滤逻辑
        for (var i = 0; i < filtersSnapshot.Length; i++)
        {
            if (filtersSnapshot[i].ShouldFilter(data))
                return;
        }

        // 在锁外执行事件处理程序
        handlers?.Invoke(data);

```

1. Ensure that `IFilter<T>` is the correct interface type for elements stored in `_filters`. If `_filters` uses a different generic interface or concrete type, replace `IFilter<T>` with that type.
2. If `System` or `System.Linq` is not already imported where `Array.Empty<T>()` or `ToArray()` are used, add the appropriate `using` directives at the top of the file:
   - `using System;`
   - `using System.Linq;`
3. If `_filters` is not a collection with `.Count` and `.ToArray()` (e.g. it might be an `IEnumerable<...>`), adjust the snapshot logic accordingly (e.g. `var filtersSnapshot = _filters?.ToArray() ?? Array.Empty<...>();` inside the lock).
</issue_to_address>

Sourcery 对开源项目免费 —— 如果你觉得我们的评审有帮助,请考虑分享给更多人 ✨
帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据你的反馈改进后续的评审。
Original comment in English

Hey - I've found 2 issues, and left some high level feedback:

  • Currently EventStatistics's ActiveEventTypes and ActiveListeners are never updated (and the setters on the concrete class are public but unused), so these properties will always stay at their default values; consider updating them inside RecordPublish/UpdateListenerCount and making the setters private to keep the statistics consistent.
  • The new statistics only track FilterableEvent and WeakEvent flows; if the intent is to provide global bus statistics, you may want to also integrate EventStatistics into the existing Send/Register overloads that use Event<T> and PriorityEvent<T> to avoid under-reporting.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Currently `EventStatistics`'s `ActiveEventTypes` and `ActiveListeners` are never updated (and the setters on the concrete class are public but unused), so these properties will always stay at their default values; consider updating them inside `RecordPublish`/`UpdateListenerCount` and making the setters private to keep the statistics consistent.
- The new statistics only track `FilterableEvent` and `WeakEvent` flows; if the intent is to provide global bus statistics, you may want to also integrate `EventStatistics` into the existing `Send`/`Register` overloads that use `Event<T>` and `PriorityEvent<T>` to avoid under-reporting.

## Individual Comments

### Comment 1
<location path="GFramework.Core/events/EventStatistics.cs" line_range="15-24" />
<code_context>
+    private int _activeEventTypes;
</code_context>
<issue_to_address>
**issue (bug_risk):** ActiveEventTypes/ActiveListeners fields are never updated based on per-type stats, making them misleading.

These fields are only backed by `_activeEventTypes` and `_activeListeners`, which are never incremented/decremented anywhere in this class (including `UpdateListenerCount`); only `Reset` clears them. That means these properties remain at their default values unless set externally, so they don’t reflect the real aggregate state.

Given you already track `_listenerCountByType` and `_publishCountByType`, consider either computing these values on demand in the property getters from those dictionaries, or updating `_activeEventTypes`/`_activeListeners` inside `UpdateListenerCount` whenever per-type counts change. If neither is needed, it may be clearer to remove these fields and treat the metrics as computed, read-only values.
</issue_to_address>

### Comment 2
<location path="GFramework.Core/events/FilterableEvent.cs" line_range="59-33" />
<code_context>
+    ///     触发事件
+    /// </summary>
+    /// <param name="data">事件数据</param>
+    public void Trigger(T data)
+    {
+        // 记录发布统计
+        _statistics?.RecordPublish(typeof(T).Name);
+
+        // 应用过滤器
+        lock (_lock)
+        {
+            // 事件被过滤,不触发监听器
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Trigger performs multiple locks and runs filter predicates under the lock, which can increase contention and risk reentrancy issues.

`Trigger` currently:
1. Holds `_lock` while executing `_filters.Any(filter => filter.ShouldFilter(data))`, and
2. Reacquires `_lock` to snapshot `_onEvent`.

Executing arbitrary filter logic while holding the lock risks deadlocks/reentrancy if filters call back into this class, and the double locking increases contention.

Instead, under a single lock, snapshot `_filters` and `_onEvent` (e.g. `var filters = _filters.ToArray(); var handlers = _onEvent;`), then release the lock and run filters/handlers on those snapshots. This minimizes the critical section and avoids running user code under the lock while preserving behavior.

Suggested implementation:

```csharp
    /// <summary>
    ///     触发事件
    /// </summary>
    /// <param name="data">事件数据</param>
    public void Trigger(T data)
    {
        // 记录发布统计
        _statistics?.RecordPublish(typeof(T).Name);

        Action<T>? handlers;
        var filtersSnapshot = Array.Empty<IFilter<T>>();

        // 在单个锁中快照过滤器和监听器
        lock (_lock)
        {
            if (_filters.Count > 0)
            {
                filtersSnapshot = _filters.ToArray();
            }

            handlers = _onEvent;
        }

        // 在锁外执行过滤逻辑
        for (var i = 0; i < filtersSnapshot.Length; i++)
        {
            if (filtersSnapshot[i].ShouldFilter(data))
                return;
        }

        // 在锁外执行事件处理程序
        handlers?.Invoke(data);

```

1. Ensure that `IFilter<T>` is the correct interface type for elements stored in `_filters`. If `_filters` uses a different generic interface or concrete type, replace `IFilter<T>` with that type.
2. If `System` or `System.Linq` is not already imported where `Array.Empty<T>()` or `ToArray()` are used, add the appropriate `using` directives at the top of the file:
   - `using System;`
   - `using System.Linq;`
3. If `_filters` is not a collection with `.Count` and `.ToArray()` (e.g. it might be an `IEnumerable<...>`), adjust the snapshot logic accordingly (e.g. `var filtersSnapshot = _filters?.ToArray() ?? Array.Empty<...>();` inside the lock).
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread GFramework.Core/events/EventStatistics.cs Outdated
Comment thread GFramework.Core/events/FilterableEvent.cs
- 在EasyEventGeneric中添加GetListenerCount方法获取监听器数量
- 重新排列EnhancedEventBus中字段顺序以优化内存布局
- 为所有Send方法添加发布统计记录功能
- 实现事件处理器包装以添加统计功能,包括成功处理和失败记录
- 添加监听器数量统计更新机制,自动跟踪注册和注销事件
- 重构FilterableEvent中的过滤逻辑以减少锁持有时间
- 在PriorityEvent中添加GetListenerCount方法获取总监听器数量
- 移除EventStatistics中的过时字段并优化ActiveEventTypes和ActiveListeners计算
- 清理StatisticsEventDecorator中的并发问题和实现装饰器模式
Comment thread GFramework.Core/events/FilterableEvent.cs
- 将优先级事件的传播模式从 Continue 修改为 All
- 确保事件能够正确传递到所有监听器
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant