Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Akka.Reminders.PostgreSql/Internal/ISqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ internal interface ISqlDialect
string GetMarkCompletedSql(string schemaName, string tableName);
string GetBatchMarkCompletedSql(string schemaName, string tableName, int count);
string GetCleanupSql(string schemaName, string tableName);
string GetOverviewSql(string schemaName, string tableName);
string GetOverviewAggregateSql(string schemaName, string tableName);
string GetNextReminderTimeSql(string schemaName, string tableName);
string GetCancelReminderSql(string schemaName, string tableName);
string GetCancelAllRemindersSql(string schemaName, string tableName);
string GetFetchRemindersSql(string schemaName, string tableName);
Expand Down
20 changes: 16 additions & 4 deletions src/Akka.Reminders.PostgreSql/Internal/PostgreSqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,27 @@ DELETE FROM {fullTableName}
""";
}

public string GetOverviewSql(string schemaName, string tableName)
public string GetOverviewAggregateSql(string schemaName, string tableName)
{
var fullTableName = $"\"{schemaName}\".\"{tableName}\"";

return $"""
SELECT shard_region_name, entity_id, reminder_key, when_utc, repeat_interval_ticks,
serializer_id, manifest, payload, attempt_count, last_failure_reason, is_completed
SELECT COUNT(*) AS total_count, MIN(when_utc) AS next_when_utc
FROM {fullTableName}
ORDER BY when_utc ASC;
WHERE is_completed = FALSE;
""";
}

public string GetNextReminderTimeSql(string schemaName, string tableName)
{
var fullTableName = $"\"{schemaName}\".\"{tableName}\"";

return $"""
SELECT when_utc
FROM {fullTableName}
WHERE is_completed = FALSE
ORDER BY when_utc ASC
LIMIT 1 OFFSET @Skip;
""";
}

Expand Down
80 changes: 41 additions & 39 deletions src/Akka.Reminders.PostgreSql/PostgreSqlReminderStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,40 +108,43 @@ public async Task<PendingRemindersWithSummary> GetNextRemindersAsync(
reminders.Add(reminder);
}

var overview = await GetRemindersOverviewAsync(now, cancellationToken);

var fetchedKeys = new HashSet<(string, string, string)>(
reminders.Select(r => (r.Entity.ShardRegionName, r.Entity.EntityId, r.Key.Name)));

// Get overview of remaining reminders using aggregate queries
await using var conn2 = _dialect.CreateConnection(_settings.ConnectionString);
await conn2.OpenAsync(cancellationToken);
await using var cmd2 = conn2.CreateCommand();
cmd2.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName);
cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;
await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken);

var remainingReminders = new List<ScheduledReminder>();
while (await reader2.ReadAsync(cancellationToken))
long totalPending = 0;
await using (var cmd2 = conn2.CreateCommand())
{
var isCompleted = reader2.GetBoolean(reader2.GetOrdinal("is_completed"));
if (!isCompleted)
cmd2.CommandText = _dialect.GetOverviewAggregateSql(_settings.SchemaName, _settings.TableName);
cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;
await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken);
if (await reader2.ReadAsync(cancellationToken))
{
var reminder = ReadReminderFromReader(reader2);
var key = (reminder.Entity.ShardRegionName, reminder.Entity.EntityId, reminder.Key.Name);
if (!fetchedKeys.Contains(key))
{
remainingReminders.Add(reminder);
}
totalPending = reader2.GetInt64(reader2.GetOrdinal("total_count"));
}
}

var nextReminder = remainingReminders.OrderBy(r => r.When).FirstOrDefault();
var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue;
var remainingCount = totalPending - reminders.Count;
var timeUntilNext = TimeSpan.MaxValue;

if (remainingCount > 0)
{
await using var cmd3 = conn2.CreateCommand();
cmd3.CommandText = _dialect.GetNextReminderTimeSql(_settings.SchemaName, _settings.TableName);
cmd3.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;
_dialect.AddParameter(cmd3, "@Skip", reminders.Count);

var result = await cmd3.ExecuteScalarAsync(cancellationToken);
if (result is DateTime nextWhenUtc)
{
timeUntilNext = new DateTimeOffset(DateTime.SpecifyKind(nextWhenUtc, DateTimeKind.Utc)) - now;
}
}

var adjustedOverview = new ReminderOverview
{
TimeUntilNext = timeUntilNext,
TotalPendingReminders = remainingReminders.Count
TotalPendingReminders = remainingCount
};

return new PendingRemindersWithSummary(reminders, adjustedOverview);
Expand Down Expand Up @@ -201,42 +204,41 @@ public async Task<bool> MarkRemindersAsCompletedAsync(
}
}

/// <inheritdoc />
public async Task<ReminderOverview> GetRemindersOverviewAsync(
DateTimeOffset now,
CancellationToken cancellationToken = default)
{
await EnsureInitializedAsync(cancellationToken);

var allReminders = new List<ScheduledReminder>();

await using var connection = _dialect.CreateConnection(_settings.ConnectionString);
await connection.OpenAsync(cancellationToken);

await using var command = connection.CreateCommand();
command.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName);
command.CommandText = _dialect.GetOverviewAggregateSql(_settings.SchemaName, _settings.TableName);
command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;

await using var reader = await command.ExecuteReaderAsync(cancellationToken);

while (await reader.ReadAsync(cancellationToken))
if (await reader.ReadAsync(cancellationToken))
{
var isCompleted = reader.GetBoolean(reader.GetOrdinal("is_completed"));
var totalCount = reader.GetInt64(reader.GetOrdinal("total_count"));
var nextWhenUtcOrdinal = reader.GetOrdinal("next_when_utc");

if (totalCount == 0 || reader.IsDBNull(nextWhenUtcOrdinal))
return ReminderOverview.Empty;

if (!isCompleted)
var nextWhenUtc = reader.GetDateTime(nextWhenUtcOrdinal);
var timeUntilNext = new DateTimeOffset(DateTime.SpecifyKind(nextWhenUtc, DateTimeKind.Utc)) - now;

return new ReminderOverview
{
var reminder = ReadReminderFromReader(reader);
allReminders.Add(reminder);
}
TotalPendingReminders = totalCount,
TimeUntilNext = timeUntilNext
};
}

var nextReminder = allReminders.OrderBy(r => r.When).FirstOrDefault();
var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue;

return new ReminderOverview
{
TimeUntilNext = timeUntilNext,
TotalPendingReminders = allReminders.Count
};
return ReminderOverview.Empty;
}

public async Task<bool> CleanUpCompletedRemindersAsync(
Expand Down
3 changes: 2 additions & 1 deletion src/Akka.Reminders.SqlServer/Internal/ISqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ internal interface ISqlDialect
string GetMarkCompletedSql(string schemaName, string tableName);
string GetBatchMarkCompletedSql(string schemaName, string tableName, int count);
string GetCleanupSql(string schemaName, string tableName);
string GetOverviewSql(string schemaName, string tableName);
string GetOverviewAggregateSql(string schemaName, string tableName);
string GetNextReminderTimeSql(string schemaName, string tableName);
string GetCancelReminderSql(string schemaName, string tableName);
string GetCancelAllRemindersSql(string schemaName, string tableName);
string GetFetchRemindersSql(string schemaName, string tableName);
Expand Down
20 changes: 16 additions & 4 deletions src/Akka.Reminders.SqlServer/Internal/SqlServerDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,27 @@ DELETE FROM {fullTableName}
""";
}

public string GetOverviewSql(string schemaName, string tableName)
public string GetOverviewAggregateSql(string schemaName, string tableName)
{
var fullTableName = $"[{schemaName}].[{tableName}]";

return $"""
SELECT ShardRegionName, EntityId, ReminderKey, WhenUtc, RepeatIntervalTicks,
SerializerId, Manifest, Payload, AttemptCount, LastFailureReason, IsCompleted
SELECT COUNT(*) AS TotalCount, MIN(WhenUtc) AS NextWhenUtc
FROM {fullTableName}
ORDER BY WhenUtc ASC;
WHERE IsCompleted = 0;
""";
}

public string GetNextReminderTimeSql(string schemaName, string tableName)
{
var fullTableName = $"[{schemaName}].[{tableName}]";

return $"""
SELECT WhenUtc
FROM {fullTableName}
WHERE IsCompleted = 0
ORDER BY WhenUtc ASC
OFFSET @Skip ROWS FETCH NEXT 1 ROW ONLY;
""";
}

Expand Down
80 changes: 41 additions & 39 deletions src/Akka.Reminders.SqlServer/SqlServerReminderStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,40 +99,43 @@ public async Task<PendingRemindersWithSummary> GetNextRemindersAsync(
reminders.Add(reminder);
}

var overview = await GetRemindersOverviewAsync(now, cancellationToken);

var fetchedKeys = new HashSet<(string, string, string)>(
reminders.Select(r => (r.Entity.ShardRegionName, r.Entity.EntityId, r.Key.Name)));

// Get overview of remaining reminders using aggregate queries
await using var conn2 = _dialect.CreateConnection(_settings.ConnectionString);
await conn2.OpenAsync(cancellationToken);
await using var cmd2 = conn2.CreateCommand();
cmd2.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName);
cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;
await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken);

var remainingReminders = new List<ScheduledReminder>();
while (await reader2.ReadAsync(cancellationToken))
long totalPending = 0;
await using (var cmd2 = conn2.CreateCommand())
{
var isCompleted = reader2.GetBoolean(reader2.GetOrdinal("IsCompleted"));
if (!isCompleted)
cmd2.CommandText = _dialect.GetOverviewAggregateSql(_settings.SchemaName, _settings.TableName);
cmd2.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;
await using var reader2 = await cmd2.ExecuteReaderAsync(cancellationToken);
if (await reader2.ReadAsync(cancellationToken))
{
var reminder = ReadReminderFromReader(reader2);
var key = (reminder.Entity.ShardRegionName, reminder.Entity.EntityId, reminder.Key.Name);
if (!fetchedKeys.Contains(key))
{
remainingReminders.Add(reminder);
}
totalPending = reader2.GetInt32(reader2.GetOrdinal("TotalCount"));
}
}

var nextReminder = remainingReminders.OrderBy(r => r.When).FirstOrDefault();
var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue;
var remainingCount = totalPending - reminders.Count;
var timeUntilNext = TimeSpan.MaxValue;

if (remainingCount > 0)
{
await using var cmd3 = conn2.CreateCommand();
cmd3.CommandText = _dialect.GetNextReminderTimeSql(_settings.SchemaName, _settings.TableName);
cmd3.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;
_dialect.AddParameter(cmd3, "@Skip", reminders.Count);

var result = await cmd3.ExecuteScalarAsync(cancellationToken);
if (result is DateTime nextWhenUtc)
{
timeUntilNext = new DateTimeOffset(DateTime.SpecifyKind(nextWhenUtc, DateTimeKind.Utc)) - now;
}
}

var adjustedOverview = new ReminderOverview
{
TimeUntilNext = timeUntilNext,
TotalPendingReminders = remainingReminders.Count
TotalPendingReminders = remainingCount
};

return new PendingRemindersWithSummary(reminders, adjustedOverview);
Expand Down Expand Up @@ -192,42 +195,41 @@ public async Task<bool> MarkRemindersAsCompletedAsync(
}
}

/// <inheritdoc />
public async Task<ReminderOverview> GetRemindersOverviewAsync(
DateTimeOffset now,
CancellationToken cancellationToken = default)
{
await EnsureInitializedAsync(cancellationToken);

var allReminders = new List<ScheduledReminder>();

await using var connection = _dialect.CreateConnection(_settings.ConnectionString);
await connection.OpenAsync(cancellationToken);

await using var command = connection.CreateCommand();
command.CommandText = _dialect.GetOverviewSql(_settings.SchemaName, _settings.TableName);
command.CommandText = _dialect.GetOverviewAggregateSql(_settings.SchemaName, _settings.TableName);
command.CommandTimeout = (int)_settings.CommandTimeout.TotalSeconds;

await using var reader = await command.ExecuteReaderAsync(cancellationToken);

while (await reader.ReadAsync(cancellationToken))
if (await reader.ReadAsync(cancellationToken))
{
var isCompleted = reader.GetBoolean(reader.GetOrdinal("IsCompleted"));
var totalCount = reader.GetInt32(reader.GetOrdinal("TotalCount"));
var nextWhenUtcOrdinal = reader.GetOrdinal("NextWhenUtc");

if (totalCount == 0 || reader.IsDBNull(nextWhenUtcOrdinal))
return ReminderOverview.Empty;

if (!isCompleted)
var nextWhenUtc = reader.GetDateTime(nextWhenUtcOrdinal);
var timeUntilNext = new DateTimeOffset(DateTime.SpecifyKind(nextWhenUtc, DateTimeKind.Utc)) - now;

return new ReminderOverview
{
var reminder = ReadReminderFromReader(reader);
allReminders.Add(reminder);
}
TotalPendingReminders = totalCount,
TimeUntilNext = timeUntilNext
};
}

var nextReminder = allReminders.OrderBy(r => r.When).FirstOrDefault();
var timeUntilNext = nextReminder != null ? nextReminder.When - now : TimeSpan.MaxValue;

return new ReminderOverview
{
TimeUntilNext = timeUntilNext,
TotalPendingReminders = allReminders.Count
};
return ReminderOverview.Empty;
}

public async Task<bool> CleanUpCompletedRemindersAsync(
Expand Down
3 changes: 2 additions & 1 deletion src/Akka.Reminders.Sqlite/Internal/ISqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ internal interface ISqlDialect
string GetMarkCompletedSql(string tableName);
string GetBatchMarkCompletedSql(string tableName, int count);
string GetCleanupSql(string tableName);
string GetOverviewSql(string tableName);
string GetOverviewAggregateSql(string tableName);
string GetNextReminderTimeSql(string tableName);
string GetCancelReminderSql(string tableName);
string GetCancelAllRemindersSql(string tableName);
string GetFetchRemindersSql(string tableName);
Expand Down
20 changes: 16 additions & 4 deletions src/Akka.Reminders.Sqlite/Internal/SqliteDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,27 @@ DELETE FROM {fullTableName}
""";
}

public string GetOverviewSql(string tableName)
public string GetOverviewAggregateSql(string tableName)
{
var fullTableName = $"\"{tableName}\"";

return $"""
SELECT shard_region_name, entity_id, reminder_key, when_utc, repeat_interval_ticks,
serializer_id, manifest, payload, attempt_count, last_failure_reason, is_completed
SELECT COUNT(*) AS total_count, MIN(when_utc) AS next_when_utc
FROM {fullTableName}
ORDER BY when_utc ASC;
WHERE is_completed = 0;
""";
}

public string GetNextReminderTimeSql(string tableName)
{
var fullTableName = $"\"{tableName}\"";

return $"""
SELECT when_utc
FROM {fullTableName}
WHERE is_completed = 0
ORDER BY when_utc ASC
LIMIT 1 OFFSET @Skip;
""";
}

Expand Down
Loading