diff --git a/Akka.Reminders.slnx b/Akka.Reminders.slnx index 4f04631..0c5bf9f 100644 --- a/Akka.Reminders.slnx +++ b/Akka.Reminders.slnx @@ -7,9 +7,12 @@ + + + - \ No newline at end of file + diff --git a/Directory.Packages.props b/Directory.Packages.props index 1505fae..e44f238 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -9,6 +9,7 @@ + @@ -29,4 +30,4 @@ - \ No newline at end of file + diff --git a/README.md b/README.md index 228a7ab..429a8ed 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Akka.Reminders provides a reliable way to schedule reminders (time-delayed messa ## Features - ✅ **Single and recurring reminders** - Schedule one-time or repeating time-based messages -- ✅ **SQL storage backends** - Production-ready SQL Server and PostgreSQL support +- ✅ **SQL storage backends** - Production-ready SQL Server, PostgreSQL, and SQLite support - ✅ **Automatic retries** - Failed deliveries retry with exponential backoff - ✅ **Cluster singleton** - Reminder scheduler with automatic failover - ✅ **Akka.Hosting integration** - First-class configuration API @@ -30,6 +30,7 @@ Akka.Reminders provides a reliable way to schedule reminders (time-delayed messa - [Configuration](#configuration) - [SQL Server Storage](#sql-server-storage) - [PostgreSQL Storage](#postgresql-storage) + - [SQLite Storage](#sqlite-storage) - [In-Memory Storage](#in-memory-storage) - [Reminder Settings](#reminder-settings) - [Usage Examples](#usage-examples) @@ -47,11 +48,21 @@ dotnet add package Aaron.Akka.Reminders **SQL Server Storage:** ```bash -dotnet add package Aaron.Akka.Reminders.Sql +dotnet add package Aaron.Akka.Reminders.SqlServer ``` **PostgreSQL Storage:** ```bash +dotnet add package Aaron.Akka.Reminders.PostgreSql +``` + +**SQLite Storage:** +```bash +dotnet add package Aaron.Akka.Reminders.Sqlite +``` + +**Legacy Compatibility Package (all providers):** +```bash dotnet add package Aaron.Akka.Reminders.Sql ``` @@ -59,9 +70,12 @@ dotnet add package Aaron.Akka.Reminders.Sql | Storage Backend | Package | Auto-Initialize | Documentation | |----------------|---------|-----------------|---------------| -| In-Memory | `Akka.Reminders` | N/A | Built-in (development/testing only) | -| SQL Server | `Akka.Reminders.Sql` | Yes | [SQL Server Schema](src/Akka.Reminders.Sql/Scripts/SqlServer-Create.sql) | -| PostgreSQL | `Akka.Reminders.Sql` | Yes | [PostgreSQL Schema](src/Akka.Reminders.Sql/Scripts/PostgreSql-Create.sql) | +| In-Memory | `Aaron.Akka.Reminders` | N/A | Built-in (development/testing only) | +| SQL Server | `Aaron.Akka.Reminders.SqlServer` | Yes | [SQL Server Schema](src/Akka.Reminders.SqlServer/Scripts/SqlServer-Create.sql) | +| PostgreSQL | `Aaron.Akka.Reminders.PostgreSql` | Yes | [PostgreSQL Schema](src/Akka.Reminders.PostgreSql/Scripts/PostgreSql-Create.sql) | +| SQLite | `Aaron.Akka.Reminders.Sqlite` | Yes | [SQLite Schema](src/Akka.Reminders.Sqlite/Scripts/Sqlite-Create.sql) | + +> **Compatibility:** `Aaron.Akka.Reminders.Sql` remains available and fully functional as a compatibility package, but new projects should prefer provider-specific packages (`SqlServer`, `PostgreSql`, or `Sqlite`). > **Note:** For production deployments, you can manually create database schemas using the provided SQL scripts instead of using auto-initialization. @@ -95,6 +109,7 @@ builder.Services.AddAkka("MySystem", (configBuilder, provider) => ```csharp using Akka.Hosting; using Akka.Reminders; +using Akka.Reminders.SqlServer.Hosting; var builder = WebApplication.CreateBuilder(args); @@ -166,6 +181,9 @@ public class MyEntityActor : ReceiveActor ### SQL Server Storage +`using Akka.Reminders.SqlServer.Hosting;` +`using Akka.Reminders.SqlServer.Configuration;` + The `WithSqlServerStorage` extension method configures SQL Server as the reminder storage backend: ```csharp @@ -179,21 +197,21 @@ The `WithSqlServerStorage` extension method configures SQL Server as the reminde **Parameters:** - `connectionString`: SQL Server connection string -- `schemaName`: Database schema name (default: "dbo") -- `tableName`: Table name for reminders (default: "reminders") +- `schemaName`: Database schema name (default: "reminders") +- `tableName`: Table name for reminders (default: "scheduled_reminders") - `autoInitialize`: Auto-create schema/table if missing (default: true) **Advanced Configuration:** ```csharp .WithReminders("reminder-host", reminders => reminders - .WithSqlServerStorage(settings => + .WithSqlServerStorage(new SqlServerReminderStorageSettings { - settings.ConnectionString = "Server=localhost;..."; - settings.SchemaName = "custom_schema"; - settings.TableName = "my_reminders"; - settings.CommandTimeout = TimeSpan.FromSeconds(60); - settings.AutoInitialize = false; // Manual schema management + ConnectionString = "Server=localhost;...", + SchemaName = "custom_schema", + TableName = "my_reminders", + CommandTimeout = TimeSpan.FromSeconds(60), + AutoInitialize = false // Manual schema management })) ``` @@ -201,7 +219,7 @@ The `WithSqlServerStorage` extension method configures SQL Server as the reminde For production environments, you may prefer to manually create the database schema. Use the provided SQL script: -📄 [SQL Server Schema Script](src/Akka.Reminders.Sql/Scripts/SqlServer-Create.sql) +📄 [SQL Server Schema Script](src/Akka.Reminders.SqlServer/Scripts/SqlServer-Create.sql) ```sql -- Run this script against your database @@ -210,6 +228,9 @@ For production environments, you may prefer to manually create the database sche ### PostgreSQL Storage +`using Akka.Reminders.PostgreSql.Hosting;` +`using Akka.Reminders.PostgreSql.Configuration;` + The `WithPostgreSqlStorage` extension method configures PostgreSQL as the reminder storage backend: ```csharp @@ -223,21 +244,21 @@ The `WithPostgreSqlStorage` extension method configures PostgreSQL as the remind **Parameters:** - `connectionString`: PostgreSQL connection string -- `schemaName`: Database schema name (default: "public") -- `tableName`: Table name for reminders (default: "reminders") +- `schemaName`: Database schema name (default: "reminders") +- `tableName`: Table name for reminders (default: "scheduled_reminders") - `autoInitialize`: Auto-create schema/table if missing (default: true) **Advanced Configuration:** ```csharp .WithReminders("reminder-host", reminders => reminders - .WithPostgreSqlStorage(settings => + .WithPostgreSqlStorage(new PostgreSqlReminderStorageSettings { - settings.ConnectionString = "Host=localhost;..."; - settings.SchemaName = "custom_schema"; - settings.TableName = "my_reminders"; - settings.CommandTimeout = TimeSpan.FromSeconds(60); - settings.AutoInitialize = false; // Manual schema management + ConnectionString = "Host=localhost;...", + SchemaName = "custom_schema", + TableName = "my_reminders", + CommandTimeout = TimeSpan.FromSeconds(60), + AutoInitialize = false // Manual schema management })) ``` @@ -245,13 +266,36 @@ The `WithPostgreSqlStorage` extension method configures PostgreSQL as the remind For production environments, you may prefer to manually create the database schema. Use the provided SQL script: -📄 [PostgreSQL Schema Script](src/Akka.Reminders.Sql/Scripts/PostgreSql-Create.sql) +📄 [PostgreSQL Schema Script](src/Akka.Reminders.PostgreSql/Scripts/PostgreSql-Create.sql) ```sql -- Run this script against your database -- Creates schema, table, and indexes ``` +### SQLite Storage + +`using Akka.Reminders.Sqlite.Hosting;` + +The `WithSqliteStorage` extension method configures SQLite as the reminder storage backend: + +```csharp +.WithReminders("reminder-host", reminders => reminders + .WithSqliteStorage( + connectionString: "Data Source=reminders.db;Mode=ReadWriteCreate;Cache=Shared", + tableName: "akka_reminders", + autoInitialize: true)) +``` + +**Parameters:** +- `connectionString`: SQLite connection string +- `tableName`: Table name for reminders (default: "scheduled_reminders") +- `autoInitialize`: Auto-create table if missing (default: true) + +**Manual Schema Setup:** + +📄 [SQLite Schema Script](src/Akka.Reminders.Sqlite/Scripts/Sqlite-Create.sql) + ### In-Memory Storage For development and testing, use the built-in in-memory storage: diff --git a/src/Akka.Reminders.Benchmarks/Akka.Reminders.Benchmarks.csproj b/src/Akka.Reminders.Benchmarks/Akka.Reminders.Benchmarks.csproj index 33cfd4d..83a7439 100644 --- a/src/Akka.Reminders.Benchmarks/Akka.Reminders.Benchmarks.csproj +++ b/src/Akka.Reminders.Benchmarks/Akka.Reminders.Benchmarks.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/Akka.Reminders.Benchmarks/SqlReminderBenchmarkBase.cs b/src/Akka.Reminders.Benchmarks/SqlReminderBenchmarkBase.cs index f425806..c44691e 100644 --- a/src/Akka.Reminders.Benchmarks/SqlReminderBenchmarkBase.cs +++ b/src/Akka.Reminders.Benchmarks/SqlReminderBenchmarkBase.cs @@ -1,6 +1,6 @@ using Akka.Actor; -using Akka.Reminders.Sql; -using Akka.Reminders.Sql.Configuration; +using Akka.Reminders.PostgreSql; +using Akka.Reminders.PostgreSql.Configuration; using BenchmarkDotNet.Attributes; using Npgsql; using NpgsqlTypes; @@ -17,15 +17,15 @@ public abstract class SqlReminderBenchmarkBase "Host=localhost;Port=5432;Database=reminders_bench;Username=postgres;Password=postgres"; private ActorSystem? _system; - protected SqlReminderStorage Storage { get; private set; } = null!; + protected PostgreSqlReminderStorage Storage { get; private set; } = null!; [GlobalSetup] public async Task GlobalSetup() { _system = ActorSystem.Create("benchmark-system"); - var settings = SqlReminderStorageSettings.CreatePostgreSql(ConnectionString); - Storage = new SqlReminderStorage(settings, _system); + var settings = PostgreSqlReminderStorageSettings.Create(ConnectionString); + Storage = new PostgreSqlReminderStorage(settings, _system); // Force table creation via auto-initialize await Storage.GetRemindersOverviewAsync(DateTimeOffset.UtcNow); diff --git a/src/Akka.Reminders.PostgreSql/Akka.Reminders.PostgreSql.csproj b/src/Akka.Reminders.PostgreSql/Akka.Reminders.PostgreSql.csproj new file mode 100644 index 0000000..9b61afe --- /dev/null +++ b/src/Akka.Reminders.PostgreSql/Akka.Reminders.PostgreSql.csproj @@ -0,0 +1,25 @@ + + + + net9.0 + enable + enable + Aaron.Akka.Reminders.PostgreSql + PostgreSQL storage implementation for Akka.Reminders. + akka;akkadotnet;akka.reminders;sql;postgresql; + true + + + + + + + + + + + + + + + diff --git a/src/Akka.Reminders.PostgreSql/Configuration/PostgreSqlReminderStorageSettings.cs b/src/Akka.Reminders.PostgreSql/Configuration/PostgreSqlReminderStorageSettings.cs new file mode 100644 index 0000000..35e0f2a --- /dev/null +++ b/src/Akka.Reminders.PostgreSql/Configuration/PostgreSqlReminderStorageSettings.cs @@ -0,0 +1,72 @@ +namespace Akka.Reminders.PostgreSql.Configuration; + +/// +/// Configuration settings for PostgreSQL reminder storage. +/// +public sealed record PostgreSqlReminderStorageSettings +{ + /// + /// The PostgreSQL connection string. + /// + public required string ConnectionString { get; init; } + + /// + /// The schema name for the reminders table. + /// Default: "reminders" + /// + public string SchemaName { get; init; } = "reminders"; + + /// + /// The table name for storing reminders. + /// Default: "scheduled_reminders" + /// + public string TableName { get; init; } = "scheduled_reminders"; + + /// + /// Whether to automatically create the schema and table if they don't exist. + /// Default: true + /// + public bool AutoInitialize { get; init; } = true; + + /// + /// The timeout for database operations. + /// Default: 30 seconds + /// + public TimeSpan CommandTimeout { get; init; } = TimeSpan.FromSeconds(30); + + /// + /// Creates settings for PostgreSQL. + /// + public static PostgreSqlReminderStorageSettings Create( + string connectionString, + string? schemaName = null, + string? tableName = null, + bool? autoInitialize = null) + { + return new PostgreSqlReminderStorageSettings + { + ConnectionString = connectionString, + SchemaName = schemaName ?? "reminders", + TableName = tableName ?? "scheduled_reminders", + AutoInitialize = autoInitialize ?? true + }; + } + + /// + /// Validates the settings and throws if invalid. + /// + public void Validate() + { + if (string.IsNullOrWhiteSpace(ConnectionString)) + throw new ArgumentException("ConnectionString cannot be null or empty.", nameof(ConnectionString)); + + if (string.IsNullOrWhiteSpace(SchemaName)) + throw new ArgumentException("SchemaName cannot be null or empty.", nameof(SchemaName)); + + if (string.IsNullOrWhiteSpace(TableName)) + throw new ArgumentException("TableName cannot be null or empty.", nameof(TableName)); + + if (CommandTimeout <= TimeSpan.Zero) + throw new ArgumentException("CommandTimeout must be positive.", nameof(CommandTimeout)); + } +} diff --git a/src/Akka.Reminders.PostgreSql/Hosting/PostgreSqlStorageExtensions.cs b/src/Akka.Reminders.PostgreSql/Hosting/PostgreSqlStorageExtensions.cs new file mode 100644 index 0000000..fbd1f35 --- /dev/null +++ b/src/Akka.Reminders.PostgreSql/Hosting/PostgreSqlStorageExtensions.cs @@ -0,0 +1,44 @@ +using Akka.Actor; +using Akka.Reminders.PostgreSql.Configuration; + +namespace Akka.Reminders.PostgreSql.Hosting; + +/// +/// Extension methods for configuring PostgreSQL reminder storage. +/// +public static class PostgreSqlStorageExtensions +{ + public static ReminderConfigurationBuilder WithPostgreSqlStorage( + this ReminderConfigurationBuilder builder, + string connectionString, + string? schemaName = null, + string? tableName = null, + bool? autoInitialize = null) + { + var settings = PostgreSqlReminderStorageSettings.Create( + connectionString, + schemaName, + tableName, + autoInitialize); + + return builder.WithStorage(system => new PostgreSqlReminderStorage(settings, system)); + } + + public static ReminderConfigurationBuilder WithPostgreSqlStorage( + this ReminderConfigurationBuilder builder, + PostgreSqlReminderStorageSettings settings) + { + return builder.WithStorage(system => new PostgreSqlReminderStorage(settings, system)); + } + + public static ReminderConfigurationBuilder WithPostgreSqlStorage( + this ReminderConfigurationBuilder builder, + Func settingsFactory) + { + return builder.WithStorage(system => + { + var settings = settingsFactory(system); + return new PostgreSqlReminderStorage(settings, system); + }); + } +} diff --git a/src/Akka.Reminders.PostgreSql/Internal/ISqlDialect.cs b/src/Akka.Reminders.PostgreSql/Internal/ISqlDialect.cs new file mode 100644 index 0000000..52947be --- /dev/null +++ b/src/Akka.Reminders.PostgreSql/Internal/ISqlDialect.cs @@ -0,0 +1,19 @@ +using System.Data.Common; + +namespace Akka.Reminders.PostgreSql.Internal; + +internal interface ISqlDialect +{ + string GetCreateTableSql(string schemaName, string tableName); + string GetUpsertReminderSql(string schemaName, string tableName); + string GetSelectDueRemindersSql(string schemaName, string tableName, int maxCount); + string GetMarkCompletedSql(string schemaName, string tableName); + string GetBatchMarkCompletedSql(string schemaName, string tableName, int count); + string GetCleanupSql(string schemaName, string tableName); + string GetOverviewSql(string schemaName, string tableName); + string GetCancelReminderSql(string schemaName, string tableName); + string GetCancelAllRemindersSql(string schemaName, string tableName); + string GetFetchRemindersSql(string schemaName, string tableName); + DbConnection CreateConnection(string connectionString); + void AddParameter(DbCommand command, string name, object value); +} diff --git a/src/Akka.Reminders.Sql/Internal/PostgreSqlDialect.cs b/src/Akka.Reminders.PostgreSql/Internal/PostgreSqlDialect.cs similarity index 89% rename from src/Akka.Reminders.Sql/Internal/PostgreSqlDialect.cs rename to src/Akka.Reminders.PostgreSql/Internal/PostgreSqlDialect.cs index 99682d1..520cb7f 100644 --- a/src/Akka.Reminders.Sql/Internal/PostgreSqlDialect.cs +++ b/src/Akka.Reminders.PostgreSql/Internal/PostgreSqlDialect.cs @@ -3,12 +3,8 @@ using Npgsql; using NpgsqlTypes; -namespace Akka.Reminders.Sql.Internal; +namespace Akka.Reminders.PostgreSql.Internal; -/// -/// PostgreSQL implementation of the SQL dialect for reminder storage. -/// Uses PostgreSQL-specific features like INSERT ON CONFLICT and TIMESTAMP WITH TIME ZONE. -/// internal sealed class PostgreSqlDialect : ISqlDialect { public static readonly PostgreSqlDialect Instance = new(); @@ -40,12 +36,10 @@ completion_status VARCHAR(20) NOT NULL DEFAULT 'Pending', CONSTRAINT pk_{tableName} PRIMARY KEY (shard_region_name, entity_id, reminder_key) ); - -- Filtered index for efficient queries on pending reminders CREATE INDEX IF NOT EXISTS ix_{tableName}_due_reminders ON {fullTableName} (when_utc, shard_region_name, entity_id) WHERE is_completed = FALSE; - -- Index for cleanup operations CREATE INDEX IF NOT EXISTS ix_{tableName}_cleanup ON {fullTableName} (completed_at_utc) WHERE is_completed = TRUE; @@ -117,7 +111,6 @@ public string GetBatchMarkCompletedSql(string schemaName, string tableName, int { var fullTableName = $"\"{schemaName}\".\"{tableName}\""; - // Build VALUES list: (@sr0, @eid0, @rk0), (@sr1, @eid1, @rk1), ... var values = string.Join(",\n ", Enumerable.Range(0, count).Select(i => $"(@sr{i}::varchar, @eid{i}::varchar, @rk{i}::varchar)")); @@ -172,9 +165,6 @@ public string GetCancelReminderSql(string schemaName, string tableName) AND entity_id = @EntityId AND reminder_key = @ReminderKey AND is_completed = FALSE; - - -- PostgreSQL doesn't have @@ROWCOUNT, use GET DIAGNOSTICS - -- This will be handled in the storage implementation """; } @@ -190,9 +180,6 @@ public string GetCancelAllRemindersSql(string schemaName, string tableName) WHERE shard_region_name = @ShardRegionName AND entity_id = @EntityId AND is_completed = FALSE; - - -- PostgreSQL doesn't have @@ROWCOUNT, use GET DIAGNOSTICS - -- This will be handled in the storage implementation """; } @@ -220,26 +207,20 @@ public void AddParameter(DbCommand command, string name, object value) { var npgsqlCommand = (NpgsqlCommand)command; - // Handle null values if (value == null) { npgsqlCommand.Parameters.AddWithValue(name, DBNull.Value); return; } - // Handle specific types with proper PostgreSQL types switch (value) { case DateTimeOffset dto: - // PostgreSQL TimestampTz has microsecond precision (6 decimals), .NET has 100ns precision (7 decimals) - // Truncate to microseconds to avoid precision loss on round-trip - // 1 microsecond = 10 ticks, so truncate to the nearest 10 ticks var ticksToRemove = dto.Ticks % 10; var truncatedDto = ticksToRemove == 0 ? dto : new DateTimeOffset(dto.Ticks - ticksToRemove, dto.Offset); npgsqlCommand.Parameters.Add(new NpgsqlParameter(name, NpgsqlDbType.TimestampTz) { Value = truncatedDto }); break; case DateTime dt: - // Truncate to microseconds for consistency var dtTicksToRemove = dt.Ticks % 10; var truncatedDt = dtTicksToRemove == 0 ? dt : new DateTime(dt.Ticks - dtTicksToRemove, dt.Kind); npgsqlCommand.Parameters.Add(new NpgsqlParameter(name, NpgsqlDbType.TimestampTz) { Value = truncatedDt }); diff --git a/src/Akka.Reminders.PostgreSql/PostgreSqlReminderStorage.cs b/src/Akka.Reminders.PostgreSql/PostgreSqlReminderStorage.cs new file mode 100644 index 0000000..5e66a1f --- /dev/null +++ b/src/Akka.Reminders.PostgreSql/PostgreSqlReminderStorage.cs @@ -0,0 +1,494 @@ +using System.Data; +using Akka.Actor; +using Akka.Reminders.PostgreSql.Configuration; +using Akka.Reminders.PostgreSql.Internal; +using Akka.Reminders.Storage; + +namespace Akka.Reminders.PostgreSql; + +/// +/// PostgreSQL implementation of . +/// +public sealed class PostgreSqlReminderStorage : IReminderStorage +{ + private readonly PostgreSqlReminderStorageSettings _settings; + private readonly ISqlDialect _dialect; + private readonly Akka.Serialization.Serialization _serialization; + private readonly object _initLock = new(); + private volatile bool _initialized; + + public PostgreSqlReminderStorage(PostgreSqlReminderStorageSettings settings, ActorSystem system) + { + _settings = settings ?? throw new ArgumentNullException(nameof(settings)); + _settings.Validate(); + + _serialization = system.Serialization; + _dialect = PostgreSqlDialect.Instance; + } + + private static DateTimeOffset TruncateToMicroseconds(DateTimeOffset dto) + { + var ticksToRemove = dto.Ticks % 10; + return ticksToRemove == 0 ? dto : new DateTimeOffset(dto.Ticks - ticksToRemove, dto.Offset); + } + + public async Task ScheduleReminderAsync( + ScheduledReminder reminder, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var truncatedWhen = TruncateToMicroseconds(reminder.When); + var adjustedReminder = reminder with { When = truncatedWhen }; + + try + { + var (serializerId, manifest, payload) = SerializeMessage(adjustedReminder.Message); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetUpsertReminderSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", adjustedReminder.Entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", adjustedReminder.Entity.EntityId); + _dialect.AddParameter(command, "@ReminderKey", adjustedReminder.Key.Name); + _dialect.AddParameter(command, "@WhenUtc", adjustedReminder.When); + _dialect.AddParameter(command, "@RepeatIntervalTicks", adjustedReminder.RepeatInterval?.Ticks ?? (object)DBNull.Value); + _dialect.AddParameter(command, "@SerializerId", serializerId); + _dialect.AddParameter(command, "@Manifest", manifest ?? (object)DBNull.Value); + _dialect.AddParameter(command, "@Payload", payload); + _dialect.AddParameter(command, "@AttemptCount", adjustedReminder.AttemptCount); + _dialect.AddParameter(command, "@LastFailureReason", adjustedReminder.LastFailureReason ?? (object)DBNull.Value); + + await command.ExecuteNonQueryAsync(cancellationToken); + + return new ReminderProtocol.ReminderScheduled( + adjustedReminder.ToScheduleReminder(), + ReminderScheduleResponseCode.Success); + } + catch (Exception ex) + { + return new ReminderProtocol.ReminderScheduled( + adjustedReminder.ToScheduleReminder(), + ReminderScheduleResponseCode.Error, + ex.Message); + } + } + + public async Task GetNextRemindersAsync( + DateTimeOffset untilDeadline, + DateTimeOffset now, + ReminderBatchSize maxCount, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var reminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetSelectDueRemindersSql( + _settings.SchemaName, + _settings.TableName, + maxCount.Value); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@UntilDeadline", untilDeadline.UtcDateTime); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var reminder = ReadReminderFromReader(reader); + reminders.Add(reminder); + } + + var overview = await GetRemindersOverviewAsync(now, cancellationToken); + + var fetchedKeys = new HashSet<(string, string, string)>( + reminders.Select(r => (r.Entity.ShardRegionName, r.Entity.EntityId, r.Key.Name))); + + await using var conn2 = _dialect.CreateConnection(_settings.ConnectionString); + await conn2.OpenAsync(cancellationToken); + await using var cmd2 = conn2.CreateCommand(); + cmd2.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName); + cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken); + + var remainingReminders = new List(); + while (await reader2.ReadAsync(cancellationToken)) + { + var isCompleted = reader2.GetBoolean(reader2.GetOrdinal("is_completed")); + if (!isCompleted) + { + var reminder = ReadReminderFromReader(reader2); + var key = (reminder.Entity.ShardRegionName, reminder.Entity.EntityId, reminder.Key.Name); + if (!fetchedKeys.Contains(key)) + { + remainingReminders.Add(reminder); + } + } + } + + var nextReminder = remainingReminders.OrderBy(r => r.When).FirstOrDefault(); + var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; + + var adjustedOverview = new ReminderOverview + { + TimeUntilNext = timeUntilNext, + TotalPendingReminders = remainingReminders.Count + }; + + return new PendingRemindersWithSummary(reminders, adjustedOverview); + } + + private const int MaxRemindersPerStatement = 500; + + public async Task MarkRemindersAsCompletedAsync( + IEnumerable completedReminders, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var remindersList = completedReminders.ToList(); + if (remindersList.Count == 0) + return true; + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + var groups = remindersList.GroupBy(r => (r.Status, r.When)); + + foreach (var group in groups) + { + var items = group.ToList(); + + for (var offset = 0; offset < items.Count; offset += MaxRemindersPerStatement) + { + var chunk = items.Skip(offset).Take(MaxRemindersPerStatement).ToList(); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetBatchMarkCompletedSql( + _settings.SchemaName, _settings.TableName, chunk.Count); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@CompletedAtUtc", group.Key.When.UtcDateTime); + _dialect.AddParameter(command, "@CompletionStatus", group.Key.Status.ToString()); + + for (var i = 0; i < chunk.Count; i++) + { + _dialect.AddParameter(command, $"@sr{i}", chunk[i].Entity.ShardRegionName); + _dialect.AddParameter(command, $"@eid{i}", chunk[i].Entity.EntityId); + _dialect.AddParameter(command, $"@rk{i}", chunk[i].Key.Name); + } + + await command.ExecuteNonQueryAsync(cancellationToken); + } + } + + return true; + } + catch (Exception) + { + return false; + } + } + + public async Task GetRemindersOverviewAsync( + DateTimeOffset now, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var allReminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var isCompleted = reader.GetBoolean(reader.GetOrdinal("is_completed")); + + if (!isCompleted) + { + var reminder = ReadReminderFromReader(reader); + allReminders.Add(reminder); + } + } + + var nextReminder = allReminders.OrderBy(r => r.When).FirstOrDefault(); + var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; + + return new ReminderOverview + { + TimeUntilNext = timeUntilNext, + TotalPendingReminders = allReminders.Count + }; + } + + public async Task CleanUpCompletedRemindersAsync( + DateTimeOffset olderThan, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCleanupSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@OlderThan", olderThan.UtcDateTime); + + await command.ExecuteNonQueryAsync(cancellationToken); + return true; + } + catch (Exception) + { + return false; + } + } + + public async Task CancelReminderAsync( + ReminderEntity entity, + ReminderKey key, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCancelReminderSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + _dialect.AddParameter(command, "@ReminderKey", key.Name); + _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); + + var count = await command.ExecuteNonQueryAsync(cancellationToken); + + if (count > 0) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Success, + new List { key }, + null); + } + + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.NotFound, + new List(), + null); + } + catch (Exception ex) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Error, + new List(), + ex.Message); + } + } + + public async Task CancelAllRemindersForEntityAsync( + ReminderEntity entity, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + var cancelledKeys = new List(); + + await using (var selectCommand = connection.CreateCommand()) + { + selectCommand.CommandText = _dialect.GetFetchRemindersSql(_settings.SchemaName, _settings.TableName); + selectCommand.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(selectCommand, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(selectCommand, "@EntityId", entity.EntityId); + + await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken); + while (await reader.ReadAsync(cancellationToken)) + { + var reminderKey = reader.GetString(reader.GetOrdinal("reminder_key")); + var isCompleted = reader.GetBoolean(reader.GetOrdinal("is_completed")); + + if (!isCompleted) + { + cancelledKeys.Add(new ReminderKey(reminderKey)); + } + } + } + + if (cancelledKeys.Count == 0) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.NotFound, + new List(), + null); + } + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCancelAllRemindersSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); + + await command.ExecuteNonQueryAsync(cancellationToken); + + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Success, + cancelledKeys, + null); + } + catch (Exception ex) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Error, + new List(), + ex.Message); + } + } + + public async Task> GetRemindersForEntityAsync( + ReminderEntity entity, + int take = 10, + int skip = 0, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var reminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetFetchRemindersSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var reminder = ReadReminderFromReader(reader); + reminders.Add(reminder); + } + + return reminders.Skip(skip).Take(take).ToList(); + } + + private Task EnsureInitializedAsync(CancellationToken cancellationToken) + { + if (_initialized || !_settings.AutoInitialize) + return Task.CompletedTask; + + lock (_initLock) + { + if (_initialized) + return Task.CompletedTask; + + Task.Run(async () => + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCreateTableSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + await command.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken).Wait(cancellationToken); + + _initialized = true; + } + + return Task.CompletedTask; + } + + private (int serializerId, string? manifest, byte[] payload) SerializeMessage(object message) + { + var serializer = _serialization.FindSerializerFor(message); + var manifest = Akka.Serialization.Serialization.ManifestFor(serializer, message); + var payload = serializer.ToBinary(message); + + return (serializer.Identifier, manifest, payload); + } + + private object DeserializeMessage(int serializerId, string? manifest, byte[] payload) + { + return _serialization.Deserialize(payload, serializerId, manifest ?? string.Empty); + } + + private ScheduledReminder ReadReminderFromReader(IDataReader reader) + { + var shardRegionName = reader.GetString(reader.GetOrdinal("shard_region_name")); + var entityId = reader.GetString(reader.GetOrdinal("entity_id")); + var reminderKey = reader.GetString(reader.GetOrdinal("reminder_key")); + var whenUtc = reader.GetDateTime(reader.GetOrdinal("when_utc")); + + var repeatIntervalTicksOrdinal = reader.GetOrdinal("repeat_interval_ticks"); + var repeatInterval = reader.IsDBNull(repeatIntervalTicksOrdinal) + ? (TimeSpan?)null + : TimeSpan.FromTicks(reader.GetInt64(repeatIntervalTicksOrdinal)); + + var serializerId = reader.GetInt32(reader.GetOrdinal("serializer_id")); + + var manifestOrdinal = reader.GetOrdinal("manifest"); + var manifest = reader.IsDBNull(manifestOrdinal) ? null : reader.GetString(manifestOrdinal); + + var payload = (byte[])reader.GetValue(reader.GetOrdinal("payload")); + var attemptCount = reader.GetInt32(reader.GetOrdinal("attempt_count")); + + var lastFailureReasonOrdinal = reader.GetOrdinal("last_failure_reason"); + var lastFailureReason = reader.IsDBNull(lastFailureReasonOrdinal) + ? null + : reader.GetString(lastFailureReasonOrdinal); + + var message = DeserializeMessage(serializerId, manifest, payload); + + return new ScheduledReminder( + new ReminderEntity(shardRegionName, entityId), + new ReminderKey(reminderKey), + new DateTimeOffset(DateTime.SpecifyKind(whenUtc, DateTimeKind.Utc)), + message, + repeatInterval, + attemptCount, + lastFailureReason); + } +} diff --git a/src/Akka.Reminders.Sql/Scripts/PostgreSql-Create.sql b/src/Akka.Reminders.PostgreSql/Scripts/PostgreSql-Create.sql similarity index 100% rename from src/Akka.Reminders.Sql/Scripts/PostgreSql-Create.sql rename to src/Akka.Reminders.PostgreSql/Scripts/PostgreSql-Create.sql diff --git a/src/Akka.Reminders.Sql/Akka.Reminders.Sql.csproj b/src/Akka.Reminders.Sql/Akka.Reminders.Sql.csproj index 10944f2..7b41b31 100644 --- a/src/Akka.Reminders.Sql/Akka.Reminders.Sql.csproj +++ b/src/Akka.Reminders.Sql/Akka.Reminders.Sql.csproj @@ -5,23 +5,16 @@ enable enable Aaron.Akka.Reminders.Sql - SQL Server and PostgreSQL storage implementation for Akka.Reminders. - akka;akkadotnet;akka.reminders;sql;sqlserver;postgresql; + Compatibility package for SQL Server, PostgreSQL, and SQLite storage in Akka.Reminders. + akka;akkadotnet;akka.reminders;sql;sqlserver;postgresql;sqlite;compatibility; true - - - - - - - - - - + + + diff --git a/src/Akka.Reminders.Sql/AkkaHostingExtensions.cs b/src/Akka.Reminders.Sql/AkkaHostingExtensions.cs index bbc40cb..27c52e5 100644 --- a/src/Akka.Reminders.Sql/AkkaHostingExtensions.cs +++ b/src/Akka.Reminders.Sql/AkkaHostingExtensions.cs @@ -140,4 +140,47 @@ public static ReminderConfigurationBuilder WithPostgreSqlStorage( return new SqlReminderStorage(settings, system); }); } + + /// + /// Configures the reminder system to use SQLite storage. + /// + /// The reminder configuration builder. + /// The SQLite connection string. + /// Optional table name (defaults to "reminders"). + /// Whether to automatically create the table if it doesn't exist (defaults to true). + /// The builder for method chaining. + public static ReminderConfigurationBuilder WithSqliteStorage( + this ReminderConfigurationBuilder builder, + string connectionString, + string tableName = "reminders", + bool autoInitialize = true) + { + return builder.WithStorage(system => + { + var settings = SqlReminderStorageSettings.CreateSqlite( + connectionString, + tableName, + autoInitialize); + return new SqlReminderStorage(settings, system); + }); + } + + /// + /// Configures the reminder system to use SQLite storage with custom settings. + /// + /// The reminder configuration builder. + /// Action to configure the SQLite storage settings. + /// The builder for method chaining. + public static ReminderConfigurationBuilder WithSqliteStorage( + this ReminderConfigurationBuilder builder, + Action configure) + { + return builder.WithStorage(system => + { + var settings = SqlReminderStorageSettings.CreateSqlite(""); + configure(settings); + settings.Validate(); + return new SqlReminderStorage(settings, system); + }); + } } diff --git a/src/Akka.Reminders.Sql/Configuration/SqlReminderStorageSettings.cs b/src/Akka.Reminders.Sql/Configuration/SqlReminderStorageSettings.cs index db8ee69..be3e28b 100644 --- a/src/Akka.Reminders.Sql/Configuration/SqlReminderStorageSettings.cs +++ b/src/Akka.Reminders.Sql/Configuration/SqlReminderStorageSettings.cs @@ -1,8 +1,11 @@ +using Akka.Reminders.PostgreSql.Configuration; +using Akka.Reminders.Sqlite.Configuration; +using Akka.Reminders.SqlServer.Configuration; + namespace Akka.Reminders.Sql.Configuration; /// -/// Configuration settings for SQL-based reminder storage. -/// Controls database connection, schema, and initialization behavior. +/// Compatibility settings for SQL-based reminder storage. /// public sealed record SqlReminderStorageSettings { @@ -13,12 +16,12 @@ public sealed record SqlReminderStorageSettings /// /// The database provider name. - /// Supported values: "SqlServer", "PostgreSql" + /// Supported values: "SqlServer", "PostgreSql", "Sqlite" /// public required string ProviderName { get; init; } /// - /// The schema name for the reminders table. + /// The schema name for providers that support schemas. /// Default: "reminders" /// public string SchemaName { get; init; } = "reminders"; @@ -30,8 +33,8 @@ public sealed record SqlReminderStorageSettings public string TableName { get; init; } = "scheduled_reminders"; /// - /// Whether to automatically create the schema and table if they don't exist. - /// Default: true (similar to Akka.Persistence behavior) + /// Whether to automatically create schema/table if they don't exist. + /// Default: true /// public bool AutoInitialize { get; init; } = true; @@ -41,9 +44,6 @@ public sealed record SqlReminderStorageSettings /// public TimeSpan CommandTimeout { get; init; } = TimeSpan.FromSeconds(30); - /// - /// Creates settings for SQL Server. - /// public static SqlReminderStorageSettings CreateSqlServer( string connectionString, string? schemaName = null, @@ -60,9 +60,6 @@ public static SqlReminderStorageSettings CreateSqlServer( }; } - /// - /// Creates settings for PostgreSQL. - /// public static SqlReminderStorageSettings CreatePostgreSql( string connectionString, string? schemaName = null, @@ -79,9 +76,20 @@ public static SqlReminderStorageSettings CreatePostgreSql( }; } - /// - /// Validates the settings and throws if invalid. - /// + public static SqlReminderStorageSettings CreateSqlite( + string connectionString, + string? tableName = null, + bool? autoInitialize = null) + { + return new SqlReminderStorageSettings + { + ConnectionString = connectionString, + ProviderName = "Sqlite", + TableName = tableName ?? "scheduled_reminders", + AutoInitialize = autoInitialize ?? true + }; + } + public void Validate() { if (string.IsNullOrWhiteSpace(ConnectionString)) @@ -90,10 +98,12 @@ public void Validate() if (string.IsNullOrWhiteSpace(ProviderName)) throw new ArgumentException("ProviderName cannot be null or empty.", nameof(ProviderName)); - if (ProviderName != "SqlServer" && ProviderName != "PostgreSql") - throw new ArgumentException($"Unsupported provider: {ProviderName}. Supported providers: SqlServer, PostgreSql", nameof(ProviderName)); + if (ProviderName != "SqlServer" && ProviderName != "PostgreSql" && ProviderName != "Sqlite") + throw new ArgumentException( + $"Unsupported provider: {ProviderName}. Supported providers: SqlServer, PostgreSql, Sqlite", + nameof(ProviderName)); - if (string.IsNullOrWhiteSpace(SchemaName)) + if (ProviderName != "Sqlite" && string.IsNullOrWhiteSpace(SchemaName)) throw new ArgumentException("SchemaName cannot be null or empty.", nameof(SchemaName)); if (string.IsNullOrWhiteSpace(TableName)) @@ -102,4 +112,39 @@ public void Validate() if (CommandTimeout <= TimeSpan.Zero) throw new ArgumentException("CommandTimeout must be positive.", nameof(CommandTimeout)); } + + internal SqlServerReminderStorageSettings ToSqlServerSettings() + { + return new SqlServerReminderStorageSettings + { + ConnectionString = ConnectionString, + SchemaName = SchemaName, + TableName = TableName, + AutoInitialize = AutoInitialize, + CommandTimeout = CommandTimeout + }; + } + + internal PostgreSqlReminderStorageSettings ToPostgreSqlSettings() + { + return new PostgreSqlReminderStorageSettings + { + ConnectionString = ConnectionString, + SchemaName = SchemaName, + TableName = TableName, + AutoInitialize = AutoInitialize, + CommandTimeout = CommandTimeout + }; + } + + internal SqliteReminderStorageSettings ToSqliteSettings() + { + return new SqliteReminderStorageSettings + { + ConnectionString = ConnectionString, + TableName = TableName, + AutoInitialize = AutoInitialize, + CommandTimeout = CommandTimeout + }; + } } diff --git a/src/Akka.Reminders.Sql/Hosting/SqlStorageExtensions.cs b/src/Akka.Reminders.Sql/Hosting/SqlStorageExtensions.cs index f6ced74..2ffd65c 100644 --- a/src/Akka.Reminders.Sql/Hosting/SqlStorageExtensions.cs +++ b/src/Akka.Reminders.Sql/Hosting/SqlStorageExtensions.cs @@ -1,5 +1,11 @@ using Akka.Actor; using Akka.Reminders.Sql.Configuration; +using Akka.Reminders.Sqlite; +using Akka.Reminders.Sqlite.Configuration; +using Akka.Reminders.SqlServer; +using Akka.Reminders.SqlServer.Configuration; +using Akka.Reminders.PostgreSql; +using Akka.Reminders.PostgreSql.Configuration; namespace Akka.Reminders.Sql.Hosting; @@ -8,15 +14,6 @@ namespace Akka.Reminders.Sql.Hosting; /// public static class SqlStorageExtensions { - /// - /// Configures SQL Server storage for reminders. - /// - /// The reminder configuration builder. - /// The SQL Server connection string. - /// Optional schema name (default: "reminders"). - /// Optional table name (default: "scheduled_reminders"). - /// Whether to auto-create schema/table (default: true). - /// The builder for method chaining. public static ReminderConfigurationBuilder WithSqlServerStorage( this ReminderConfigurationBuilder builder, string connectionString, @@ -24,24 +21,15 @@ public static ReminderConfigurationBuilder WithSqlServerStorage( string? tableName = null, bool? autoInitialize = null) { - var settings = SqlReminderStorageSettings.CreateSqlServer( + var settings = SqlServerReminderStorageSettings.Create( connectionString, schemaName, tableName, autoInitialize); - return builder.WithStorage(system => new SqlReminderStorage(settings, system)); + return builder.WithStorage(system => new SqlServerReminderStorage(settings, system)); } - /// - /// Configures PostgreSQL storage for reminders. - /// - /// The reminder configuration builder. - /// The PostgreSQL connection string. - /// Optional schema name (default: "reminders"). - /// Optional table name (default: "scheduled_reminders"). - /// Whether to auto-create schema/table (default: true). - /// The builder for method chaining. public static ReminderConfigurationBuilder WithPostgreSqlStorage( this ReminderConfigurationBuilder builder, string connectionString, @@ -49,22 +37,29 @@ public static ReminderConfigurationBuilder WithPostgreSqlStorage( string? tableName = null, bool? autoInitialize = null) { - var settings = SqlReminderStorageSettings.CreatePostgreSql( + var settings = PostgreSqlReminderStorageSettings.Create( connectionString, schemaName, tableName, autoInitialize); - return builder.WithStorage(system => new SqlReminderStorage(settings, system)); + return builder.WithStorage(system => new PostgreSqlReminderStorage(settings, system)); + } + + public static ReminderConfigurationBuilder WithSqliteStorage( + this ReminderConfigurationBuilder builder, + string connectionString, + string? tableName = null, + bool? autoInitialize = null) + { + var settings = SqliteReminderStorageSettings.Create( + connectionString, + tableName, + autoInitialize); + + return builder.WithStorage(system => new SqliteReminderStorage(settings, system)); } - /// - /// Configures SQL storage for reminders with custom settings. - /// Allows full control over all storage settings. - /// - /// The reminder configuration builder. - /// The SQL storage settings. - /// The builder for method chaining. public static ReminderConfigurationBuilder WithSqlStorage( this ReminderConfigurationBuilder builder, SqlReminderStorageSettings settings) @@ -72,13 +67,6 @@ public static ReminderConfigurationBuilder WithSqlStorage( return builder.WithStorage(system => new SqlReminderStorage(settings, system)); } - /// - /// Configures SQL storage for reminders with a settings factory. - /// Allows settings to be created based on the actor system configuration. - /// - /// The reminder configuration builder. - /// Factory function to create settings from the actor system. - /// The builder for method chaining. public static ReminderConfigurationBuilder WithSqlStorage( this ReminderConfigurationBuilder builder, Func settingsFactory) diff --git a/src/Akka.Reminders.Sql/Internal/ISqlDialect.cs b/src/Akka.Reminders.Sql/Internal/ISqlDialect.cs deleted file mode 100644 index 0e0341c..0000000 --- a/src/Akka.Reminders.Sql/Internal/ISqlDialect.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System.Data.Common; - -namespace Akka.Reminders.Sql.Internal; - -/// -/// Defines database-specific SQL operations for reminder storage. -/// Abstracts differences between SQL Server, PostgreSQL, etc. -/// -internal interface ISqlDialect -{ - /// - /// Gets the SQL statement to create the reminders table with appropriate indexes. - /// - string GetCreateTableSql(string schemaName, string tableName); - - /// - /// Gets the SQL statement to upsert (insert or update) a reminder. - /// Uses MERGE for SQL Server, INSERT ON CONFLICT for PostgreSQL. - /// - string GetUpsertReminderSql(string schemaName, string tableName); - - /// - /// Gets the SQL statement to select reminders that are due by the specified deadline. - /// Only returns reminders that are not completed. - /// - /// Database schema name - /// Table name - /// Maximum number of rows returned - string GetSelectDueRemindersSql(string schemaName, string tableName, int maxCount); - - /// - /// Gets the SQL statement to mark a single reminder as completed. - /// - string GetMarkCompletedSql(string schemaName, string tableName); - - /// - /// Gets the SQL statement to mark multiple reminders as completed in a single round-trip. - /// Uses a VALUES join with numbered parameters (@sr0, @eid0, @rk0, @sr1, ...). - /// - /// Database schema name - /// Table name - /// Number of reminders to mark completed - string GetBatchMarkCompletedSql(string schemaName, string tableName, int count); - - /// - /// Gets the SQL statement to clean up old completed reminders. - /// Physically deletes completed reminders older than the specified threshold. - /// - string GetCleanupSql(string schemaName, string tableName); - - /// - /// Gets the SQL statement to get an overview of all reminders. - /// Returns summary information about pending and completed reminders. - /// - string GetOverviewSql(string schemaName, string tableName); - - /// - /// Gets the SQL statement to cancel a specific reminder by marking it as completed. - /// - string GetCancelReminderSql(string schemaName, string tableName); - - /// - /// Gets the SQL statement to cancel all reminders for a specific entity. - /// - string GetCancelAllRemindersSql(string schemaName, string tableName); - - /// - /// Gets the SQL statement to fetch all reminders for a specific entity. - /// - string GetFetchRemindersSql(string schemaName, string tableName); - - /// - /// Creates a database connection for this dialect. - /// - DbConnection CreateConnection(string connectionString); - - /// - /// Adds a parameter to the command with the appropriate database-specific type. - /// - void AddParameter(DbCommand command, string name, object value); -} diff --git a/src/Akka.Reminders.Sql/SqlReminderStorage.cs b/src/Akka.Reminders.Sql/SqlReminderStorage.cs index ed6c389..0b12ce4 100644 --- a/src/Akka.Reminders.Sql/SqlReminderStorage.cs +++ b/src/Akka.Reminders.Sql/SqlReminderStorage.cs @@ -1,547 +1,74 @@ -using System.Data; using Akka.Actor; +using Akka.Reminders.PostgreSql; using Akka.Reminders.Sql.Configuration; -using Akka.Reminders.Sql.Internal; +using Akka.Reminders.Sqlite; +using Akka.Reminders.SqlServer; using Akka.Reminders.Storage; namespace Akka.Reminders.Sql; /// -/// SQL-based implementation of . -/// Stores reminders in a SQL database with support for SQL Server and PostgreSQL. -/// Uses Akka.NET serialization system for message persistence. +/// Compatibility wrapper for SQL storage providers. /// public sealed class SqlReminderStorage : IReminderStorage { - private readonly SqlReminderStorageSettings _settings; - private readonly ISqlDialect _dialect; - private readonly Akka.Serialization.Serialization _serialization; - private readonly object _initLock = new(); - private volatile bool _initialized; + private readonly IReminderStorage _storage; public SqlReminderStorage(SqlReminderStorageSettings settings, ActorSystem system) { - _settings = settings ?? throw new ArgumentNullException(nameof(settings)); - _settings.Validate(); + settings.Validate(); - _serialization = system.Serialization; - - // Select the appropriate SQL dialect - _dialect = settings.ProviderName switch + _storage = settings.ProviderName switch { - "SqlServer" => SqlServerDialect.Instance, - "PostgreSql" => PostgreSqlDialect.Instance, - _ => throw new ArgumentException($"Unsupported provider: {settings.ProviderName}") + "SqlServer" => new SqlServerReminderStorage(settings.ToSqlServerSettings(), system), + "PostgreSql" => new PostgreSqlReminderStorage(settings.ToPostgreSqlSettings(), system), + "Sqlite" => new SqliteReminderStorage(settings.ToSqliteSettings(), system), + _ => throw new ArgumentException($"Unsupported provider: {settings.ProviderName}", nameof(settings)) }; } - /// - /// Truncates a DateTimeOffset to microsecond precision (6 decimal places) for PostgreSQL compatibility. - /// PostgreSQL TIMESTAMPTZ has microsecond precision while .NET DateTimeOffset has 100-nanosecond precision. - /// - private static DateTimeOffset TruncateToMicroseconds(DateTimeOffset dto) - { - var ticksToRemove = dto.Ticks % 10; // 1 microsecond = 10 ticks - return ticksToRemove == 0 ? dto : new DateTimeOffset(dto.Ticks - ticksToRemove, dto.Offset); - } - - public async Task ScheduleReminderAsync( + public Task ScheduleReminderAsync( ScheduledReminder reminder, CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); + => _storage.ScheduleReminderAsync(reminder, cancellationToken); - // For PostgreSQL compatibility, ensure timestamp precision is truncated to microseconds - // before storage to avoid precision loss on round-trip (PostgreSQL has 6 decimal places, .NET has 7) - var truncatedWhen = TruncateToMicroseconds(reminder.When); - var adjustedReminder = reminder with { When = truncatedWhen }; - - try - { - // Serialize the message - var (serializerId, manifest, payload) = SerializeMessage(adjustedReminder.Message); - - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetUpsertReminderSql(_settings.SchemaName, _settings.TableName); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + public Task CancelReminderAsync( + ReminderEntity entity, + ReminderKey key, + CancellationToken cancellationToken = default) + => _storage.CancelReminderAsync(entity, key, cancellationToken); - // Add parameters - _dialect.AddParameter(command, "@ShardRegionName", adjustedReminder.Entity.ShardRegionName); - _dialect.AddParameter(command, "@EntityId", adjustedReminder.Entity.EntityId); - _dialect.AddParameter(command, "@ReminderKey", adjustedReminder.Key.Name); - _dialect.AddParameter(command, "@WhenUtc", adjustedReminder.When); // Pass DateTimeOffset, not DateTime - _dialect.AddParameter(command, "@RepeatIntervalTicks", adjustedReminder.RepeatInterval?.Ticks ?? (object)DBNull.Value); - _dialect.AddParameter(command, "@SerializerId", serializerId); - _dialect.AddParameter(command, "@Manifest", manifest ?? (object)DBNull.Value); - _dialect.AddParameter(command, "@Payload", payload); - _dialect.AddParameter(command, "@AttemptCount", adjustedReminder.AttemptCount); - _dialect.AddParameter(command, "@LastFailureReason", adjustedReminder.LastFailureReason ?? (object)DBNull.Value); + public Task> GetRemindersForEntityAsync( + ReminderEntity entity, + int take = 10, + int skip = 0, + CancellationToken cancellationToken = default) + => _storage.GetRemindersForEntityAsync(entity, take, skip, cancellationToken); - await command.ExecuteNonQueryAsync(cancellationToken); + public Task CancelAllRemindersForEntityAsync( + ReminderEntity entity, + CancellationToken cancellationToken = default) + => _storage.CancelAllRemindersForEntityAsync(entity, cancellationToken); - // Return the truncated reminder so it matches what's stored in the database - return new ReminderProtocol.ReminderScheduled( - adjustedReminder.ToScheduleReminder(), - ReminderScheduleResponseCode.Success); - } - catch (Exception ex) - { - return new ReminderProtocol.ReminderScheduled( - adjustedReminder.ToScheduleReminder(), - ReminderScheduleResponseCode.Error, - ex.Message); - } - } + public Task GetRemindersOverviewAsync( + DateTimeOffset now, + CancellationToken cancellationToken = default) + => _storage.GetRemindersOverviewAsync(now, cancellationToken); - public async Task GetNextRemindersAsync( + public Task GetNextRemindersAsync( DateTimeOffset untilDeadline, DateTimeOffset now, ReminderBatchSize maxCount, CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); - - var reminders = new List(); - - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetSelectDueRemindersSql( - _settings.SchemaName, - _settings.TableName, - maxCount.Value); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - _dialect.AddParameter(command, "@UntilDeadline", untilDeadline.UtcDateTime); - - await using var reader = await command.ExecuteReaderAsync(cancellationToken); - - while (await reader.ReadAsync(cancellationToken)) - { - var reminder = ReadReminderFromReader(reader); - reminders.Add(reminder); - } - - // Get overview for all remaining pending reminders AFTER this fetch - // We need to exclude the reminders we just fetched since they will be processed/completed - var overview = await GetRemindersOverviewAsync(now, cancellationToken); - - // Find reminders that weren't fetched (those still pending after this fetch) - var fetchedKeys = new HashSet<(string, string, string)>( - reminders.Select(r => (r.Entity.ShardRegionName, r.Entity.EntityId, r.Key.Name))); - - // Get all pending reminders and filter out the ones we fetched - await using var conn2 = _dialect.CreateConnection(_settings.ConnectionString); - await conn2.OpenAsync(cancellationToken); - await using var cmd2 = conn2.CreateCommand(); - cmd2.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName); - cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken); - - var remainingReminders = new List(); - while (await reader2.ReadAsync(cancellationToken)) - { - var isCompleted = reader2.GetBoolean(GetOrdinal(reader2, "IsCompleted", "is_completed")); - if (!isCompleted) - { - var reminder = ReadReminderFromReader(reader2); - var key = (reminder.Entity.ShardRegionName, reminder.Entity.EntityId, reminder.Key.Name); - if (!fetchedKeys.Contains(key)) - { - remainingReminders.Add(reminder); - } - } - } - - // Calculate overview from remaining reminders - var nextReminder = remainingReminders.OrderBy(r => r.When).FirstOrDefault(); - var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; - - var adjustedOverview = new ReminderOverview - { - TimeUntilNext = timeUntilNext, - TotalPendingReminders = remainingReminders.Count - }; - - return new PendingRemindersWithSummary(reminders, adjustedOverview); - } - - // SQL Server has a limit of 2100 parameters per query. - // With 3 parameters per reminder (sr, eid, rk) + 2 shared (completedAtUtc, status), - // we can safely fit ~690 reminders per batch statement. Use 500 to leave headroom. - private const int MaxRemindersPerStatement = 500; + => _storage.GetNextRemindersAsync(untilDeadline, now, maxCount, cancellationToken); - public async Task MarkRemindersAsCompletedAsync( + public Task MarkRemindersAsCompletedAsync( IEnumerable completedReminders, CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); - - var remindersList = completedReminders.ToList(); - if (remindersList.Count == 0) - return true; - - try - { - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - // No transaction wrapper — each chunk auto-commits independently. - // If chunk 3 of 4 times out, chunks 1-2 are already persisted rather than - // rolling back all progress. This is safe because mark-complete is idempotent. - // Group by (Status, When) so each batch UPDATE shares the same completion metadata - var groups = remindersList.GroupBy(r => (r.Status, r.When)); - - foreach (var group in groups) - { - var items = group.ToList(); - - // Chunk to stay within SQL parameter limits - for (var offset = 0; offset < items.Count; offset += MaxRemindersPerStatement) - { - var chunk = items.Skip(offset).Take(MaxRemindersPerStatement).ToList(); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetBatchMarkCompletedSql( - _settings.SchemaName, _settings.TableName, chunk.Count); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - // Shared parameters for this group - _dialect.AddParameter(command, "@CompletedAtUtc", group.Key.When.UtcDateTime); - _dialect.AddParameter(command, "@CompletionStatus", group.Key.Status.ToString()); - - // Per-reminder key parameters - for (var i = 0; i < chunk.Count; i++) - { - _dialect.AddParameter(command, $"@sr{i}", chunk[i].Entity.ShardRegionName); - _dialect.AddParameter(command, $"@eid{i}", chunk[i].Entity.EntityId); - _dialect.AddParameter(command, $"@rk{i}", chunk[i].Key.Name); - } - - await command.ExecuteNonQueryAsync(cancellationToken); - } - } - - return true; - } - catch (Exception) - { - return false; - } - } - - public async Task GetRemindersOverviewAsync( - DateTimeOffset now, - CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); + => _storage.MarkRemindersAsCompletedAsync(completedReminders, cancellationToken); - var allReminders = new List(); - - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - await using var reader = await command.ExecuteReaderAsync(cancellationToken); - - while (await reader.ReadAsync(cancellationToken)) - { - var isCompleted = reader.GetBoolean(GetOrdinal(reader, "IsCompleted", "is_completed")); - - if (!isCompleted) - { - var reminder = ReadReminderFromReader(reader); - allReminders.Add(reminder); - } - } - - // Calculate time until next reminder - var nextReminder = allReminders.OrderBy(r => r.When).FirstOrDefault(); - var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; - - return new ReminderOverview - { - TimeUntilNext = timeUntilNext, - TotalPendingReminders = allReminders.Count - }; - } - - public async Task CleanUpCompletedRemindersAsync( + public Task CleanUpCompletedRemindersAsync( DateTimeOffset olderThan, CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); - - try - { - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetCleanupSql(_settings.SchemaName, _settings.TableName); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - _dialect.AddParameter(command, "@OlderThan", olderThan.UtcDateTime); - - await command.ExecuteNonQueryAsync(cancellationToken); - return true; - } - catch (Exception) - { - return false; - } - } - - public async Task CancelReminderAsync( - ReminderEntity entity, - ReminderKey key, - CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); - - try - { - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetCancelReminderSql(_settings.SchemaName, _settings.TableName); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); - _dialect.AddParameter(command, "@EntityId", entity.EntityId); - _dialect.AddParameter(command, "@ReminderKey", key.Name); - _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); - - var count = await command.ExecuteNonQueryAsync(cancellationToken); - - if (count > 0) - { - return new ReminderProtocol.RemindersCancelled( - entity, - ReminderCancelResponseCode.Success, - new List { key }, - null); - } - - return new ReminderProtocol.RemindersCancelled( - entity, - ReminderCancelResponseCode.NotFound, - new List(), - null); - } - catch (Exception ex) - { - return new ReminderProtocol.RemindersCancelled( - entity, - ReminderCancelResponseCode.Error, - new List(), - ex.Message); - } - } - - public async Task CancelAllRemindersForEntityAsync( - ReminderEntity entity, - CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); - - try - { - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - // First, get all reminder keys for the entity that will be cancelled - var cancelledKeys = new List(); - - await using (var selectCommand = connection.CreateCommand()) - { - selectCommand.CommandText = _dialect.GetFetchRemindersSql(_settings.SchemaName, _settings.TableName); - selectCommand.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - _dialect.AddParameter(selectCommand, "@ShardRegionName", entity.ShardRegionName); - _dialect.AddParameter(selectCommand, "@EntityId", entity.EntityId); - - await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken); - while (await reader.ReadAsync(cancellationToken)) - { - var reminderKey = reader.GetString(GetOrdinal(reader, "ReminderKey", "reminder_key")); - var isCompleted = reader.GetBoolean(GetOrdinal(reader, "IsCompleted", "is_completed")); - - if (!isCompleted) - { - cancelledKeys.Add(new ReminderKey(reminderKey)); - } - } - } - - // If no reminders found, return early - if (cancelledKeys.Count == 0) - { - return new ReminderProtocol.RemindersCancelled( - entity, - ReminderCancelResponseCode.NotFound, - new List(), - null); - } - - // Now cancel all reminders - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetCancelAllRemindersSql(_settings.SchemaName, _settings.TableName); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); - _dialect.AddParameter(command, "@EntityId", entity.EntityId); - _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); - - await command.ExecuteNonQueryAsync(cancellationToken); - - return new ReminderProtocol.RemindersCancelled( - entity, - ReminderCancelResponseCode.Success, - cancelledKeys, - null); - } - catch (Exception ex) - { - return new ReminderProtocol.RemindersCancelled( - entity, - ReminderCancelResponseCode.Error, - new List(), - ex.Message); - } - } - - public async Task> GetRemindersForEntityAsync( - ReminderEntity entity, - int take = 10, - int skip = 0, - CancellationToken cancellationToken = default) - { - await EnsureInitializedAsync(cancellationToken); - - var reminders = new List(); - - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetFetchRemindersSql(_settings.SchemaName, _settings.TableName); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); - _dialect.AddParameter(command, "@EntityId", entity.EntityId); - - await using var reader = await command.ExecuteReaderAsync(cancellationToken); - - while (await reader.ReadAsync(cancellationToken)) - { - var reminder = ReadReminderFromReader(reader); - reminders.Add(reminder); - } - - return reminders.Skip(skip).Take(take).ToList(); - } - - private Task EnsureInitializedAsync(CancellationToken cancellationToken) - { - if (_initialized || !_settings.AutoInitialize) - return Task.CompletedTask; - - lock (_initLock) - { - if (_initialized) - return Task.CompletedTask; - - // Run initialization synchronously within the lock - // This is acceptable because it only happens once - Task.Run(async () => - { - await using var connection = _dialect.CreateConnection(_settings.ConnectionString); - await connection.OpenAsync(cancellationToken); - - await using var command = connection.CreateCommand(); - command.CommandText = _dialect.GetCreateTableSql(_settings.SchemaName, _settings.TableName); - command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; - - await command.ExecuteNonQueryAsync(cancellationToken); - }, cancellationToken).Wait(cancellationToken); - - _initialized = true; - } - - return Task.CompletedTask; - } - - private (int serializerId, string? manifest, byte[] payload) SerializeMessage(object message) - { - var serializer = _serialization.FindSerializerFor(message); - var manifest = Akka.Serialization.Serialization.ManifestFor(serializer, message); - var payload = serializer.ToBinary(message); - - return (serializer.Identifier, manifest, payload); - } - - private object DeserializeMessage(int serializerId, string? manifest, byte[] payload) - { - return _serialization.Deserialize(payload, serializerId, manifest ?? string.Empty); - } - - private static int GetOrdinal(IDataReader reader, string sqlServerName, string postgreSqlName) - { - try - { - return reader.GetOrdinal(sqlServerName); - } - catch - { - return reader.GetOrdinal(postgreSqlName); - } - } - - private ScheduledReminder ReadReminderFromReader(IDataReader reader) - { - // Handle both SQL Server (PascalCase) and PostgreSQL (snake_case) column names - var shardRegionName = reader.GetString(GetOrdinal(reader, "ShardRegionName", "shard_region_name")); - var entityId = reader.GetString(GetOrdinal(reader, "EntityId", "entity_id")); - var reminderKey = reader.GetString(GetOrdinal(reader, "ReminderKey", "reminder_key")); - var whenUtc = reader.GetDateTime(GetOrdinal(reader, "WhenUtc", "when_utc")); - - var repeatIntervalTicksOrdinal = GetOrdinal(reader, "RepeatIntervalTicks", "repeat_interval_ticks"); - var repeatInterval = reader.IsDBNull(repeatIntervalTicksOrdinal) - ? (TimeSpan?)null - : TimeSpan.FromTicks(reader.GetInt64(repeatIntervalTicksOrdinal)); - - var serializerId = reader.GetInt32(GetOrdinal(reader, "SerializerId", "serializer_id")); - - var manifestOrdinal = GetOrdinal(reader, "Manifest", "manifest"); - var manifest = reader.IsDBNull(manifestOrdinal) ? null : reader.GetString(manifestOrdinal); - - var payload = (byte[])reader.GetValue(GetOrdinal(reader, "Payload", "payload")); - var attemptCount = reader.GetInt32(GetOrdinal(reader, "AttemptCount", "attempt_count")); - - var lastFailureReasonOrdinal = GetOrdinal(reader, "LastFailureReason", "last_failure_reason"); - var lastFailureReason = reader.IsDBNull(lastFailureReasonOrdinal) - ? null - : reader.GetString(lastFailureReasonOrdinal); - - // Deserialize the message - var message = DeserializeMessage(serializerId, manifest, payload); - - return new ScheduledReminder( - new ReminderEntity(shardRegionName, entityId), - new ReminderKey(reminderKey), - new DateTimeOffset(whenUtc, TimeSpan.Zero), - message, - repeatInterval, - attemptCount, - lastFailureReason); - } + => _storage.CleanUpCompletedRemindersAsync(olderThan, cancellationToken); } diff --git a/src/Akka.Reminders.SqlServer/Akka.Reminders.SqlServer.csproj b/src/Akka.Reminders.SqlServer/Akka.Reminders.SqlServer.csproj new file mode 100644 index 0000000..c894a83 --- /dev/null +++ b/src/Akka.Reminders.SqlServer/Akka.Reminders.SqlServer.csproj @@ -0,0 +1,25 @@ + + + + net9.0 + enable + enable + Aaron.Akka.Reminders.SqlServer + SQL Server storage implementation for Akka.Reminders. + akka;akkadotnet;akka.reminders;sql;sqlserver; + true + + + + + + + + + + + + + + + diff --git a/src/Akka.Reminders.SqlServer/Configuration/SqlServerReminderStorageSettings.cs b/src/Akka.Reminders.SqlServer/Configuration/SqlServerReminderStorageSettings.cs new file mode 100644 index 0000000..250a700 --- /dev/null +++ b/src/Akka.Reminders.SqlServer/Configuration/SqlServerReminderStorageSettings.cs @@ -0,0 +1,72 @@ +namespace Akka.Reminders.SqlServer.Configuration; + +/// +/// Configuration settings for SQL Server reminder storage. +/// +public sealed record SqlServerReminderStorageSettings +{ + /// + /// The SQL Server connection string. + /// + public required string ConnectionString { get; init; } + + /// + /// The schema name for the reminders table. + /// Default: "reminders" + /// + public string SchemaName { get; init; } = "reminders"; + + /// + /// The table name for storing reminders. + /// Default: "scheduled_reminders" + /// + public string TableName { get; init; } = "scheduled_reminders"; + + /// + /// Whether to automatically create the schema and table if they don't exist. + /// Default: true + /// + public bool AutoInitialize { get; init; } = true; + + /// + /// The timeout for database operations. + /// Default: 30 seconds + /// + public TimeSpan CommandTimeout { get; init; } = TimeSpan.FromSeconds(30); + + /// + /// Creates settings for SQL Server. + /// + public static SqlServerReminderStorageSettings Create( + string connectionString, + string? schemaName = null, + string? tableName = null, + bool? autoInitialize = null) + { + return new SqlServerReminderStorageSettings + { + ConnectionString = connectionString, + SchemaName = schemaName ?? "reminders", + TableName = tableName ?? "scheduled_reminders", + AutoInitialize = autoInitialize ?? true + }; + } + + /// + /// Validates the settings and throws if invalid. + /// + public void Validate() + { + if (string.IsNullOrWhiteSpace(ConnectionString)) + throw new ArgumentException("ConnectionString cannot be null or empty.", nameof(ConnectionString)); + + if (string.IsNullOrWhiteSpace(SchemaName)) + throw new ArgumentException("SchemaName cannot be null or empty.", nameof(SchemaName)); + + if (string.IsNullOrWhiteSpace(TableName)) + throw new ArgumentException("TableName cannot be null or empty.", nameof(TableName)); + + if (CommandTimeout <= TimeSpan.Zero) + throw new ArgumentException("CommandTimeout must be positive.", nameof(CommandTimeout)); + } +} diff --git a/src/Akka.Reminders.SqlServer/Hosting/SqlServerStorageExtensions.cs b/src/Akka.Reminders.SqlServer/Hosting/SqlServerStorageExtensions.cs new file mode 100644 index 0000000..dd1c471 --- /dev/null +++ b/src/Akka.Reminders.SqlServer/Hosting/SqlServerStorageExtensions.cs @@ -0,0 +1,44 @@ +using Akka.Actor; +using Akka.Reminders.SqlServer.Configuration; + +namespace Akka.Reminders.SqlServer.Hosting; + +/// +/// Extension methods for configuring SQL Server reminder storage. +/// +public static class SqlServerStorageExtensions +{ + public static ReminderConfigurationBuilder WithSqlServerStorage( + this ReminderConfigurationBuilder builder, + string connectionString, + string? schemaName = null, + string? tableName = null, + bool? autoInitialize = null) + { + var settings = SqlServerReminderStorageSettings.Create( + connectionString, + schemaName, + tableName, + autoInitialize); + + return builder.WithStorage(system => new SqlServerReminderStorage(settings, system)); + } + + public static ReminderConfigurationBuilder WithSqlServerStorage( + this ReminderConfigurationBuilder builder, + SqlServerReminderStorageSettings settings) + { + return builder.WithStorage(system => new SqlServerReminderStorage(settings, system)); + } + + public static ReminderConfigurationBuilder WithSqlServerStorage( + this ReminderConfigurationBuilder builder, + Func settingsFactory) + { + return builder.WithStorage(system => + { + var settings = settingsFactory(system); + return new SqlServerReminderStorage(settings, system); + }); + } +} diff --git a/src/Akka.Reminders.SqlServer/Internal/ISqlDialect.cs b/src/Akka.Reminders.SqlServer/Internal/ISqlDialect.cs new file mode 100644 index 0000000..5c29f69 --- /dev/null +++ b/src/Akka.Reminders.SqlServer/Internal/ISqlDialect.cs @@ -0,0 +1,19 @@ +using System.Data.Common; + +namespace Akka.Reminders.SqlServer.Internal; + +internal interface ISqlDialect +{ + string GetCreateTableSql(string schemaName, string tableName); + string GetUpsertReminderSql(string schemaName, string tableName); + string GetSelectDueRemindersSql(string schemaName, string tableName, int maxCount); + string GetMarkCompletedSql(string schemaName, string tableName); + string GetBatchMarkCompletedSql(string schemaName, string tableName, int count); + string GetCleanupSql(string schemaName, string tableName); + string GetOverviewSql(string schemaName, string tableName); + string GetCancelReminderSql(string schemaName, string tableName); + string GetCancelAllRemindersSql(string schemaName, string tableName); + string GetFetchRemindersSql(string schemaName, string tableName); + DbConnection CreateConnection(string connectionString); + void AddParameter(DbCommand command, string name, object value); +} diff --git a/src/Akka.Reminders.Sql/Internal/SqlServerDialect.cs b/src/Akka.Reminders.SqlServer/Internal/SqlServerDialect.cs similarity index 94% rename from src/Akka.Reminders.Sql/Internal/SqlServerDialect.cs rename to src/Akka.Reminders.SqlServer/Internal/SqlServerDialect.cs index c65e97f..afc66e7 100644 --- a/src/Akka.Reminders.Sql/Internal/SqlServerDialect.cs +++ b/src/Akka.Reminders.SqlServer/Internal/SqlServerDialect.cs @@ -2,12 +2,8 @@ using System.Data.Common; using Microsoft.Data.SqlClient; -namespace Akka.Reminders.Sql.Internal; +namespace Akka.Reminders.SqlServer.Internal; -/// -/// SQL Server implementation of the SQL dialect for reminder storage. -/// Uses SQL Server-specific features like MERGE and DATETIME2. -/// internal sealed class SqlServerDialect : ISqlDialect { public static readonly SqlServerDialect Instance = new(); @@ -46,12 +42,10 @@ CompletionStatus VARCHAR(20) NOT NULL DEFAULT 'Pending', CONSTRAINT PK_{tableName} PRIMARY KEY (ShardRegionName, EntityId, ReminderKey) ); - -- Filtered index for efficient queries on pending reminders CREATE INDEX IX_{tableName}_DueReminders ON {fullTableName} (WhenUtc, ShardRegionName, EntityId) WHERE IsCompleted = 0; - -- Index for cleanup operations CREATE INDEX IX_{tableName}_Cleanup ON {fullTableName} (CompletedAtUtc) WHERE IsCompleted = 1; @@ -138,7 +132,6 @@ public string GetBatchMarkCompletedSql(string schemaName, string tableName, int { var fullTableName = $"[{schemaName}].[{tableName}]"; - // Build VALUES list: (@sr0, @eid0, @rk0), (@sr1, @eid1, @rk1), ... var values = string.Join(",\n ", Enumerable.Range(0, count).Select(i => $"(@sr{i}, @eid{i}, @rk{i})")); @@ -236,18 +229,15 @@ public void AddParameter(DbCommand command, string name, object value) { var sqlCommand = (SqlCommand)command; - // Handle null values if (value == null) { sqlCommand.Parameters.AddWithValue(name, DBNull.Value); return; } - // Handle specific types switch (value) { case DateTimeOffset dto: - // SQL Server DATETIME2 has 100ns precision matching .NET, but store as UTC DateTime sqlCommand.Parameters.Add(name, SqlDbType.DateTime2).Value = dto.UtcDateTime; break; case DateTime dt: diff --git a/src/Akka.Reminders.Sql/Scripts/SqlServer-Create.sql b/src/Akka.Reminders.SqlServer/Scripts/SqlServer-Create.sql similarity index 95% rename from src/Akka.Reminders.Sql/Scripts/SqlServer-Create.sql rename to src/Akka.Reminders.SqlServer/Scripts/SqlServer-Create.sql index eeab37f..be0010c 100644 --- a/src/Akka.Reminders.Sql/Scripts/SqlServer-Create.sql +++ b/src/Akka.Reminders.SqlServer/Scripts/SqlServer-Create.sql @@ -54,7 +54,6 @@ BEGIN EXEC(@CreateTableSql); PRINT 'Created table: ' + @FullTableName - -- Create filtered index for efficient queries on pending reminders DECLARE @IndexName1 NVARCHAR(500) = 'IX_' + @TableName + '_DueReminders'; DECLARE @CreateIndex1Sql NVARCHAR(MAX) = ' CREATE INDEX ' + @IndexName1 + ' @@ -64,7 +63,6 @@ BEGIN EXEC(@CreateIndex1Sql); PRINT 'Created index: ' + @IndexName1 - -- Create index for cleanup operations DECLARE @IndexName2 NVARCHAR(500) = 'IX_' + @TableName + '_Cleanup'; DECLARE @CreateIndex2Sql NVARCHAR(MAX) = ' CREATE INDEX ' + @IndexName2 + ' diff --git a/src/Akka.Reminders.SqlServer/SqlServerReminderStorage.cs b/src/Akka.Reminders.SqlServer/SqlServerReminderStorage.cs new file mode 100644 index 0000000..4f06297 --- /dev/null +++ b/src/Akka.Reminders.SqlServer/SqlServerReminderStorage.cs @@ -0,0 +1,485 @@ +using System.Data; +using Akka.Actor; +using Akka.Reminders.SqlServer.Configuration; +using Akka.Reminders.SqlServer.Internal; +using Akka.Reminders.Storage; + +namespace Akka.Reminders.SqlServer; + +/// +/// SQL Server implementation of . +/// +public sealed class SqlServerReminderStorage : IReminderStorage +{ + private readonly SqlServerReminderStorageSettings _settings; + private readonly ISqlDialect _dialect; + private readonly Akka.Serialization.Serialization _serialization; + private readonly object _initLock = new(); + private volatile bool _initialized; + + public SqlServerReminderStorage(SqlServerReminderStorageSettings settings, ActorSystem system) + { + _settings = settings ?? throw new ArgumentNullException(nameof(settings)); + _settings.Validate(); + + _serialization = system.Serialization; + _dialect = SqlServerDialect.Instance; + } + + public async Task ScheduleReminderAsync( + ScheduledReminder reminder, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + var (serializerId, manifest, payload) = SerializeMessage(reminder.Message); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetUpsertReminderSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", reminder.Entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", reminder.Entity.EntityId); + _dialect.AddParameter(command, "@ReminderKey", reminder.Key.Name); + _dialect.AddParameter(command, "@WhenUtc", reminder.When); + _dialect.AddParameter(command, "@RepeatIntervalTicks", reminder.RepeatInterval?.Ticks ?? (object)DBNull.Value); + _dialect.AddParameter(command, "@SerializerId", serializerId); + _dialect.AddParameter(command, "@Manifest", manifest ?? (object)DBNull.Value); + _dialect.AddParameter(command, "@Payload", payload); + _dialect.AddParameter(command, "@AttemptCount", reminder.AttemptCount); + _dialect.AddParameter(command, "@LastFailureReason", reminder.LastFailureReason ?? (object)DBNull.Value); + + await command.ExecuteNonQueryAsync(cancellationToken); + + return new ReminderProtocol.ReminderScheduled( + reminder.ToScheduleReminder(), + ReminderScheduleResponseCode.Success); + } + catch (Exception ex) + { + return new ReminderProtocol.ReminderScheduled( + reminder.ToScheduleReminder(), + ReminderScheduleResponseCode.Error, + ex.Message); + } + } + + public async Task GetNextRemindersAsync( + DateTimeOffset untilDeadline, + DateTimeOffset now, + ReminderBatchSize maxCount, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var reminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetSelectDueRemindersSql( + _settings.SchemaName, + _settings.TableName, + maxCount.Value); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@UntilDeadline", untilDeadline.UtcDateTime); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var reminder = ReadReminderFromReader(reader); + reminders.Add(reminder); + } + + var overview = await GetRemindersOverviewAsync(now, cancellationToken); + + var fetchedKeys = new HashSet<(string, string, string)>( + reminders.Select(r => (r.Entity.ShardRegionName, r.Entity.EntityId, r.Key.Name))); + + await using var conn2 = _dialect.CreateConnection(_settings.ConnectionString); + await conn2.OpenAsync(cancellationToken); + await using var cmd2 = conn2.CreateCommand(); + cmd2.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName); + cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken); + + var remainingReminders = new List(); + while (await reader2.ReadAsync(cancellationToken)) + { + var isCompleted = reader2.GetBoolean(reader2.GetOrdinal("IsCompleted")); + if (!isCompleted) + { + var reminder = ReadReminderFromReader(reader2); + var key = (reminder.Entity.ShardRegionName, reminder.Entity.EntityId, reminder.Key.Name); + if (!fetchedKeys.Contains(key)) + { + remainingReminders.Add(reminder); + } + } + } + + var nextReminder = remainingReminders.OrderBy(r => r.When).FirstOrDefault(); + var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; + + var adjustedOverview = new ReminderOverview + { + TimeUntilNext = timeUntilNext, + TotalPendingReminders = remainingReminders.Count + }; + + return new PendingRemindersWithSummary(reminders, adjustedOverview); + } + + private const int MaxRemindersPerStatement = 500; + + public async Task MarkRemindersAsCompletedAsync( + IEnumerable completedReminders, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var remindersList = completedReminders.ToList(); + if (remindersList.Count == 0) + return true; + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + var groups = remindersList.GroupBy(r => (r.Status, r.When)); + + foreach (var group in groups) + { + var items = group.ToList(); + + for (var offset = 0; offset < items.Count; offset += MaxRemindersPerStatement) + { + var chunk = items.Skip(offset).Take(MaxRemindersPerStatement).ToList(); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetBatchMarkCompletedSql( + _settings.SchemaName, _settings.TableName, chunk.Count); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@CompletedAtUtc", group.Key.When.UtcDateTime); + _dialect.AddParameter(command, "@CompletionStatus", group.Key.Status.ToString()); + + for (var i = 0; i < chunk.Count; i++) + { + _dialect.AddParameter(command, $"@sr{i}", chunk[i].Entity.ShardRegionName); + _dialect.AddParameter(command, $"@eid{i}", chunk[i].Entity.EntityId); + _dialect.AddParameter(command, $"@rk{i}", chunk[i].Key.Name); + } + + await command.ExecuteNonQueryAsync(cancellationToken); + } + } + + return true; + } + catch (Exception) + { + return false; + } + } + + public async Task GetRemindersOverviewAsync( + DateTimeOffset now, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var allReminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var isCompleted = reader.GetBoolean(reader.GetOrdinal("IsCompleted")); + + if (!isCompleted) + { + var reminder = ReadReminderFromReader(reader); + allReminders.Add(reminder); + } + } + + var nextReminder = allReminders.OrderBy(r => r.When).FirstOrDefault(); + var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; + + return new ReminderOverview + { + TimeUntilNext = timeUntilNext, + TotalPendingReminders = allReminders.Count + }; + } + + public async Task CleanUpCompletedRemindersAsync( + DateTimeOffset olderThan, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCleanupSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@OlderThan", olderThan.UtcDateTime); + + await command.ExecuteNonQueryAsync(cancellationToken); + return true; + } + catch (Exception) + { + return false; + } + } + + public async Task CancelReminderAsync( + ReminderEntity entity, + ReminderKey key, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCancelReminderSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + _dialect.AddParameter(command, "@ReminderKey", key.Name); + _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); + + var count = await command.ExecuteNonQueryAsync(cancellationToken); + + if (count > 0) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Success, + new List { key }, + null); + } + + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.NotFound, + new List(), + null); + } + catch (Exception ex) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Error, + new List(), + ex.Message); + } + } + + public async Task CancelAllRemindersForEntityAsync( + ReminderEntity entity, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + var cancelledKeys = new List(); + + await using (var selectCommand = connection.CreateCommand()) + { + selectCommand.CommandText = _dialect.GetFetchRemindersSql(_settings.SchemaName, _settings.TableName); + selectCommand.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(selectCommand, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(selectCommand, "@EntityId", entity.EntityId); + + await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken); + while (await reader.ReadAsync(cancellationToken)) + { + var reminderKey = reader.GetString(reader.GetOrdinal("ReminderKey")); + var isCompleted = reader.GetBoolean(reader.GetOrdinal("IsCompleted")); + + if (!isCompleted) + { + cancelledKeys.Add(new ReminderKey(reminderKey)); + } + } + } + + if (cancelledKeys.Count == 0) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.NotFound, + new List(), + null); + } + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCancelAllRemindersSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); + + await command.ExecuteNonQueryAsync(cancellationToken); + + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Success, + cancelledKeys, + null); + } + catch (Exception ex) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Error, + new List(), + ex.Message); + } + } + + public async Task> GetRemindersForEntityAsync( + ReminderEntity entity, + int take = 10, + int skip = 0, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var reminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetFetchRemindersSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var reminder = ReadReminderFromReader(reader); + reminders.Add(reminder); + } + + return reminders.Skip(skip).Take(take).ToList(); + } + + private Task EnsureInitializedAsync(CancellationToken cancellationToken) + { + if (_initialized || !_settings.AutoInitialize) + return Task.CompletedTask; + + lock (_initLock) + { + if (_initialized) + return Task.CompletedTask; + + Task.Run(async () => + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCreateTableSql(_settings.SchemaName, _settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + await command.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken).Wait(cancellationToken); + + _initialized = true; + } + + return Task.CompletedTask; + } + + private (int serializerId, string? manifest, byte[] payload) SerializeMessage(object message) + { + var serializer = _serialization.FindSerializerFor(message); + var manifest = Akka.Serialization.Serialization.ManifestFor(serializer, message); + var payload = serializer.ToBinary(message); + + return (serializer.Identifier, manifest, payload); + } + + private object DeserializeMessage(int serializerId, string? manifest, byte[] payload) + { + return _serialization.Deserialize(payload, serializerId, manifest ?? string.Empty); + } + + private ScheduledReminder ReadReminderFromReader(IDataReader reader) + { + var shardRegionName = reader.GetString(reader.GetOrdinal("ShardRegionName")); + var entityId = reader.GetString(reader.GetOrdinal("EntityId")); + var reminderKey = reader.GetString(reader.GetOrdinal("ReminderKey")); + var whenUtc = reader.GetDateTime(reader.GetOrdinal("WhenUtc")); + + var repeatIntervalTicksOrdinal = reader.GetOrdinal("RepeatIntervalTicks"); + var repeatInterval = reader.IsDBNull(repeatIntervalTicksOrdinal) + ? (TimeSpan?)null + : TimeSpan.FromTicks(reader.GetInt64(repeatIntervalTicksOrdinal)); + + var serializerId = reader.GetInt32(reader.GetOrdinal("SerializerId")); + + var manifestOrdinal = reader.GetOrdinal("Manifest"); + var manifest = reader.IsDBNull(manifestOrdinal) ? null : reader.GetString(manifestOrdinal); + + var payload = (byte[])reader.GetValue(reader.GetOrdinal("Payload")); + var attemptCount = reader.GetInt32(reader.GetOrdinal("AttemptCount")); + + var lastFailureReasonOrdinal = reader.GetOrdinal("LastFailureReason"); + var lastFailureReason = reader.IsDBNull(lastFailureReasonOrdinal) + ? null + : reader.GetString(lastFailureReasonOrdinal); + + var message = DeserializeMessage(serializerId, manifest, payload); + + return new ScheduledReminder( + new ReminderEntity(shardRegionName, entityId), + new ReminderKey(reminderKey), + new DateTimeOffset(DateTime.SpecifyKind(whenUtc, DateTimeKind.Utc)), + message, + repeatInterval, + attemptCount, + lastFailureReason); + } +} diff --git a/src/Akka.Reminders.Sqlite/Akka.Reminders.Sqlite.csproj b/src/Akka.Reminders.Sqlite/Akka.Reminders.Sqlite.csproj new file mode 100644 index 0000000..5ad0adf --- /dev/null +++ b/src/Akka.Reminders.Sqlite/Akka.Reminders.Sqlite.csproj @@ -0,0 +1,25 @@ + + + + net9.0 + enable + enable + Aaron.Akka.Reminders.Sqlite + SQLite storage implementation for Akka.Reminders. + akka;akkadotnet;akka.reminders;sql;sqlite; + true + + + + + + + + + + + + + + + diff --git a/src/Akka.Reminders.Sqlite/Configuration/SqliteReminderStorageSettings.cs b/src/Akka.Reminders.Sqlite/Configuration/SqliteReminderStorageSettings.cs new file mode 100644 index 0000000..fc8bae0 --- /dev/null +++ b/src/Akka.Reminders.Sqlite/Configuration/SqliteReminderStorageSettings.cs @@ -0,0 +1,61 @@ +namespace Akka.Reminders.Sqlite.Configuration; + +/// +/// Configuration settings for SQLite reminder storage. +/// +public sealed record SqliteReminderStorageSettings +{ + /// + /// The SQLite connection string. + /// + public required string ConnectionString { get; init; } + + /// + /// The table name for storing reminders. + /// Default: "scheduled_reminders" + /// + public string TableName { get; init; } = "scheduled_reminders"; + + /// + /// Whether to automatically create the table if it doesn't exist. + /// Default: true + /// + public bool AutoInitialize { get; init; } = true; + + /// + /// The timeout for database operations. + /// Default: 30 seconds + /// + public TimeSpan CommandTimeout { get; init; } = TimeSpan.FromSeconds(30); + + /// + /// Creates settings for SQLite. + /// + public static SqliteReminderStorageSettings Create( + string connectionString, + string? tableName = null, + bool? autoInitialize = null) + { + return new SqliteReminderStorageSettings + { + ConnectionString = connectionString, + TableName = tableName ?? "scheduled_reminders", + AutoInitialize = autoInitialize ?? true + }; + } + + /// + /// Validates the settings and throws if invalid. + /// + public void Validate() + { + if (string.IsNullOrWhiteSpace(ConnectionString)) + throw new ArgumentException("ConnectionString cannot be null or empty.", nameof(ConnectionString)); + + if (string.IsNullOrWhiteSpace(TableName)) + throw new ArgumentException("TableName cannot be null or empty.", nameof(TableName)); + + if (CommandTimeout <= TimeSpan.Zero) + throw new ArgumentException("CommandTimeout must be positive.", nameof(CommandTimeout)); + } +} diff --git a/src/Akka.Reminders.Sqlite/Hosting/SqliteStorageExtensions.cs b/src/Akka.Reminders.Sqlite/Hosting/SqliteStorageExtensions.cs new file mode 100644 index 0000000..cb7cc3f --- /dev/null +++ b/src/Akka.Reminders.Sqlite/Hosting/SqliteStorageExtensions.cs @@ -0,0 +1,42 @@ +using Akka.Actor; +using Akka.Reminders.Sqlite.Configuration; + +namespace Akka.Reminders.Sqlite.Hosting; + +/// +/// Extension methods for configuring SQLite reminder storage. +/// +public static class SqliteStorageExtensions +{ + public static ReminderConfigurationBuilder WithSqliteStorage( + this ReminderConfigurationBuilder builder, + string connectionString, + string? tableName = null, + bool? autoInitialize = null) + { + var settings = SqliteReminderStorageSettings.Create( + connectionString, + tableName, + autoInitialize); + + return builder.WithStorage(system => new SqliteReminderStorage(settings, system)); + } + + public static ReminderConfigurationBuilder WithSqliteStorage( + this ReminderConfigurationBuilder builder, + SqliteReminderStorageSettings settings) + { + return builder.WithStorage(system => new SqliteReminderStorage(settings, system)); + } + + public static ReminderConfigurationBuilder WithSqliteStorage( + this ReminderConfigurationBuilder builder, + Func settingsFactory) + { + return builder.WithStorage(system => + { + var settings = settingsFactory(system); + return new SqliteReminderStorage(settings, system); + }); + } +} diff --git a/src/Akka.Reminders.Sqlite/Internal/ISqlDialect.cs b/src/Akka.Reminders.Sqlite/Internal/ISqlDialect.cs new file mode 100644 index 0000000..6a92f10 --- /dev/null +++ b/src/Akka.Reminders.Sqlite/Internal/ISqlDialect.cs @@ -0,0 +1,19 @@ +using System.Data.Common; + +namespace Akka.Reminders.Sqlite.Internal; + +internal interface ISqlDialect +{ + string GetCreateTableSql(string tableName); + string GetUpsertReminderSql(string tableName); + string GetSelectDueRemindersSql(string tableName, int maxCount); + string GetMarkCompletedSql(string tableName); + string GetBatchMarkCompletedSql(string tableName, int count); + string GetCleanupSql(string tableName); + string GetOverviewSql(string tableName); + string GetCancelReminderSql(string tableName); + string GetCancelAllRemindersSql(string tableName); + string GetFetchRemindersSql(string tableName); + DbConnection CreateConnection(string connectionString); + void AddParameter(DbCommand command, string name, object value); +} diff --git a/src/Akka.Reminders.Sqlite/Internal/SqliteDialect.cs b/src/Akka.Reminders.Sqlite/Internal/SqliteDialect.cs new file mode 100644 index 0000000..46dc706 --- /dev/null +++ b/src/Akka.Reminders.Sqlite/Internal/SqliteDialect.cs @@ -0,0 +1,224 @@ +using System.Data.Common; +using System.Globalization; +using Microsoft.Data.Sqlite; + +namespace Akka.Reminders.Sqlite.Internal; + +internal sealed class SqliteDialect : ISqlDialect +{ + public static readonly SqliteDialect Instance = new(); + + private SqliteDialect() { } + + public string GetCreateTableSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + CREATE TABLE IF NOT EXISTS {fullTableName} ( + shard_region_name TEXT NOT NULL, + entity_id TEXT NOT NULL, + reminder_key TEXT NOT NULL, + when_utc TEXT NOT NULL, + repeat_interval_ticks INTEGER NULL, + serializer_id INTEGER NOT NULL, + manifest TEXT NULL, + payload BLOB NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + last_failure_reason TEXT NULL, + is_completed INTEGER NOT NULL DEFAULT 0, + completed_at_utc TEXT NULL, + completion_status TEXT NOT NULL DEFAULT 'Pending', + + PRIMARY KEY (shard_region_name, entity_id, reminder_key) + ); + + CREATE INDEX IF NOT EXISTS ix_{tableName}_due_reminders + ON {fullTableName} (when_utc, shard_region_name, entity_id) + WHERE is_completed = 0; + + CREATE INDEX IF NOT EXISTS ix_{tableName}_cleanup + ON {fullTableName} (completed_at_utc) + WHERE is_completed = 1; + """; + } + + public string GetUpsertReminderSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + INSERT INTO {fullTableName} + (shard_region_name, entity_id, reminder_key, when_utc, repeat_interval_ticks, + serializer_id, manifest, payload, attempt_count, last_failure_reason, + is_completed, completed_at_utc, completion_status) + VALUES + (@ShardRegionName, @EntityId, @ReminderKey, @WhenUtc, @RepeatIntervalTicks, + @SerializerId, @Manifest, @Payload, @AttemptCount, @LastFailureReason, + 0, NULL, 'Pending') + ON CONFLICT (shard_region_name, entity_id, reminder_key) + DO UPDATE SET + when_utc = excluded.when_utc, + repeat_interval_ticks = excluded.repeat_interval_ticks, + serializer_id = excluded.serializer_id, + manifest = excluded.manifest, + payload = excluded.payload, + attempt_count = excluded.attempt_count, + last_failure_reason = excluded.last_failure_reason, + is_completed = 0, + completed_at_utc = NULL, + completion_status = 'Pending'; + """; + } + + public string GetSelectDueRemindersSql(string tableName, int maxCount) + { + if (maxCount < 1) + throw new ArgumentOutOfRangeException(nameof(maxCount), "maxCount must be greater than or equal to 1."); + + var fullTableName = $"\"{tableName}\""; + + return $""" + SELECT shard_region_name, entity_id, reminder_key, when_utc, repeat_interval_ticks, + serializer_id, manifest, payload, attempt_count, last_failure_reason + FROM {fullTableName} + WHERE is_completed = 0 + AND when_utc <= @UntilDeadline + ORDER BY when_utc ASC + LIMIT {maxCount}; + """; + } + + public string GetMarkCompletedSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + UPDATE {fullTableName} + SET is_completed = 1, + completed_at_utc = @CompletedAtUtc, + completion_status = @CompletionStatus + WHERE shard_region_name = @ShardRegionName + AND entity_id = @EntityId + AND reminder_key = @ReminderKey; + """; + } + + public string GetBatchMarkCompletedSql(string tableName, int count) + { + var fullTableName = $"\"{tableName}\""; + + var predicates = string.Join(" OR ", + Enumerable.Range(0, count).Select(i => + $"(shard_region_name = @sr{i} AND entity_id = @eid{i} AND reminder_key = @rk{i})")); + + return $""" + UPDATE {fullTableName} + SET is_completed = 1, + completed_at_utc = @CompletedAtUtc, + completion_status = @CompletionStatus + WHERE {predicates}; + """; + } + + public string GetCleanupSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + DELETE FROM {fullTableName} + WHERE is_completed = 1 + AND completed_at_utc < @OlderThan; + """; + } + + public string GetOverviewSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + SELECT shard_region_name, entity_id, reminder_key, when_utc, repeat_interval_ticks, + serializer_id, manifest, payload, attempt_count, last_failure_reason, is_completed + FROM {fullTableName} + ORDER BY when_utc ASC; + """; + } + + public string GetCancelReminderSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + UPDATE {fullTableName} + SET is_completed = 1, + completed_at_utc = @CompletedAtUtc, + completion_status = 'Cancelled' + WHERE shard_region_name = @ShardRegionName + AND entity_id = @EntityId + AND reminder_key = @ReminderKey + AND is_completed = 0; + """; + } + + public string GetCancelAllRemindersSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + UPDATE {fullTableName} + SET is_completed = 1, + completed_at_utc = @CompletedAtUtc, + completion_status = 'Cancelled' + WHERE shard_region_name = @ShardRegionName + AND entity_id = @EntityId + AND is_completed = 0; + """; + } + + public string GetFetchRemindersSql(string tableName) + { + var fullTableName = $"\"{tableName}\""; + + return $""" + SELECT shard_region_name, entity_id, reminder_key, when_utc, repeat_interval_ticks, + serializer_id, manifest, payload, attempt_count, last_failure_reason, is_completed + FROM {fullTableName} + WHERE shard_region_name = @ShardRegionName + AND entity_id = @EntityId + AND is_completed = 0 + ORDER BY when_utc ASC; + """; + } + + public DbConnection CreateConnection(string connectionString) + { + return new SqliteConnection(connectionString); + } + + public void AddParameter(DbCommand command, string name, object value) + { + var sqliteCommand = (SqliteCommand)command; + + if (value == null) + { + sqliteCommand.Parameters.AddWithValue(name, DBNull.Value); + return; + } + + switch (value) + { + case DateTimeOffset dto: + sqliteCommand.Parameters.AddWithValue(name, dto.UtcDateTime.ToString("O", CultureInfo.InvariantCulture)); + break; + case DateTime dt: + sqliteCommand.Parameters.AddWithValue(name, dt.ToUniversalTime().ToString("O", CultureInfo.InvariantCulture)); + break; + case bool b: + sqliteCommand.Parameters.AddWithValue(name, b ? 1 : 0); + break; + default: + sqliteCommand.Parameters.AddWithValue(name, value); + break; + } + } +} diff --git a/src/Akka.Reminders.Sqlite/Scripts/Sqlite-Create.sql b/src/Akka.Reminders.Sqlite/Scripts/Sqlite-Create.sql new file mode 100644 index 0000000..c29b52a --- /dev/null +++ b/src/Akka.Reminders.Sqlite/Scripts/Sqlite-Create.sql @@ -0,0 +1,34 @@ +-- Akka.Reminders SQLite Schema Creation Script +-- This script creates the table for storing reminders in SQLite. +-- +-- Usage: +-- 1. Review and modify the table name if needed (default: 'scheduled_reminders') +-- 2. Execute this script against your SQLite database +-- +-- Note: This script is idempotent - it will only create objects if they don't already exist. + +CREATE TABLE IF NOT EXISTS scheduled_reminders ( + shard_region_name TEXT NOT NULL, + entity_id TEXT NOT NULL, + reminder_key TEXT NOT NULL, + when_utc TEXT NOT NULL, + repeat_interval_ticks INTEGER NULL, + serializer_id INTEGER NOT NULL, + manifest TEXT NULL, + payload BLOB NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + last_failure_reason TEXT NULL, + is_completed INTEGER NOT NULL DEFAULT 0, + completed_at_utc TEXT NULL, + completion_status TEXT NOT NULL DEFAULT 'Pending', + + PRIMARY KEY (shard_region_name, entity_id, reminder_key) +); + +CREATE INDEX IF NOT EXISTS ix_scheduled_reminders_due_reminders +ON scheduled_reminders (when_utc, shard_region_name, entity_id) +WHERE is_completed = 0; + +CREATE INDEX IF NOT EXISTS ix_scheduled_reminders_cleanup +ON scheduled_reminders (completed_at_utc) +WHERE is_completed = 1; diff --git a/src/Akka.Reminders.Sqlite/SqliteReminderStorage.cs b/src/Akka.Reminders.Sqlite/SqliteReminderStorage.cs new file mode 100644 index 0000000..c4d13ef --- /dev/null +++ b/src/Akka.Reminders.Sqlite/SqliteReminderStorage.cs @@ -0,0 +1,504 @@ +using System.Data; +using System.Globalization; +using Akka.Actor; +using Akka.Reminders.Sqlite.Configuration; +using Akka.Reminders.Sqlite.Internal; +using Akka.Reminders.Storage; + +namespace Akka.Reminders.Sqlite; + +/// +/// SQLite implementation of . +/// +public sealed class SqliteReminderStorage : IReminderStorage +{ + private readonly SqliteReminderStorageSettings _settings; + private readonly ISqlDialect _dialect; + private readonly Akka.Serialization.Serialization _serialization; + private readonly object _initLock = new(); + private volatile bool _initialized; + + public SqliteReminderStorage(SqliteReminderStorageSettings settings, ActorSystem system) + { + _settings = settings ?? throw new ArgumentNullException(nameof(settings)); + _settings.Validate(); + + _serialization = system.Serialization; + _dialect = SqliteDialect.Instance; + } + + public async Task ScheduleReminderAsync( + ScheduledReminder reminder, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + var (serializerId, manifest, payload) = SerializeMessage(reminder.Message); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetUpsertReminderSql(_settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", reminder.Entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", reminder.Entity.EntityId); + _dialect.AddParameter(command, "@ReminderKey", reminder.Key.Name); + _dialect.AddParameter(command, "@WhenUtc", reminder.When); + _dialect.AddParameter(command, "@RepeatIntervalTicks", reminder.RepeatInterval?.Ticks ?? (object)DBNull.Value); + _dialect.AddParameter(command, "@SerializerId", serializerId); + _dialect.AddParameter(command, "@Manifest", manifest ?? (object)DBNull.Value); + _dialect.AddParameter(command, "@Payload", payload); + _dialect.AddParameter(command, "@AttemptCount", reminder.AttemptCount); + _dialect.AddParameter(command, "@LastFailureReason", reminder.LastFailureReason ?? (object)DBNull.Value); + + await command.ExecuteNonQueryAsync(cancellationToken); + + return new ReminderProtocol.ReminderScheduled( + reminder.ToScheduleReminder(), + ReminderScheduleResponseCode.Success); + } + catch (Exception ex) + { + return new ReminderProtocol.ReminderScheduled( + reminder.ToScheduleReminder(), + ReminderScheduleResponseCode.Error, + ex.Message); + } + } + + public async Task GetNextRemindersAsync( + DateTimeOffset untilDeadline, + DateTimeOffset now, + ReminderBatchSize maxCount, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var reminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetSelectDueRemindersSql(_settings.TableName, maxCount.Value); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@UntilDeadline", untilDeadline.UtcDateTime); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var reminder = ReadReminderFromReader(reader); + reminders.Add(reminder); + } + + var overview = await GetRemindersOverviewAsync(now, cancellationToken); + + var fetchedKeys = new HashSet<(string, string, string)>( + reminders.Select(r => (r.Entity.ShardRegionName, r.Entity.EntityId, r.Key.Name))); + + await using var conn2 = _dialect.CreateConnection(_settings.ConnectionString); + await conn2.OpenAsync(cancellationToken); + await using var cmd2 = conn2.CreateCommand(); + cmd2.CommandText = _dialect.GetOverviewSql(_settings.TableName); + cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken); + + var remainingReminders = new List(); + while (await reader2.ReadAsync(cancellationToken)) + { + var isCompleted = ReadBoolean(reader2, "is_completed"); + if (!isCompleted) + { + var reminder = ReadReminderFromReader(reader2); + var key = (reminder.Entity.ShardRegionName, reminder.Entity.EntityId, reminder.Key.Name); + if (!fetchedKeys.Contains(key)) + { + remainingReminders.Add(reminder); + } + } + } + + var nextReminder = remainingReminders.OrderBy(r => r.When).FirstOrDefault(); + var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; + + var adjustedOverview = new ReminderOverview + { + TimeUntilNext = timeUntilNext, + TotalPendingReminders = remainingReminders.Count + }; + + return new PendingRemindersWithSummary(reminders, adjustedOverview); + } + + // SQLite default parameter limit is lower than SQL Server/PostgreSQL. + // 250 reminders => 752 parameters (250 * 3 + 2 shared), leaving room under 999. + private const int MaxRemindersPerStatement = 250; + + public async Task MarkRemindersAsCompletedAsync( + IEnumerable completedReminders, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var remindersList = completedReminders.ToList(); + if (remindersList.Count == 0) + return true; + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + var groups = remindersList.GroupBy(r => (r.Status, r.When)); + + foreach (var group in groups) + { + var items = group.ToList(); + + for (var offset = 0; offset < items.Count; offset += MaxRemindersPerStatement) + { + var chunk = items.Skip(offset).Take(MaxRemindersPerStatement).ToList(); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetBatchMarkCompletedSql(_settings.TableName, chunk.Count); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@CompletedAtUtc", group.Key.When.UtcDateTime); + _dialect.AddParameter(command, "@CompletionStatus", group.Key.Status.ToString()); + + for (var i = 0; i < chunk.Count; i++) + { + _dialect.AddParameter(command, $"@sr{i}", chunk[i].Entity.ShardRegionName); + _dialect.AddParameter(command, $"@eid{i}", chunk[i].Entity.EntityId); + _dialect.AddParameter(command, $"@rk{i}", chunk[i].Key.Name); + } + + await command.ExecuteNonQueryAsync(cancellationToken); + } + } + + return true; + } + catch (Exception) + { + return false; + } + } + + public async Task GetRemindersOverviewAsync( + DateTimeOffset now, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var allReminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetOverviewSql(_settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var isCompleted = ReadBoolean(reader, "is_completed"); + + if (!isCompleted) + { + var reminder = ReadReminderFromReader(reader); + allReminders.Add(reminder); + } + } + + var nextReminder = allReminders.OrderBy(r => r.When).FirstOrDefault(); + var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue; + + return new ReminderOverview + { + TimeUntilNext = timeUntilNext, + TotalPendingReminders = allReminders.Count + }; + } + + public async Task CleanUpCompletedRemindersAsync( + DateTimeOffset olderThan, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCleanupSql(_settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@OlderThan", olderThan.UtcDateTime); + + await command.ExecuteNonQueryAsync(cancellationToken); + return true; + } + catch (Exception) + { + return false; + } + } + + public async Task CancelReminderAsync( + ReminderEntity entity, + ReminderKey key, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCancelReminderSql(_settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + _dialect.AddParameter(command, "@ReminderKey", key.Name); + _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); + + var count = await command.ExecuteNonQueryAsync(cancellationToken); + + if (count > 0) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Success, + new List { key }, + null); + } + + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.NotFound, + new List(), + null); + } + catch (Exception ex) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Error, + new List(), + ex.Message); + } + } + + public async Task CancelAllRemindersForEntityAsync( + ReminderEntity entity, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + try + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + var cancelledKeys = new List(); + + await using (var selectCommand = connection.CreateCommand()) + { + selectCommand.CommandText = _dialect.GetFetchRemindersSql(_settings.TableName); + selectCommand.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(selectCommand, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(selectCommand, "@EntityId", entity.EntityId); + + await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken); + while (await reader.ReadAsync(cancellationToken)) + { + var reminderKey = reader.GetString(reader.GetOrdinal("reminder_key")); + var isCompleted = ReadBoolean(reader, "is_completed"); + + if (!isCompleted) + { + cancelledKeys.Add(new ReminderKey(reminderKey)); + } + } + } + + if (cancelledKeys.Count == 0) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.NotFound, + new List(), + null); + } + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCancelAllRemindersSql(_settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + _dialect.AddParameter(command, "@CompletedAtUtc", DateTimeOffset.UtcNow.UtcDateTime); + + await command.ExecuteNonQueryAsync(cancellationToken); + + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Success, + cancelledKeys, + null); + } + catch (Exception ex) + { + return new ReminderProtocol.RemindersCancelled( + entity, + ReminderCancelResponseCode.Error, + new List(), + ex.Message); + } + } + + public async Task> GetRemindersForEntityAsync( + ReminderEntity entity, + int take = 10, + int skip = 0, + CancellationToken cancellationToken = default) + { + await EnsureInitializedAsync(cancellationToken); + + var reminders = new List(); + + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetFetchRemindersSql(_settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + _dialect.AddParameter(command, "@ShardRegionName", entity.ShardRegionName); + _dialect.AddParameter(command, "@EntityId", entity.EntityId); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + while (await reader.ReadAsync(cancellationToken)) + { + var reminder = ReadReminderFromReader(reader); + reminders.Add(reminder); + } + + return reminders.Skip(skip).Take(take).ToList(); + } + + private Task EnsureInitializedAsync(CancellationToken cancellationToken) + { + if (_initialized || !_settings.AutoInitialize) + return Task.CompletedTask; + + lock (_initLock) + { + if (_initialized) + return Task.CompletedTask; + + Task.Run(async () => + { + await using var connection = _dialect.CreateConnection(_settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = _dialect.GetCreateTableSql(_settings.TableName); + command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds; + + await command.ExecuteNonQueryAsync(cancellationToken); + }, cancellationToken).Wait(cancellationToken); + + _initialized = true; + } + + return Task.CompletedTask; + } + + private (int serializerId, string? manifest, byte[] payload) SerializeMessage(object message) + { + var serializer = _serialization.FindSerializerFor(message); + var manifest = Akka.Serialization.Serialization.ManifestFor(serializer, message); + var payload = serializer.ToBinary(message); + + return (serializer.Identifier, manifest, payload); + } + + private object DeserializeMessage(int serializerId, string? manifest, byte[] payload) + { + return _serialization.Deserialize(payload, serializerId, manifest ?? string.Empty); + } + + private static bool ReadBoolean(IDataReader reader, string columnName) + { + var ordinal = reader.GetOrdinal(columnName); + return Convert.ToInt32(reader.GetValue(ordinal), CultureInfo.InvariantCulture) != 0; + } + + private static DateTimeOffset ReadDateTimeOffset(IDataReader reader, string columnName) + { + var ordinal = reader.GetOrdinal(columnName); + var value = reader.GetValue(ordinal); + + return value switch + { + DateTimeOffset dto => dto, + DateTime dt => new DateTimeOffset(DateTime.SpecifyKind(dt, DateTimeKind.Utc)), + string s => DateTimeOffset.Parse(s, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind), + _ => throw new InvalidOperationException($"Unexpected datetime value type '{value.GetType()}' for column '{columnName}'.") + }; + } + + private ScheduledReminder ReadReminderFromReader(IDataReader reader) + { + var shardRegionName = reader.GetString(reader.GetOrdinal("shard_region_name")); + var entityId = reader.GetString(reader.GetOrdinal("entity_id")); + var reminderKey = reader.GetString(reader.GetOrdinal("reminder_key")); + var whenUtc = ReadDateTimeOffset(reader, "when_utc"); + + var repeatIntervalTicksOrdinal = reader.GetOrdinal("repeat_interval_ticks"); + var repeatInterval = reader.IsDBNull(repeatIntervalTicksOrdinal) + ? (TimeSpan?)null + : TimeSpan.FromTicks(Convert.ToInt64(reader.GetValue(repeatIntervalTicksOrdinal), CultureInfo.InvariantCulture)); + + var serializerId = Convert.ToInt32(reader.GetValue(reader.GetOrdinal("serializer_id")), CultureInfo.InvariantCulture); + + var manifestOrdinal = reader.GetOrdinal("manifest"); + var manifest = reader.IsDBNull(manifestOrdinal) ? null : reader.GetString(manifestOrdinal); + + var payload = (byte[])reader.GetValue(reader.GetOrdinal("payload")); + var attemptCount = Convert.ToInt32(reader.GetValue(reader.GetOrdinal("attempt_count")), CultureInfo.InvariantCulture); + + var lastFailureReasonOrdinal = reader.GetOrdinal("last_failure_reason"); + var lastFailureReason = reader.IsDBNull(lastFailureReasonOrdinal) + ? null + : reader.GetString(lastFailureReasonOrdinal); + + var message = DeserializeMessage(serializerId, manifest, payload); + + return new ScheduledReminder( + new ReminderEntity(shardRegionName, entityId), + new ReminderKey(reminderKey), + whenUtc, + message, + repeatInterval, + attemptCount, + lastFailureReason); + } +} diff --git a/src/Akka.Reminders.Tests/Akka.Reminders.Tests.csproj b/src/Akka.Reminders.Tests/Akka.Reminders.Tests.csproj index 7ffdcbb..1353fad 100644 --- a/src/Akka.Reminders.Tests/Akka.Reminders.Tests.csproj +++ b/src/Akka.Reminders.Tests/Akka.Reminders.Tests.csproj @@ -21,7 +21,10 @@ + + + diff --git a/src/Akka.Reminders.Tests/Storage/SqlCompatibilitySpecs.cs b/src/Akka.Reminders.Tests/Storage/SqlCompatibilitySpecs.cs new file mode 100644 index 0000000..5c3f1c9 --- /dev/null +++ b/src/Akka.Reminders.Tests/Storage/SqlCompatibilitySpecs.cs @@ -0,0 +1,53 @@ +using Akka.Actor; +using Akka.Reminders; +using Akka.Reminders.Sql; +using Akka.Reminders.Sql.Configuration; + +namespace Akka.Reminders.Tests.Storage; + +public sealed class SqlCompatibilitySpecs : IAsyncLifetime +{ + private ActorSystem? _system; + private SqlReminderStorage? _storage; + private string? _databasePath; + + public Task InitializeAsync() + { + _system = ActorSystem.Create("compatibility-system"); + _databasePath = Path.Combine(Path.GetTempPath(), $"akka-reminders-compat-{Guid.NewGuid():N}.db"); + + var connectionString = $"Data Source={_databasePath};Mode=ReadWriteCreate;Cache=Shared"; + var settings = SqlReminderStorageSettings.CreateSqlite(connectionString); + + _storage = new SqlReminderStorage(settings, _system); + return Task.CompletedTask; + } + + public async Task DisposeAsync() + { + if (_system != null) + { + await _system.Terminate(); + } + + if (!string.IsNullOrWhiteSpace(_databasePath) && File.Exists(_databasePath)) + { + File.Delete(_databasePath); + } + } + + [Fact] + public async Task LegacySqlReminderStorage_ShouldScheduleAndFetch_WithSqliteProvider() + { + var entity = new ReminderEntity("compat", "entity-1"); + var key = new ReminderKey("compat-key"); + var reminder = new ScheduledReminder(entity, key, DateTimeOffset.UtcNow.AddMinutes(5), "hello"); + + var scheduled = await _storage!.ScheduleReminderAsync(reminder); + var reminders = await _storage.GetRemindersForEntityAsync(entity); + + Assert.Equal(ReminderScheduleResponseCode.Success, scheduled.ResponseCode); + Assert.Single(reminders); + Assert.Equal(key, reminders[0].Key); + } +} diff --git a/src/Akka.Reminders.Tests/Storage/SqlReminderStorageSpecs.cs b/src/Akka.Reminders.Tests/Storage/SqlReminderStorageSpecs.cs index 76cd5c4..166e707 100644 --- a/src/Akka.Reminders.Tests/Storage/SqlReminderStorageSpecs.cs +++ b/src/Akka.Reminders.Tests/Storage/SqlReminderStorageSpecs.cs @@ -1,15 +1,16 @@ using Akka.Actor; -using Akka.Reminders.Sql; -using Akka.Reminders.Sql.Configuration; +using Akka.Reminders.PostgreSql; +using Akka.Reminders.PostgreSql.Configuration; +using Akka.Reminders.SqlServer; +using Akka.Reminders.SqlServer.Configuration; +using Akka.Reminders.Sqlite; +using Akka.Reminders.Sqlite.Configuration; using Akka.Reminders.Storage; using Testcontainers.MsSql; using Testcontainers.PostgreSql; namespace Akka.Reminders.Tests.Storage; -/// -/// Tests for with SQL Server using Testcontainers. -/// [Collection("SqlServer")] public class SqlServerReminderStorageSpecs : ReminderStorageSpecBase { @@ -27,9 +28,9 @@ protected override async Task CreateStorage() await _container.StartAsync(); var connectionString = _container.GetConnectionString(); - var settings = SqlReminderStorageSettings.CreateSqlServer(connectionString); + var settings = SqlServerReminderStorageSettings.Create(connectionString); - return new SqlReminderStorage(settings, _system); + return new SqlServerReminderStorage(settings, _system); } protected override async Task CleanupStorage(IReminderStorage storage) @@ -38,6 +39,7 @@ protected override async Task CleanupStorage(IReminderStorage storage) { await _container.DisposeAsync(); } + if (_system != null) { await _system.Terminate(); @@ -45,9 +47,6 @@ protected override async Task CleanupStorage(IReminderStorage storage) } } -/// -/// Tests for with PostgreSQL using Testcontainers. -/// [Collection("PostgreSQL")] public class PostgreSqlReminderStorageSpecs : ReminderStorageSpecBase { @@ -64,9 +63,9 @@ protected override async Task CreateStorage() await _container.StartAsync(); var connectionString = _container.GetConnectionString(); - var settings = SqlReminderStorageSettings.CreatePostgreSql(connectionString); + var settings = PostgreSqlReminderStorageSettings.Create(connectionString); - return new SqlReminderStorage(settings, _system); + return new PostgreSqlReminderStorage(settings, _system); } protected override async Task CleanupStorage(IReminderStorage storage) @@ -75,9 +74,42 @@ protected override async Task CleanupStorage(IReminderStorage storage) { await _container.DisposeAsync(); } + if (_system != null) { await _system.Terminate(); } } } + +[Collection("Sqlite")] +public class SqliteReminderStorageSpecs : ReminderStorageSpecBase +{ + private ActorSystem? _system; + private string? _databasePath; + + protected override Task CreateStorage() + { + _system = ActorSystem.Create("test-system"); + + _databasePath = Path.Combine(Path.GetTempPath(), $"akka-reminders-{Guid.NewGuid():N}.db"); + var connectionString = $"Data Source={_databasePath};Mode=ReadWriteCreate;Cache=Shared"; + var settings = SqliteReminderStorageSettings.Create(connectionString); + + IReminderStorage storage = new SqliteReminderStorage(settings, _system); + return Task.FromResult(storage); + } + + protected override async Task CleanupStorage(IReminderStorage storage) + { + if (_system != null) + { + await _system.Terminate(); + } + + if (!string.IsNullOrWhiteSpace(_databasePath) && File.Exists(_databasePath)) + { + File.Delete(_databasePath); + } + } +}