Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/EventTests/Daemon/DeadLetterCountDefaultsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using Shouldly;

namespace EventTests.Daemon;

public class DeadLetterCountDefaultsTests
{
// A bare IEventDatabase that does NOT override the dead-letter count read members,
// so the calls below exercise the default interface implementations added for
// jasperfx#356 (the "return 0 / empty" stand-ins for stores that don't yet read
// dead letters). Everything else throws — those members are never invoked here.
private sealed class BareEventDatabase : IEventDatabase
{
public string Identifier => throw new NotImplementedException();
public Uri DatabaseUri => throw new NotImplementedException();
public ShardStateTracker Tracker => throw new NotImplementedException();
public string StorageIdentifier => throw new NotImplementedException();

public Task StoreDeadLetterEventAsync(object storage, DeadLetterEvent deadLetterEvent, CancellationToken token)
=> throw new NotImplementedException();

public Task EnsureStorageExistsAsync(Type storageType, CancellationToken token)
=> throw new NotImplementedException();

public Task WaitForNonStaleProjectionDataAsync(TimeSpan timeout)
=> throw new NotImplementedException();

public Task<long> ProjectionProgressFor(ShardName name, CancellationToken token = default)
=> throw new NotImplementedException();

public Task<long?> FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token)
=> throw new NotImplementedException();

public Task<long> FetchHighestEventSequenceNumber(CancellationToken token)
=> throw new NotImplementedException();

public Task<IReadOnlyList<ShardState>> AllProjectionProgress(CancellationToken token = default)
=> throw new NotImplementedException();
}

private readonly IEventDatabase theDatabase = new BareEventDatabase();

[Fact]
public async Task count_dead_letters_default_returns_zero()
{
var count = await theDatabase.CountDeadLetterEventsAsync(new ShardName("Fake"));
count.ShouldBe(0);
}

[Fact]
public async Task fetch_dead_letter_counts_default_returns_empty()
{
var counts = await theDatabase.FetchDeadLetterCountsAsync();
counts.ShouldBeEmpty();
}
}
14 changes: 14 additions & 0 deletions src/JasperFx.Events/Daemon/DeadLetterShardCount.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using JasperFx.Events.Projections;

namespace JasperFx.Events.Daemon;

/// <summary>
/// A count of stored projection/subscription dead letter events for a single shard.
/// <see cref="ProjectionName" /> aligns with <see cref="ShardName.Name" /> and
/// <see cref="ShardKey" /> aligns with <see cref="ShardName.ShardKey" />, matching how
/// <see cref="DeadLetterEvent" /> records them. See jasperfx#356.
/// </summary>
/// <param name="ProjectionName">The projection name (<see cref="ShardName.Name" />).</param>
/// <param name="ShardKey">The shard key within the projection (<see cref="ShardName.ShardKey" />).</param>
/// <param name="Count">The number of stored dead letter events for this shard.</param>
public record DeadLetterShardCount(string ProjectionName, string ShardKey, long Count);
24 changes: 24 additions & 0 deletions src/JasperFx.Events/IEventDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,28 @@ Task<long> ProjectionProgressFor(ShardName name,
/// <returns></returns>
Task<IReadOnlyList<ShardState>> AllProjectionProgress(
CancellationToken token = default);

/// <summary>
/// Count the stored dead letter events for a single projection/subscription shard. With
/// <c>SkipApplyErrors</c> on (the JasperFx.Events 2.0 default), a failed <c>Apply()</c> is
/// recorded as a <see cref="DeadLetterEvent" /> and the shard keeps advancing, so the
/// accumulation of these is the primary "this projection is unhealthy" signal.
/// The default implementation returns 0 as a stand-in; event stores that persist dead
/// letters should override this. See jasperfx#356.
/// </summary>
/// <param name="shard">The projection/subscription shard to count dead letters for.</param>
/// <param name="token"></param>
Task<long> CountDeadLetterEventsAsync(ShardName shard, CancellationToken token = default)
=> Task.FromResult(0L);

/// <summary>
/// Fetch the stored dead letter event counts for this database, one row per shard
/// (<see cref="DeadLetterShardCount.ProjectionName" /> + <see cref="DeadLetterShardCount.ShardKey" />).
/// Mirrors the "give me every row" shape of <see cref="AllProjectionProgress" />.
/// The default implementation returns an empty list as a stand-in; event stores that
/// persist dead letters should override this. See jasperfx#356.
/// </summary>
/// <param name="token"></param>
Task<IReadOnlyList<DeadLetterShardCount>> FetchDeadLetterCountsAsync(CancellationToken token = default)
=> Task.FromResult<IReadOnlyList<DeadLetterShardCount>>([]);
}
Loading