Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/Grains/TestGrainInterfaces/ITimerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public interface ITimerCallGrain : IGrainWithIntegerKey
Task StartTimer(string name, TimeSpan dueTime, string operationType);
Task RestartTimer(string name, TimeSpan dueTime);
Task RestartTimer(string name, TimeSpan dueTime, TimeSpan period);
Task TestTimerChangeArguments();
Task StopTimer(string name);
Task RunSelfDisposingTimer();
}
Expand Down
161 changes: 148 additions & 13 deletions test/Grains/TestInternalGrains/TimerGrain.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Runtime.Scheduler;
Expand All @@ -13,6 +15,59 @@ internal static class TimerGrainTestConstants
public static readonly TimeSpan CallbackDelay = TimeSpan.FromMilliseconds(50);
}

public static class TimerGrainCallbackEvents
{
public const string ListenerName = "Orleans.Tests.TimerGrainCallbacks";

private static readonly DiagnosticListener Listener = new(ListenerName);

public static IObservable<CallbackEvent> AllEvents { get; } = new Observable();

public abstract class CallbackEvent(GrainId grainId)
{
public readonly GrainId GrainId = grainId;
}

public sealed class DelayScheduled(GrainId grainId) : CallbackEvent(grainId)
{
}

internal static void EmitDelayScheduled(GrainId grainId)
{
if (!Listener.IsEnabled(nameof(DelayScheduled)))
{
return;
}

Emit(grainId);

[MethodImpl(MethodImplOptions.NoInlining)]
static void Emit(GrainId grainId)
{
Listener.Write(nameof(DelayScheduled), new DelayScheduled(grainId));
}
}

private sealed class Observable : IObservable<CallbackEvent>
{
public IDisposable Subscribe(IObserver<CallbackEvent> observer) => Listener.Subscribe(new Observer(observer));

private sealed class Observer(IObserver<CallbackEvent> observer) : IObserver<KeyValuePair<string, object>>
{
public void OnCompleted() => observer.OnCompleted();
public void OnError(Exception error) => observer.OnError(error);

public void OnNext(KeyValuePair<string, object> value)
{
if (value.Value is CallbackEvent evt)
{
observer.OnNext(evt);
}
}
}
}
}

public class TimerGrain : Grain, ITimerGrain
{
private bool deactivating;
Expand Down Expand Up @@ -144,10 +199,12 @@ public class TimerCallGrain : Grain, ITimerCallGrain
private TaskScheduler activationTaskScheduler;

private readonly ILogger logger;
private readonly TimeProvider timeProvider;

public TimerCallGrain(ILoggerFactory loggerFactory)
public TimerCallGrain(ILoggerFactory loggerFactory, TimeProvider timeProvider)
{
this.logger = loggerFactory.CreateLogger($"{this.GetType().Name}-{this.IdentityString}");
this.timeProvider = timeProvider;
}

public Task<int> GetTickCount() { return Task.FromResult(tickCount); }
Expand Down Expand Up @@ -199,6 +256,35 @@ public Task RestartTimer(string name, TimeSpan delay, TimeSpan period)
return Task.CompletedTask;
}

public Task TestTimerChangeArguments()
{
ValidateTimerChangeArguments(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.Zero, Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.FromMicroseconds(10), Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.Zero, TimeSpan.Zero);
ValidateTimerChangeArguments(TimeSpan.FromMicroseconds(10), TimeSpan.FromMicroseconds(10));
ValidateTimerChangeArguments(TimeSpan.FromMilliseconds(-0.4), Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(-0.5));
ValidateTimerChangeArguments(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.FromSeconds(-5), Timeout.InfiniteTimeSpan));
Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.MaxValue, Timeout.InfiniteTimeSpan));
Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(-5)));
Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.FromSeconds(1), TimeSpan.MaxValue));

return Task.CompletedTask;
}

private void ValidateTimerChangeArguments(TimeSpan dueTime, TimeSpan period)
{
using var validationTimer = this.RegisterGrainTimer(_ => Task.CompletedTask, new GrainTimerCreationOptions(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan)
{
Interleave = true
});

validationTimer.Change(dueTime, period);
}

public Task StopTimer(string name)
{
logger.LogInformation("StopTimer Name={Name}", name);
Expand All @@ -225,7 +311,9 @@ public async Task RunSelfDisposingTimer()
Assert.NotNull(timer[0]);
timer[0].Dispose();
Assert.True(ct.IsCancellationRequested);
await Task.Delay(100);
var delay = Task.Delay(TimeSpan.FromMilliseconds(100), timeProvider);
TimerGrainCallbackEvents.EmitDelayScheduled(context.GrainId);
await delay;
tcs.TrySetResult();
}
catch (Exception ex)
Expand Down Expand Up @@ -303,7 +391,9 @@ private async Task ProcessTimerTick(object data)
CheckRuntimeContext(step);

LogStatus("Before Delay");
await Task.Delay(TimerGrainTestConstants.CallbackDelay);
var delay = Task.Delay(TimerGrainTestConstants.CallbackDelay, timeProvider);
TimerGrainCallbackEvents.EmitDelayScheduled(context.GrainId);
await delay;
step = "After Delay";
LogStatus(step);
CheckRuntimeContext(step);
Expand Down Expand Up @@ -367,10 +457,12 @@ public class NonReentrantTimerCallGrain : Grain, INonReentrantTimerCallGrain
private Guid _tickId;

private readonly ILogger _logger;
private readonly TimeProvider _timeProvider;

public NonReentrantTimerCallGrain(ILoggerFactory loggerFactory)
public NonReentrantTimerCallGrain(ILoggerFactory loggerFactory, TimeProvider timeProvider)
{
_logger = loggerFactory.CreateLogger($"{GetType().Name}-{IdentityString}");
_timeProvider = timeProvider;
}

public Task<int> GetTickCount() => Task.FromResult(_tickCount);
Expand Down Expand Up @@ -450,7 +542,9 @@ private async Task ProcessTimerTick(object data, CancellationToken cancellationT
CheckReentrancy(step, expectedTickId);

LogStatus("Before Delay", timerName);
await Task.Delay(TimerGrainTestConstants.CallbackDelay);
var delay = Task.Delay(TimerGrainTestConstants.CallbackDelay, _timeProvider);
TimerGrainCallbackEvents.EmitDelayScheduled(_context.GrainId);
await delay;
step = "After Delay";
LogStatus(step, timerName);
CheckRuntimeContext(step);
Expand Down Expand Up @@ -797,13 +891,15 @@ public class PocoTimerCallGrain : IGrainBase, IPocoTimerCallGrain

private readonly ILogger logger;
private readonly IGrainFactory _grainFactory;
private readonly TimeProvider _timeProvider;

public IGrainContext GrainContext { get; }

public PocoTimerCallGrain(ILoggerFactory loggerFactory, IGrainContext grainContext, IGrainFactory grainFactory)
public PocoTimerCallGrain(ILoggerFactory loggerFactory, IGrainContext grainContext, IGrainFactory grainFactory, TimeProvider timeProvider)
{
GrainContext = grainContext;
_grainFactory = grainFactory;
_timeProvider = timeProvider;
this.logger = loggerFactory.CreateLogger($"{this.GetType().Name}-{this.GrainContext.GrainId}");
}

Expand All @@ -821,7 +917,7 @@ public Task StartTimer(string name, TimeSpan delay)
{
logger.LogInformation("StartTimer Name={Name} Delay={Delay}", name, delay);
if (timer is not null) throw new InvalidOperationException("Expected timer to be null");
this.timer = this.RegisterGrainTimer(TimerTick, name, new(delay, Timeout.InfiniteTimeSpan)); // One shot timer
this.timer = this.RegisterGrainTimer(TimerTick, name, new(delay, Timeout.InfiniteTimeSpan) { Interleave = true }); // One shot timer
this.timerName = name;

return Task.CompletedTask;
Expand All @@ -832,7 +928,7 @@ public Task StartTimer(string name, TimeSpan delay, string operationType)
logger.LogInformation("StartTimer Name={Name} Delay={Delay}", name, delay);
if (timer is not null) throw new InvalidOperationException("Expected timer to be null");
var state = Tuple.Create<string, object>(operationType, name);
this.timer = this.RegisterGrainTimer(TimerTickAdvanced, state, delay, Timeout.InfiniteTimeSpan); // One shot timer
this.timer = this.RegisterGrainTimer(TimerTickAdvanced, state, new(delay, Timeout.InfiniteTimeSpan) { Interleave = true }); // One shot timer
this.timerName = name;

return Task.CompletedTask;
Expand All @@ -856,6 +952,35 @@ public Task RestartTimer(string name, TimeSpan delay, TimeSpan period)
return Task.CompletedTask;
}

public Task TestTimerChangeArguments()
{
ValidateTimerChangeArguments(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.Zero, Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.FromMicroseconds(10), Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.Zero, TimeSpan.Zero);
ValidateTimerChangeArguments(TimeSpan.FromMicroseconds(10), TimeSpan.FromMicroseconds(10));
ValidateTimerChangeArguments(TimeSpan.FromMilliseconds(-0.4), Timeout.InfiniteTimeSpan);
ValidateTimerChangeArguments(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(-0.5));
ValidateTimerChangeArguments(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.FromSeconds(-5), Timeout.InfiniteTimeSpan));
Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.MaxValue, Timeout.InfiniteTimeSpan));
Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(-5)));
Assert.Throws<ArgumentOutOfRangeException>(() => ValidateTimerChangeArguments(TimeSpan.FromSeconds(1), TimeSpan.MaxValue));

return Task.CompletedTask;
}

private void ValidateTimerChangeArguments(TimeSpan dueTime, TimeSpan period)
{
using var validationTimer = this.RegisterGrainTimer(_ => Task.CompletedTask, new GrainTimerCreationOptions(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan)
{
Interleave = true
});

validationTimer.Change(dueTime, period);
}

public Task StopTimer(string name)
{
logger.LogInformation("StopTimer Name={Name}", name);
Expand All @@ -882,7 +1007,9 @@ public async Task RunSelfDisposingTimer()
Assert.NotNull(timer[0]);
timer[0].Dispose();
Assert.True(ct.IsCancellationRequested);
await Task.Delay(100);
var delay = Task.Delay(TimeSpan.FromMilliseconds(100), _timeProvider);
TimerGrainCallbackEvents.EmitDelayScheduled(context.GrainId);
await delay;
tcs.TrySetResult();
}
catch (Exception ex)
Expand Down Expand Up @@ -960,7 +1087,9 @@ private async Task ProcessTimerTick(object data)
CheckRuntimeContext(step);

LogStatus("Before Delay");
await Task.Delay(TimerGrainTestConstants.CallbackDelay);
var delay = Task.Delay(TimerGrainTestConstants.CallbackDelay, _timeProvider);
TimerGrainCallbackEvents.EmitDelayScheduled(context.GrainId);
await delay;
step = "After Delay";
LogStatus(step);
CheckRuntimeContext(step);
Expand Down Expand Up @@ -1176,13 +1305,15 @@ public class PocoNonReentrantTimerCallGrain : IGrainBase, IPocoNonReentrantTimer

private readonly ILogger _logger;
private readonly IGrainFactory _grainFactory;
private readonly TimeProvider _timeProvider;

public IGrainContext GrainContext { get; }

public PocoNonReentrantTimerCallGrain(ILoggerFactory loggerFactory, IGrainContext grainContext, IGrainFactory grainFactory)
public PocoNonReentrantTimerCallGrain(ILoggerFactory loggerFactory, IGrainContext grainContext, IGrainFactory grainFactory, TimeProvider timeProvider)
{
GrainContext = grainContext;
_grainFactory = grainFactory;
_timeProvider = timeProvider;
_logger = loggerFactory.CreateLogger($"{GetType().Name}-{GrainContext.GrainId}");
}

Expand Down Expand Up @@ -1212,7 +1343,9 @@ public async Task RunSelfDisposingTimer()
Assert.NotNull(timer[0]);
timer[0].Dispose();
tcs.TrySetResult();
await Task.Delay(100);
var delay = Task.Delay(TimeSpan.FromMilliseconds(100), _timeProvider);
TimerGrainCallbackEvents.EmitDelayScheduled(_context.GrainId);
await delay;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -1289,7 +1422,9 @@ private async Task ProcessTimerTick(object data, CancellationToken cancellationT
CheckReentrancy(step, expectedTickId);

LogStatus("Before Delay", timerName);
await Task.Delay(TimerGrainTestConstants.CallbackDelay);
var delay = Task.Delay(TimerGrainTestConstants.CallbackDelay, _timeProvider);
TimerGrainCallbackEvents.EmitDelayScheduled(_context.GrainId);
await delay;
step = "After Delay";
LogStatus(step, timerName);
CheckRuntimeContext(step);
Expand Down
Loading
Loading