Skip to content

Commit

Permalink
adding get actor reminder API (#1103)
Browse files Browse the repository at this point in the history
* get actor reminder API

Signed-off-by: Shivam Kumar <[email protected]>

* handling serialization better

Signed-off-by: Shivam Kumar <[email protected]>

---------

Signed-off-by: Shivam Kumar <[email protected]>
  • Loading branch information
shivamkm07 authored Jun 9, 2023
1 parent a4f5fc0 commit 787cba6
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 2 deletions.
13 changes: 13 additions & 0 deletions examples/Actor/ActorClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public static async Task Main(string[] args)
receivedData = await proxy.GetData();
Console.WriteLine($"Received data is {receivedData}.");

Console.WriteLine("Getting details of the registered reminder");
var reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");

Console.WriteLine("Deregistering timer. Timers would any way stop if the actor is deactivated as part of Dapr garbage collection.");
await proxy.UnregisterTimer();
Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted.");
Expand All @@ -105,14 +109,23 @@ public static async Task Main(string[] args)
await proxy.RegisterReminderWithRepetitions(3);
Console.WriteLine("Waiting so the reminder can be triggered");
await Task.Delay(5000);
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
Console.WriteLine("Registering reminder with ttl and repetitions, i.e. reminder stops when either condition is met - The reminder will repeat 2 times.");
await proxy.RegisterReminderWithTtlAndRepetitions(TimeSpan.FromSeconds(5), 2);
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted.");
await proxy.UnregisterReminder();

Console.WriteLine("Registering reminder and Timer with TTL - The reminder will self delete after 10 seconds.");
await proxy.RegisterReminderWithTtl(TimeSpan.FromSeconds(10));
await proxy.RegisterTimerWithTtl(TimeSpan.FromSeconds(10));
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");

// Track the reminder.
var timer = new Timer(async state => Console.WriteLine($"Received data: {await proxy.GetData()}"), null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
Expand Down
5 changes: 5 additions & 0 deletions examples/Actor/DemoActor/DemoActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public async Task RegisterReminderWithTtlAndRepetitions(TimeSpan ttl, int repeti
await this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1), repetitions, ttl);
}

public async Task<IActorReminder> GetReminder()
{
return await this.GetReminderAsync("TestReminder");
}

public Task UnregisterReminder()
{
return this.UnregisterReminderAsync("TestReminder");
Expand Down
8 changes: 8 additions & 0 deletions examples/Actor/IDemoActor/IDemoActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace IDemoActorInterface
using System;
using System.Threading.Tasks;
using Dapr.Actors;
using Dapr.Actors.Runtime;

/// <summary>
/// Interface for Actor method.
Expand Down Expand Up @@ -94,6 +95,13 @@ public interface IDemoActor : IActor
/// <returns>A task that represents the asynchronous save operation.</returns>
Task RegisterReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions);

/// <summary>
/// Gets the registered reminder.
/// </summary>
/// <param name="reminderName">The name of the reminder.</param>
/// <returns>A task that returns the reminder after completion.</returns>
Task<IActorReminder> GetReminder();

/// <summary>
/// Unregisters the registered timer.
/// </summary>
Expand Down
17 changes: 17 additions & 0 deletions src/Dapr.Actors/DaprHttpInteractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,23 @@ HttpRequestMessage RequestFunc()
return this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
}

public async Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName);

HttpRequestMessage RequestFunc()
{
var request = new HttpRequestMessage()
{
Method = HttpMethod.Get,
};
return request;
}

var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
return await response.Content.ReadAsStreamAsync();
}

public Task UnregisterReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName);
Expand Down
10 changes: 10 additions & 0 deletions src/Dapr.Actors/IDaprInteractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ internal interface IDaprInteractor
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task RegisterReminderAsync(string actorType, string actorId, string reminderName, string data, CancellationToken cancellationToken = default);

/// <summary>
/// Gets a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="reminderName">Name of reminder to unregister.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default);

/// <summary>
/// Unregisters a reminder.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions src/Dapr.Actors/Runtime/Actor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,18 @@ internal async Task<IActorReminder> RegisterReminderAsync(ActorReminderOptions o
return reminder;
}

/// <summary>
/// Gets a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
/// <param name="reminderName">The name of the reminder to get.</param>
/// <returns>
/// Returns a task that represents the asynchronous get operation. The result of the task contains the reminder if it exists, otherwise null.
/// </returns>
protected async Task<IActorReminder> GetReminderAsync(string reminderName)
{
return await this.Host.TimerManager.GetReminderAsync(new ActorReminderToken(this.actorTypeName, this.Id, reminderName));
}

/// <summary>
/// Unregisters a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Dapr.Actors/Runtime/ActorTestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public override Task RegisterTimerAsync(ActorTimer timer)
throw new NotImplementedException(Message);
}

public override Task<IActorReminder> GetReminderAsync(ActorReminderToken reminder)
{
throw new NotImplementedException(Message);
}

public override Task UnregisterReminderAsync(ActorReminderToken reminder)
{
throw new NotImplementedException(Message);
Expand Down
7 changes: 7 additions & 0 deletions src/Dapr.Actors/Runtime/ActorTimerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public abstract class ActorTimerManager
/// <returns>A task which will complete when the operation completes.</returns>
public abstract Task RegisterReminderAsync(ActorReminder reminder);

/// <summary>
/// Gets a reminder previously registered using
/// </summary>
/// <param name="reminder">The <see cref="ActorReminderToken" /> to unregister.</param>
/// <returns>A task which will complete when the operation completes.</returns>
public abstract Task<IActorReminder> GetReminderAsync(ActorReminderToken reminder);

/// <summary>
/// Unregisters the provided reminder with the runtime.
/// </summary>
Expand Down
30 changes: 30 additions & 0 deletions src/Dapr.Actors/Runtime/DefaultActorTimerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
using System;
using System.Text.Json;
using System.Threading.Tasks;
using System.IO;
using System.Text;

namespace Dapr.Actors.Runtime
{
Expand All @@ -37,6 +39,18 @@ public override async Task RegisterReminderAsync(ActorReminder reminder)
await this.interactor.RegisterReminderAsync(reminder.ActorType, reminder.ActorId.ToString(), reminder.Name, serialized);
}

public override async Task<IActorReminder> GetReminderAsync(ActorReminderToken token)
{
if (token == null)
{
throw new ArgumentNullException(nameof(token));
}

var responseStream = await this.interactor.GetReminderAsync(token.ActorType, token.ActorId.ToString(), token.Name);
var reminder = await DeserializeReminderAsync(responseStream, token);
return reminder;
}

public override async Task UnregisterReminderAsync(ActorReminderToken reminder)
{
if (reminder == null)
Expand Down Expand Up @@ -77,5 +91,21 @@ private async ValueTask<string> SerializeReminderAsync(ActorReminder reminder)
reminder.Ttl);
return await info.SerializeAsync();
}

private async ValueTask<ActorReminder> DeserializeReminderAsync(Stream stream, ActorReminderToken token)
{
if (stream == null)
{
throw new ArgumentNullException(nameof(stream));
}
var info = await ReminderInfo.DeserializeAsync(stream);
if(info == null)
{
return null;
}
var reminder = new ActorReminder(token.ActorType, token.ActorId, token.Name, info.Data, info.DueTime,
info.Period);
return reminder;
}
}
}
7 changes: 5 additions & 2 deletions src/Dapr.Actors/Runtime/ReminderInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Dapr.Actors.Runtime
using System.Threading.Tasks;

// represents the wire format used by Dapr to store reminder info with the runtime
internal struct ReminderInfo
internal class ReminderInfo
{
public ReminderInfo(
byte[] data,
Expand Down Expand Up @@ -49,13 +49,16 @@ public ReminderInfo(
internal static async Task<ReminderInfo> DeserializeAsync(Stream stream)
{
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
if(json.ValueKind == JsonValueKind.Null)
{
return null;
}

var dueTime = default(TimeSpan);
var period = default(TimeSpan);
var data = default(byte[]);
int? repetition = null;
TimeSpan? ttl = null;

if (json.TryGetProperty("dueTime", out var dueTimeProperty))
{
var dueTimeString = dueTimeProperty.GetString();
Expand Down
13 changes: 13 additions & 0 deletions test/Dapr.Actors.Test/ActorUnitTestTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public async Task CanTestStartingAndStoppingTimer()
public async Task CanTestStartingAndStoppinReminder()
{
var reminders = new List<ActorReminder>();
IActorReminder getReminder = null;

var timerManager = new Mock<ActorTimerManager>(MockBehavior.Strict);
timerManager
Expand All @@ -84,6 +85,9 @@ public async Task CanTestStartingAndStoppinReminder()
.Setup(tm => tm.UnregisterReminderAsync(It.IsAny<ActorReminderToken>()))
.Callback<ActorReminderToken>(reminder => reminders.RemoveAll(t => t.Name == reminder.Name))
.Returns(Task.CompletedTask);
timerManager
.Setup(tm => tm.GetReminderAsync(It.IsAny<ActorReminderToken>()))
.Returns(() => Task.FromResult(getReminder));

var host = ActorHost.CreateForTest<CoolTestActor>(new ActorTestOptions(){ TimerManager = timerManager.Object, });
var actor = new CoolTestActor(host);
Expand All @@ -109,6 +113,10 @@ public async Task CanTestStartingAndStoppinReminder()
await actor.ReceiveReminderAsync(reminder.Name, reminder.State, reminder.DueTime, reminder.Period);
}

getReminder = reminder;
var reminderFromGet = await actor.GetReminderAsync();
Assert.Equal(reminder, reminderFromGet);

// Stop the reminder
await actor.StopReminderAsync();
Assert.Empty(reminders);
Expand Down Expand Up @@ -148,6 +156,11 @@ public async Task StartReminderAsync(Message message)
await this.RegisterReminderAsync("record", bytes, dueTime: TimeSpan.Zero, period: TimeSpan.FromSeconds(5));
}

public async Task<IActorReminder> GetReminderAsync()
{
return await this.GetReminderAsync("record");
}

public async Task StopReminderAsync()
{
await this.UnregisterReminderAsync("record");
Expand Down
14 changes: 14 additions & 0 deletions test/Dapr.Actors.Test/TestDaprInteractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ Task<IActorResponseMessage> IDaprInteractor.InvokeActorMethodWithRemotingAsync(A
throw new System.NotImplementedException();
}

/// <summary>
/// Gets a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="reminderName">Name of reminder to unregister.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName,
CancellationToken cancellationToken = default)
{
throw new System.NotImplementedException();
}

/// <summary>
/// Unregisters a reminder.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ public interface IReminderActor : IPingActor, IActor
Task StartReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions);

Task<State> GetState();

Task<String> GetReminder();
}
}
6 changes: 6 additions & 0 deletions test/Dapr.E2E.Test.App/Actors/ReminderActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public async Task StartReminder(StartReminderOptions options)
await this.StateManager.SetStateAsync<State>("reminder-state", new State(){ IsReminderRunning = true, });
}

public async Task<String> GetReminder(){
var reminder = await this.GetReminderAsync("test-reminder");
var reminderString = JsonSerializer.Serialize(reminder, this.Host.JsonSerializerOptions);
return reminderString;
}

public async Task StartReminderWithTtl(TimeSpan ttl)
{
var options = new StartReminderOptions()
Expand Down
53 changes: 53 additions & 0 deletions test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace Dapr.E2E.Test
{
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors;
Expand Down Expand Up @@ -49,6 +50,58 @@ public async Task ActorCanStartAndStopReminder()
Assert.Equal(10, state.Count);
}

[Fact]
public async Task ActorCanStartAndStopAndGetReminder()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var proxy = this.ProxyFactory.CreateActorProxy<IReminderActor>(ActorId.CreateRandom(), "ReminderActor");

await WaitForActorRuntimeAsync(proxy, cts.Token);

// Get reminder before starting it, should return null.
var reminder = await proxy.GetReminder();
Assert.Equal("null", reminder);

// Start reminder, to count up to 10
await proxy.StartReminder(new StartReminderOptions(){ Total = 10, });

State state = new State();
var countGetReminder = 0;
while (true)
{
cts.Token.ThrowIfCancellationRequested();

reminder = await proxy.GetReminder();
Assert.NotNull(reminder);

// If reminder is null then it means the reminder has been stopped.
if (reminder != "null")
{
countGetReminder++;
var reminderJson = JsonSerializer.Deserialize<JsonElement>(reminder);
var name = reminderJson.GetProperty("name").ToString();
var period = reminderJson.GetProperty("period").ToString();
var dueTime = reminderJson.GetProperty("dueTime").ToString();

Assert.Equal("test-reminder", name);
Assert.Equal(TimeSpan.FromMilliseconds(50).ToString(), period);
Assert.Equal(TimeSpan.Zero.ToString(), dueTime);
}

state = await proxy.GetState();
this.Output.WriteLine($"Got Count: {state.Count} IsReminderRunning: {state.IsReminderRunning}");
if (!state.IsReminderRunning)
{
break;
}
}

// Should count up to exactly 10
Assert.Equal(10, state.Count);
// Should be able to Get Reminder at least once.
Assert.True(countGetReminder > 0);
}

[Fact]
public async Task ActorCanStartReminderWithRepetitions()
{
Expand Down

0 comments on commit 787cba6

Please sign in to comment.