Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/Orleans.Reminders/Diagnostics/ReminderEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,31 @@ public static class ReminderEvents
/// </summary>
public static IObservable<ReminderEvent> AllEvents { get; } = new Observable();

/// <summary>
/// Gets an observable sequence of reminder service events.
/// </summary>
public static IObservable<ReminderServiceEvent> ServiceEvents { get; } = new ServiceObservable();

/// <summary>
/// The base class used for reminder service diagnostic events.
/// </summary>
/// <param name="siloAddress">The address of the silo associated with the event, if any.</param>
public abstract class ReminderServiceEvent(SiloAddress? siloAddress)
{
/// <summary>
/// The address of the silo associated with the event, if any.
/// </summary>
public readonly SiloAddress? SiloAddress = siloAddress;
}

/// <summary>
/// Event payload for when a reminder service completes startup and is ready for reminder operations.
/// </summary>
/// <param name="siloAddress">The address of the silo whose reminder service started.</param>
public sealed class ReminderServiceStarted(SiloAddress? siloAddress) : ReminderServiceEvent(siloAddress)
{
}

/// <summary>
/// The base class used for reminder diagnostic events.
/// </summary>
Expand Down Expand Up @@ -269,6 +294,22 @@ static void Emit(GrainId grainId, string reminderName, SiloAddress? siloAddress)
}
}

internal static void EmitReminderServiceStarted(SiloAddress? siloAddress)
{
if (!Listener.IsEnabled(nameof(ReminderServiceStarted)))
{
return;
}

Emit(siloAddress);

[MethodImpl(MethodImplOptions.NoInlining)]
static void Emit(SiloAddress? siloAddress)
{
Listener.Write(nameof(ReminderServiceStarted), new ReminderServiceStarted(siloAddress));
}
}

internal static void EmitUnregistered(GrainId grainId, string reminderName, SiloAddress? siloAddress)
{
if (!Listener.IsEnabled(nameof(Unregistered)))
Expand Down Expand Up @@ -458,4 +499,23 @@ public void OnNext(KeyValuePair<string, object?> value)
}
}
}

private sealed class ServiceObservable : IObservable<ReminderServiceEvent>
{
public IDisposable Subscribe(IObserver<ReminderServiceEvent> observer) => Listener.Subscribe(new Observer(observer));

private sealed class Observer(IObserver<ReminderServiceEvent> observer) : IObserver<KeyValuePair<string, object?>>
{
public void OnCompleted() => observer.OnCompleted();
public void OnError(Exception error) => observer.OnError(error);

public void OnNext(KeyValuePair<string, object?> value)
{
if (value.Value is ReminderServiceEvent evt)
{
observer.OnNext(evt);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ private async Task DoInitialReadAndUpdateReminders()

Status = GrainServiceStatus.Started;
startedTask.TrySetResult(true);
ReminderEvents.EmitReminderServiceStarted(Silo);
}
catch (Exception ex)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Orleans.Runtime/Catalog/ActivationCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public int GetNumRecentlyUsed(TimeSpan recencyPeriod)
/// <returns>A <see cref="Task"/> representing the work performed.</returns>
public Task CollectActivations(TimeSpan ageLimit, CancellationToken cancellationToken) => CollectActivationsImpl(false, ageLimit, cancellationToken);

internal Task CollectStaleActivations(CancellationToken cancellationToken) => CollectActivationsImpl(scanStale: true, ageLimit: default, cancellationToken: cancellationToken);

/// <summary>
/// Schedules the provided grain context for collection if it becomes idle for the specified duration.
/// </summary>
Expand Down
16 changes: 16 additions & 0 deletions src/Orleans.Testing.Reminders/ReminderDiagnosticObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ public sealed class ReminderDiagnosticObserver : IDisposable
{
private readonly object _lock = new();
private readonly IConnectableObservable<ReminderEvents.ReminderEvent> _events;
private readonly IConnectableObservable<ReminderEvents.ReminderServiceEvent> _serviceEvents;
private readonly IDisposable _connection;
private readonly IDisposable _serviceConnection;
private readonly IDisposable _storageSubscription;
private readonly Dictionary<GrainId, int> _tickCountsByGrain = [];
private readonly Dictionary<ReminderTickKey, int> _tickCountsByReminder = [];
Expand All @@ -49,8 +51,10 @@ public static ReminderDiagnosticObserver Create()
public ReminderDiagnosticObserver()
{
_events = ReminderEvents.AllEvents.Replay();
_serviceEvents = ReminderEvents.ServiceEvents.Replay();
_storageSubscription = _events.Subscribe(StoreEvent);
_connection = _events.Connect();
_serviceConnection = _serviceEvents.Connect();
}

private void StoreEvent(ReminderEvents.ReminderEvent value)
Expand Down Expand Up @@ -199,6 +203,17 @@ public async Task WaitForTickConditionAsync(GrainId grainId, Func<CancellationTo
.ToTask(cancellationToken);
}

/// <summary>
/// Waits for a reminder service to complete startup.
/// </summary>
public Task<ReminderEvents.ReminderServiceStarted> WaitForReminderServiceStartedAsync(CancellationToken cancellationToken, SiloAddress? siloAddress = null)
{
return _serviceEvents
.OfType<ReminderEvents.ReminderServiceStarted>()
.FirstAsync(e => siloAddress is null || Equals(e.SiloAddress, siloAddress))
.ToTask(cancellationToken);
}

/// <summary>
/// Waits for a reminder to be unregistered.
/// </summary>
Expand Down Expand Up @@ -521,6 +536,7 @@ public void Dispose()
{
_storageSubscription.Dispose();
_connection.Dispose();
_serviceConnection.Dispose();
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/api/Orleans.Reminders/Orleans.Reminders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public static partial class ReminderEvents
{
public const string ListenerName = "Orleans.Reminders";
public static System.IObservable<ReminderEvent> AllEvents { get { throw null; } }
public static System.IObservable<ReminderServiceEvent> ServiceEvents { get { throw null; } }

public sealed partial class LocalReminderScheduleChanged : ReminderEvent
{
Expand Down Expand Up @@ -216,6 +217,17 @@ public abstract partial class ReminderEvent
protected ReminderEvent(Runtime.GrainId grainId, string reminderName, Runtime.SiloAddress? siloAddress) { }
}

public abstract partial class ReminderServiceEvent
{
public readonly Runtime.SiloAddress? SiloAddress;
protected ReminderServiceEvent(Runtime.SiloAddress? siloAddress) { }
}

public sealed partial class ReminderServiceStarted : ReminderServiceEvent
{
public ReminderServiceStarted(Runtime.SiloAddress? siloAddress) : base(default) { }
}

public sealed partial class TickCompleted : ReminderEvent
{
public readonly Runtime.TickStatus Status;
Expand Down
36 changes: 33 additions & 3 deletions test/Orleans.Reminders.Tests/MinimalReminderTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Orleans.Internal;
using Orleans.Testing.Reminders;
using Orleans.TestingHost;
using TestExtensions;
using UnitTests.GrainInterfaces;
Expand All @@ -14,10 +16,24 @@ public class MinimalReminderTests : IClassFixture<MinimalReminderTests.Fixture>

public class Fixture : BaseTestClusterFixture
{
public ReminderDiagnosticObserver ReminderObserver { get; } = ReminderDiagnosticObserver.Create();

protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
builder.AddSiloBuilderConfigurator<SiloConfiguration>();
}

public override async Task DisposeAsync()
{
try
{
await base.DisposeAsync();
}
finally
{
ReminderObserver.Dispose();
}
}
}

public class SiloConfiguration : ISiloConfigurator
Expand All @@ -40,13 +56,27 @@ public async Task MinimalReminderInterval()
{
var grainGuid = Guid.NewGuid();
const string reminderName = "minimal_reminder";
var period = TimeSpan.FromMilliseconds(100);

var reminderGrain = this.fixture.GrainFactory.GetGrain<IReminderTestGrain2>(grainGuid);
_ = await reminderGrain.StartReminder(reminderName, TimeSpan.FromMilliseconds(100), true);
var grainId = reminderGrain.GetGrainId();
var observer = this.fixture.ReminderObserver;

using var cts = new CancellationTokenSource(TestConstants.InitTimeout);
foreach (var silo in this.fixture.HostedCluster.Silos)
{
await observer.WaitForReminderServiceStartedAsync(cts.Token, silo.SiloAddress);
}

var r = await reminderGrain.GetReminderObject(reminderName);
await reminderGrain.StopReminder(r);
var registeredTask = observer.WaitForReminderRegisteredAsync(grainId, reminderName, cts.Token);
var reminder = await reminderGrain.StartReminder(reminderName, period, true).WaitAsync(cts.Token);
await registeredTask;
await observer.WaitForLocalReminderScheduleAsync(grainId, reminderName, cts.Token);

var unregisteredTask = observer.WaitForReminderUnregisteredAsync(grainId, reminderName, cts.Token);
await reminderGrain.StopReminder(reminder).WaitAsync(cts.Token);
await unregisteredTask;
await observer.WaitForReminderQuiescenceAsync(grainId, reminderName, cts.Token);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ActivationCollectorTests : OrleansTestingBase, IAsyncLifetime
private static readonly TimeSpan DEFAULT_COLLECTION_QUANTUM = TimeSpan.FromSeconds(10);
private static readonly TimeSpan DEFAULT_IDLE_TIMEOUT = DEFAULT_COLLECTION_QUANTUM + TimeSpan.FromSeconds(1);
private static readonly TimeSpan WAIT_TIME = DEFAULT_IDLE_TIMEOUT.Multiply(3.0);
private static readonly TimeSpan COLLECTION_SPECIFIC_AGE_LIMIT = TimeSpan.FromSeconds(12);

private TestCluster testCluster;

Expand Down Expand Up @@ -57,7 +58,7 @@ public void Configure(IHostBuilder hostBuilder)
{
[typeof(IdleActivationGcTestGrain2).FullName] = DEFAULT_IDLE_TIMEOUT,
[typeof(BusyActivationGcTestGrain2).FullName] = DEFAULT_IDLE_TIMEOUT,
[typeof(CollectionSpecificAgeLimitForTenSecondsActivationGcTestGrain).FullName] = TimeSpan.FromSeconds(12),
[typeof(CollectionSpecificAgeLimitForTenSecondsActivationGcTestGrain).FullName] = COLLECTION_SPECIFIC_AGE_LIMIT,
};
});
});
Expand Down Expand Up @@ -495,9 +496,8 @@ async Task workerFunc()
[Fact, TestCategory("ActivationCollector"), TestCategory("Functional")]
public async Task ActivationCollectorShouldCollectByCollectionSpecificAgeLimitForTwelveSeconds()
{
var waitTime = TimeSpan.FromSeconds(30);
var defaultCollectionAge = waitTime.Multiply(2);
//make sure defaultCollectionAge value won't cause activation collection in wait time
var defaultCollectionAge = COLLECTION_SPECIFIC_AGE_LIMIT.Multiply(4);
//make sure defaultCollectionAge value won't cause activation collection before the per-type limit
await Initialize(defaultCollectionAge);

const int grainCount = 1000;
Expand All @@ -518,15 +518,36 @@ public async Task ActivationCollectorShouldCollectByCollectionSpecificAgeLimitFo
Assert.Equal(grainCount, activationsCreated);

logger.LogInformation(
"ActivationCollectorShouldCollectByCollectionSpecificAgeLimit: grains activated; waiting {WaitSeconds} sec (activation GC idle timeout is {DefaultIdleTimeout} sec).",
WAIT_TIME.TotalSeconds,
DEFAULT_IDLE_TIMEOUT.TotalSeconds);

// Some time is required for GC to collect all of the Grains)
await Task.Delay(waitTime);
"ActivationCollectorShouldCollectByCollectionSpecificAgeLimit: grains activated; waiting for activation count to converge to zero using class-specific age limit {CollectionSpecificAgeLimit} sec.",
COLLECTION_SPECIFIC_AGE_LIMIT.TotalSeconds);

int activationsNotCollected = await TestUtils.GetActivationCount(this.testCluster.GrainFactory, fullGrainTypeName);
Assert.Equal(0, activationsNotCollected);
var activationCollector = this.testCluster.GetSiloServiceProvider().GetRequiredService<ActivationCollector>();
await activationCollector.CollectStaleActivations(CancellationToken.None);
int activationsAfterDefaultCollection = await TestUtils.GetActivationCount(this.testCluster.GrainFactory, fullGrainTypeName);
Assert.Equal(grainCount, activationsAfterDefaultCollection);

await WaitForActivationCountToConverge(fullGrainTypeName, activationCollector, expectedCount: 0);
}

private async Task WaitForActivationCountToConverge(string grainTypeName, ActivationCollector activationCollector, int expectedCount)
{
var deadline = DateTime.UtcNow + TestConstants.InitTimeout;
var activationCount = await TestUtils.GetActivationCount(this.testCluster.GrainFactory, grainTypeName);
Comment thread
ReubenBond marked this conversation as resolved.

while (activationCount != expectedCount && DateTime.UtcNow < deadline)
{
await activationCollector.CollectStaleActivations(CancellationToken.None);
activationCount = await TestUtils.GetActivationCount(this.testCluster.GrainFactory, grainTypeName);

if (activationCount == expectedCount)
{
return;
}

await Task.Delay(TimeSpan.FromMilliseconds(250));
}

Assert.Equal(expectedCount, activationCount);
}

[Fact, TestCategory("ActivationCollector"), TestCategory("Functional")]
Expand Down
Loading