Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/Actor/ActorClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public static async Task Main(string[] args)
receivedData = await proxy.GetData();
Console.WriteLine($"Received data is {receivedData}.");

Console.WriteLine("Getting details of the registered reminder");
var reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");

Console.WriteLine("Deregistering timer. Timers would any way stop if the actor is deactivated as part of Dapr garbage collection.");
await proxy.UnregisterTimer();
Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted.");
Expand All @@ -105,14 +109,23 @@ public static async Task Main(string[] args)
await proxy.RegisterReminderWithRepetitions(3);
Console.WriteLine("Waiting so the reminder can be triggered");
await Task.Delay(5000);
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
Console.WriteLine("Registering reminder with ttl and repetitions, i.e. reminder stops when either condition is met - The reminder will repeat 2 times.");
await proxy.RegisterReminderWithTtlAndRepetitions(TimeSpan.FromSeconds(5), 2);
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted.");
await proxy.UnregisterReminder();

Console.WriteLine("Registering reminder and Timer with TTL - The reminder will self delete after 10 seconds.");
await proxy.RegisterReminderWithTtl(TimeSpan.FromSeconds(10));
await proxy.RegisterTimerWithTtl(TimeSpan.FromSeconds(10));
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");

// Track the reminder.
var timer = new Timer(async state => Console.WriteLine($"Received data: {await proxy.GetData()}"), null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
Expand Down
5 changes: 5 additions & 0 deletions examples/Actor/DemoActor/DemoActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public async Task RegisterReminderWithTtlAndRepetitions(TimeSpan ttl, int repeti
await this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1), repetitions, ttl);
}

public async Task<IActorReminder> GetReminder()
{
return await this.GetReminderAsync("TestReminder");
}

public Task UnregisterReminder()
{
return this.UnregisterReminderAsync("TestReminder");
Expand Down
8 changes: 8 additions & 0 deletions examples/Actor/IDemoActor/IDemoActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace IDemoActorInterface
using System;
using System.Threading.Tasks;
using Dapr.Actors;
using Dapr.Actors.Runtime;

/// <summary>
/// Interface for Actor method.
Expand Down Expand Up @@ -94,6 +95,13 @@ public interface IDemoActor : IActor
/// <returns>A task that represents the asynchronous save operation.</returns>
Task RegisterReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions);

/// <summary>
/// Gets the registered reminder.
/// </summary>
/// <param name="reminderName">The name of the reminder.</param>
/// <returns>A task that returns the reminder after completion.</returns>
Task<IActorReminder> GetReminder();

/// <summary>
/// Unregisters the registered timer.
/// </summary>
Expand Down
17 changes: 17 additions & 0 deletions src/Dapr.Actors/DaprHttpInteractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,23 @@ HttpRequestMessage RequestFunc()
return this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
}

public async Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName);

HttpRequestMessage RequestFunc()
{
var request = new HttpRequestMessage()
{
Method = HttpMethod.Get,
};
return request;
}

var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
return await response.Content.ReadAsStreamAsync();
}

public Task UnregisterReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName);
Expand Down
10 changes: 10 additions & 0 deletions src/Dapr.Actors/IDaprInteractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ internal interface IDaprInteractor
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task RegisterReminderAsync(string actorType, string actorId, string reminderName, string data, CancellationToken cancellationToken = default);

/// <summary>
/// Gets a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="reminderName">Name of reminder to unregister.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default);

/// <summary>
/// Unregisters a reminder.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions src/Dapr.Actors/Runtime/Actor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,18 @@ internal async Task<IActorReminder> RegisterReminderAsync(ActorReminderOptions o
return reminder;
}

/// <summary>
/// Gets a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
/// <param name="reminderName">The name of the reminder to get.</param>
/// <returns>
/// Returns a task that represents the asynchronous get operation. The result of the task contains the reminder if it exists, otherwise null.
/// </returns>
protected async Task<IActorReminder> GetReminderAsync(string reminderName)
{
return await this.Host.TimerManager.GetReminderAsync(new ActorReminderToken(this.actorTypeName, this.Id, reminderName));
}

/// <summary>
/// Unregisters a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Dapr.Actors/Runtime/ActorTestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public override Task RegisterTimerAsync(ActorTimer timer)
throw new NotImplementedException(Message);
}

public override Task<IActorReminder> GetReminderAsync(ActorReminderToken reminder)
{
throw new NotImplementedException(Message);
}

public override Task UnregisterReminderAsync(ActorReminderToken reminder)
{
throw new NotImplementedException(Message);
Expand Down
7 changes: 7 additions & 0 deletions src/Dapr.Actors/Runtime/ActorTimerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public abstract class ActorTimerManager
/// <returns>A task which will complete when the operation completes.</returns>
public abstract Task RegisterReminderAsync(ActorReminder reminder);

/// <summary>
/// Gets a reminder previously registered using
/// </summary>
/// <param name="reminder">The <see cref="ActorReminderToken" /> to unregister.</param>
/// <returns>A task which will complete when the operation completes.</returns>
public abstract Task<IActorReminder> GetReminderAsync(ActorReminderToken reminder);

/// <summary>
/// Unregisters the provided reminder with the runtime.
/// </summary>
Expand Down
30 changes: 30 additions & 0 deletions src/Dapr.Actors/Runtime/DefaultActorTimerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
using System;
using System.Text.Json;
using System.Threading.Tasks;
using System.IO;
using System.Text;

namespace Dapr.Actors.Runtime
{
Expand All @@ -37,6 +39,18 @@ public override async Task RegisterReminderAsync(ActorReminder reminder)
await this.interactor.RegisterReminderAsync(reminder.ActorType, reminder.ActorId.ToString(), reminder.Name, serialized);
}

public override async Task<IActorReminder> GetReminderAsync(ActorReminderToken token)
{
if (token == null)
{
throw new ArgumentNullException(nameof(token));
}

var responseStream = await this.interactor.GetReminderAsync(token.ActorType, token.ActorId.ToString(), token.Name);
var reminder = await DeserializeReminderAsync(responseStream, token);
return reminder;
}

public override async Task UnregisterReminderAsync(ActorReminderToken reminder)
{
if (reminder == null)
Expand Down Expand Up @@ -77,5 +91,21 @@ private async ValueTask<string> SerializeReminderAsync(ActorReminder reminder)
reminder.Ttl);
return await info.SerializeAsync();
}

private async ValueTask<ActorReminder> DeserializeReminderAsync(Stream stream, ActorReminderToken token)
{
if (stream == null)
{
throw new ArgumentNullException(nameof(stream));
}
var info = await ReminderInfo.DeserializeAsync(stream);
if(info == null)
{
return null;
}
var reminder = new ActorReminder(token.ActorType, token.ActorId, token.Name, info.Data, info.DueTime,
info.Period);
return reminder;
}
}
}
7 changes: 5 additions & 2 deletions src/Dapr.Actors/Runtime/ReminderInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Dapr.Actors.Runtime
using System.Threading.Tasks;

// represents the wire format used by Dapr to store reminder info with the runtime
internal struct ReminderInfo
internal class ReminderInfo
{
public ReminderInfo(
byte[] data,
Expand Down Expand Up @@ -49,13 +49,16 @@ public ReminderInfo(
internal static async Task<ReminderInfo> DeserializeAsync(Stream stream)
{
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
if(json.ValueKind == JsonValueKind.Null)
{
return null;
}

var dueTime = default(TimeSpan);
var period = default(TimeSpan);
var data = default(byte[]);
int? repetition = null;
TimeSpan? ttl = null;

if (json.TryGetProperty("dueTime", out var dueTimeProperty))
{
var dueTimeString = dueTimeProperty.GetString();
Expand Down
13 changes: 13 additions & 0 deletions test/Dapr.Actors.Test/ActorUnitTestTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public async Task CanTestStartingAndStoppingTimer()
public async Task CanTestStartingAndStoppinReminder()
{
var reminders = new List<ActorReminder>();
IActorReminder getReminder = null;

var timerManager = new Mock<ActorTimerManager>(MockBehavior.Strict);
timerManager
Expand All @@ -84,6 +85,9 @@ public async Task CanTestStartingAndStoppinReminder()
.Setup(tm => tm.UnregisterReminderAsync(It.IsAny<ActorReminderToken>()))
.Callback<ActorReminderToken>(reminder => reminders.RemoveAll(t => t.Name == reminder.Name))
.Returns(Task.CompletedTask);
timerManager
.Setup(tm => tm.GetReminderAsync(It.IsAny<ActorReminderToken>()))
.Returns(() => Task.FromResult(getReminder));

var host = ActorHost.CreateForTest<CoolTestActor>(new ActorTestOptions(){ TimerManager = timerManager.Object, });
var actor = new CoolTestActor(host);
Expand All @@ -109,6 +113,10 @@ public async Task CanTestStartingAndStoppinReminder()
await actor.ReceiveReminderAsync(reminder.Name, reminder.State, reminder.DueTime, reminder.Period);
}

getReminder = reminder;
var reminderFromGet = await actor.GetReminderAsync();
Assert.Equal(reminder, reminderFromGet);

// Stop the reminder
await actor.StopReminderAsync();
Assert.Empty(reminders);
Expand Down Expand Up @@ -148,6 +156,11 @@ public async Task StartReminderAsync(Message message)
await this.RegisterReminderAsync("record", bytes, dueTime: TimeSpan.Zero, period: TimeSpan.FromSeconds(5));
}

public async Task<IActorReminder> GetReminderAsync()
{
return await this.GetReminderAsync("record");
}

public async Task StopReminderAsync()
{
await this.UnregisterReminderAsync("record");
Expand Down
14 changes: 14 additions & 0 deletions test/Dapr.Actors.Test/TestDaprInteractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ Task<IActorResponseMessage> IDaprInteractor.InvokeActorMethodWithRemotingAsync(A
throw new System.NotImplementedException();
}

/// <summary>
/// Gets a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="reminderName">Name of reminder to unregister.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName,
CancellationToken cancellationToken = default)
{
throw new System.NotImplementedException();
}

/// <summary>
/// Unregisters a reminder.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ public interface IReminderActor : IPingActor, IActor
Task StartReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions);

Task<State> GetState();

Task<String> GetReminder();
}
}
6 changes: 6 additions & 0 deletions test/Dapr.E2E.Test.App/Actors/ReminderActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public async Task StartReminder(StartReminderOptions options)
await this.StateManager.SetStateAsync<State>("reminder-state", new State(){ IsReminderRunning = true, });
}

public async Task<String> GetReminder(){
var reminder = await this.GetReminderAsync("test-reminder");
var reminderString = JsonSerializer.Serialize(reminder, this.Host.JsonSerializerOptions);
return reminderString;
}

public async Task StartReminderWithTtl(TimeSpan ttl)
{
var options = new StartReminderOptions()
Expand Down
53 changes: 53 additions & 0 deletions test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace Dapr.E2E.Test
{
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors;
Expand Down Expand Up @@ -49,6 +50,58 @@ public async Task ActorCanStartAndStopReminder()
Assert.Equal(10, state.Count);
}

[Fact]
public async Task ActorCanStartAndStopAndGetReminder()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var proxy = this.ProxyFactory.CreateActorProxy<IReminderActor>(ActorId.CreateRandom(), "ReminderActor");

await WaitForActorRuntimeAsync(proxy, cts.Token);

// Get reminder before starting it, should return null.
var reminder = await proxy.GetReminder();
Assert.Equal("null", reminder);

// Start reminder, to count up to 10
await proxy.StartReminder(new StartReminderOptions(){ Total = 10, });

State state = new State();
var countGetReminder = 0;
while (true)
{
cts.Token.ThrowIfCancellationRequested();

reminder = await proxy.GetReminder();
Assert.NotNull(reminder);

// If reminder is null then it means the reminder has been stopped.
if (reminder != "null")
{
countGetReminder++;
var reminderJson = JsonSerializer.Deserialize<JsonElement>(reminder);
var name = reminderJson.GetProperty("name").ToString();
var period = reminderJson.GetProperty("period").ToString();
var dueTime = reminderJson.GetProperty("dueTime").ToString();

Assert.Equal("test-reminder", name);
Assert.Equal(TimeSpan.FromMilliseconds(50).ToString(), period);
Assert.Equal(TimeSpan.Zero.ToString(), dueTime);
}

state = await proxy.GetState();
this.Output.WriteLine($"Got Count: {state.Count} IsReminderRunning: {state.IsReminderRunning}");
if (!state.IsReminderRunning)
{
break;
}
}

// Should count up to exactly 10
Assert.Equal(10, state.Count);
// Should be able to Get Reminder at least once.
Assert.True(countGetReminder > 0);
}

[Fact]
public async Task ActorCanStartReminderWithRepetitions()
{
Expand Down