22// The .NET Foundation licenses this file to you under the MIT license.
33
44using System . Diagnostics ;
5+ using Aspire . Dashboard . Utils ;
56
67namespace Aspire . Dashboard . Otlp . Storage ;
78
@@ -10,18 +11,10 @@ public sealed class Subscription : IDisposable
1011{
1112 private static int s_subscriptionId ;
1213
13- private readonly Func < Task > _callback ;
14- private readonly ExecutionContext ? _executionContext ;
15- private readonly TelemetryRepository _telemetryRepository ;
16- private readonly CancellationTokenSource _cts ;
17- private readonly CancellationToken _cancellationToken ;
14+ private readonly CallbackThrottler _callbackThrottler ;
1815 private readonly Action _unsubscribe ;
19- private readonly SemaphoreSlim _lock = new SemaphoreSlim ( 1 , 1 ) ;
20- private ILogger Logger => _telemetryRepository . _otlpContext . Logger ;
2116 private readonly int _subscriptionId = Interlocked . Increment ( ref s_subscriptionId ) ;
2217
23- private DateTime ? _lastExecute ;
24-
2518 public int SubscriptionId => _subscriptionId ;
2619 public ApplicationKey ? ApplicationKey { get ; }
2720 public SubscriptionType SubscriptionType { get ; }
@@ -32,87 +25,18 @@ public Subscription(string name, ApplicationKey? applicationKey, SubscriptionTyp
3225 Name = name ;
3326 ApplicationKey = applicationKey ;
3427 SubscriptionType = subscriptionType ;
35- _callback = callback ;
28+ _callbackThrottler = new CallbackThrottler ( name , telemetryRepository . _otlpContext . Logger , telemetryRepository . _subscriptionMinExecuteInterval , callback , executionContext ) ;
3629 _unsubscribe = unsubscribe ;
37- _executionContext = executionContext ;
38- _telemetryRepository = telemetryRepository ;
39- _cts = new CancellationTokenSource ( ) ;
40- _cancellationToken = _cts . Token ;
41- }
42-
43- private async Task < bool > TryQueueAsync ( CancellationToken cancellationToken )
44- {
45- var success = _lock . Wait ( 0 , cancellationToken ) ;
46- if ( ! success )
47- {
48- Logger . LogDebug ( "Subscription '{Name}' update already queued." , Name ) ;
49- return false ;
50- }
51-
52- try
53- {
54- var lastExecute = _lastExecute ;
55- if ( lastExecute != null )
56- {
57- var minExecuteInterval = _telemetryRepository . _subscriptionMinExecuteInterval ;
58- var s = lastExecute . Value . Add ( minExecuteInterval ) - DateTime . UtcNow ;
59- if ( s > TimeSpan . Zero )
60- {
61- Logger . LogTrace ( "Subscription '{Name}' minimum execute interval of {MinExecuteInterval} hit. Waiting {DelayInterval}." , Name , minExecuteInterval , s ) ;
62- await Task . Delay ( s , cancellationToken ) . ConfigureAwait ( false ) ;
63- }
64- }
65-
66- _lastExecute = DateTime . UtcNow ;
67- return true ;
68- }
69- finally
70- {
71- _lock . Release ( ) ;
72- }
7330 }
7431
7532 public void Execute ( )
7633 {
77- // Execute the subscription callback on a background thread.
78- // The caller doesn't want to wait while the subscription is running or receive exceptions.
79- _ = Task . Run ( async ( ) =>
80- {
81- // Try to queue the subscription callback.
82- // If another caller is already in the queue then exit without calling the callback.
83- if ( ! await TryQueueAsync ( _cancellationToken ) . ConfigureAwait ( false ) )
84- {
85- return ;
86- }
87-
88- try
89- {
90- // Set the execution context to the one captured when the subscription was created.
91- // This ensures that the callback runs in the same context as the subscription was created.
92- // For example, the request culture is used to format content in the callback.
93- //
94- // No need to restore back to the original context because the callback is running on
95- // a background task. The task finishes immediately after the callback.
96- if ( _executionContext != null )
97- {
98- ExecutionContext . Restore ( _executionContext ) ;
99- }
100-
101- Logger . LogTrace ( "Subscription '{Name}' executing." , Name ) ;
102- await _callback ( ) . ConfigureAwait ( false ) ;
103- }
104- catch ( Exception ex )
105- {
106- Logger . LogError ( ex , "Error in subscription callback" ) ;
107- }
108- } ) ;
34+ _callbackThrottler . Execute ( ) ;
10935 }
11036
11137 public void Dispose ( )
11238 {
11339 _unsubscribe ( ) ;
114- _cts . Cancel ( ) ;
115- _cts . Dispose ( ) ;
116- _lock . Dispose ( ) ;
40+ _callbackThrottler . Dispose ( ) ;
11741 }
11842}
0 commit comments