Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions .github/workflows/redis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# CI workflow: runs the Wolverine.Redis test suite against dockerized
# Redis and PostgreSQL services on every push and pull request.
name: redis

on:
  push:
    branches: [ '**' ]
  pull_request:
    branches: [ '**' ]

env:
  config: Release
  disable_test_parallelization: true

jobs:
  test:
    runs-on: ubuntu-latest
    timeout-minutes: 10

    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          # Full history — some build/versioning tooling needs tags.
          fetch-depth: 0

      # setup-dotnet v4 accepts a multi-line version list, replacing three
      # separate steps (and the deprecated v1, which targets a retired
      # Node.js runtime on GitHub-hosted runners).
      - name: Setup .NET SDKs
        uses: actions/setup-dotnet@v4
        with:
          dotnet-version: |
            8.0.x
            9.0.x
            10.0.x

      - name: Start Redis and PostgreSQL
        run: docker compose up -d redis-server postgresql

      # Build while the containers warm up; the explicit readiness gates
      # below run before the tests touch either service.
      - name: Build
        run: dotnet build src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj --configuration ${{ env.config }} --framework net10.0

      # Fail the job if Redis never answers PING — previously the loop fell
      # through after 30 attempts and the test step ran against a dead service.
      - name: Wait for Redis
        run: |
          echo "Waiting for Redis to be ready..."
          for i in {1..30}; do
            if docker compose exec -T redis-server redis-cli ping | grep -q PONG; then
              echo "Redis is ready"
              exit 0
            fi
            echo "Attempt $i: Redis not ready yet, waiting..."
            sleep 2
          done
          echo "Redis did not become ready in time" >&2
          exit 1

      # Same fail-fast treatment for PostgreSQL.
      - name: Wait for PostgreSQL
        run: |
          echo "Waiting for PostgreSQL to be ready..."
          for i in {1..30}; do
            if docker compose exec -T postgresql pg_isready -U postgres; then
              echo "PostgreSQL is ready"
              exit 0
            fi
            echo "Attempt $i: PostgreSQL not ready yet, waiting..."
            sleep 2
          done
          echo "PostgreSQL did not become ready in time" >&2
          exit 1

      - name: Test
        run: dotnet test src/Transports/Redis/Wolverine.Redis.Tests/Wolverine.Redis.Tests.csproj --configuration ${{ env.config }} --framework net10.0 --no-build --logger "GitHubActions;summary.includePassedTests=true;summary.includeSkippedTests=true"

      # Always tear the containers down, even when a prior step failed.
      - name: Stop containers
        if: always()
        run: docker compose down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageReference Include="NSubstitute" Version="5.3.0" />
<PackageReference Include="Shouldly" Version="4.3.0" />
Expand Down
130 changes: 54 additions & 76 deletions src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -493,100 +493,78 @@ public async Task<long> MoveScheduledToReadyStreamAsync(CancellationToken cancel

try
{
// Get current time as Unix timestamp in milliseconds
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

// Get all messages that are ready to be executed (score <= now)
// Query only entries whose score <= now (ready to execute), without
// removing entries that aren't ready yet. This avoids the race condition
// where a pop-then-re-add temporarily empties the sorted set.
var readyEntries = await database.SortedSetRangeByScoreAsync(
scheduledKey,
double.NegativeInfinity,
now,
take: 100,
order: Order.Ascending);

if (readyEntries.Length == 0)
{
return 0;
}

_logger.LogDebug(
"Found {Count} scheduled messages ready for execution in {ScheduledKey}",
readyEntries.Length,
scheduledKey);

long count = 0;
var limit = 100; // Limit number of messages to move in one call
var hasMore = true;
while (limit-- > 0 && hasMore && !cancellationToken.IsCancellationRequested)

foreach (var serializedEnvelope in readyEntries)
{
var setEntries = await database.SortedSetPopAsync(
scheduledKey,
1,
Order.Descending);
var readyMessages = new List<RedisValue>();
foreach (var sortedSetEntry in setEntries)
{
if (sortedSetEntry.Score > now)
{
// Not ready yet, re-add to the sorted set
await database.SortedSetAddAsync(
scheduledKey,
sortedSetEntry.Element,
sortedSetEntry.Score);
hasMore = false;
}
else
{
readyMessages.Add(sortedSetEntry.Element);
}
}
if (cancellationToken.IsCancellationRequested) break;

if (readyMessages.Count == 0)
try
{
return 0;
}
// Remove from sorted set first; if another consumer already
// removed it we skip.
var removed = await database.SortedSetRemoveAsync(scheduledKey, serializedEnvelope);
if (!removed) continue;

// Deserialize the envelope
var envelope = EnvelopeSerializer.Deserialize(serializedEnvelope);

// Add it to the stream
_endpoint.EnvelopeMapper ??= _endpoint.BuildMapper(_runtime);
var fields = new List<NameValueEntry>();
_endpoint.EnvelopeMapper.MapEnvelopeToOutgoing(envelope, fields);

_logger.LogDebug(
"Found {Count} scheduled messages ready for execution in {ScheduledKey}",
readyMessages.Count,
scheduledKey);
var messageId = await database.StreamAddAsync(
_endpoint.StreamKey,
fields.ToArray());

count++;

foreach (var serializedEnvelope in readyMessages)
_logger.LogDebug(
"Moved scheduled message {EnvelopeId} (Attempts={Attempts}) to stream {StreamKey} with message ID {MessageId}",
envelope.Id,
envelope.Attempts,
_endpoint.StreamKey,
messageId);
}
catch (Exception ex)
{
try
{
// Deserialize the envelope
var envelope =
EnvelopeSerializer.Deserialize(serializedEnvelope);

// Add it to the stream
_endpoint.EnvelopeMapper ??=
_endpoint.BuildMapper(_runtime);
var fields = new List<NameValueEntry>();
_endpoint.EnvelopeMapper.MapEnvelopeToOutgoing(
envelope,
fields);

var messageId = await database.StreamAddAsync(
_endpoint.StreamKey,
fields.ToArray());

// Remove from scheduled set
// await database.SortedSetRemoveAsync(scheduledKey, serializedEnvelope);

count++;

_logger.LogDebug(
"Moved scheduled message {EnvelopeId} (Attempts={Attempts}) to stream {StreamKey} with message ID {MessageId}",
envelope.Id,
envelope.Attempts,
_endpoint.StreamKey,
messageId);
}
catch (Exception ex)
{
_logger.LogError(
ex,
"Error processing scheduled message in {ScheduledKey}",
scheduledKey);
// Remove the corrupted message from the scheduled set
await database.SortedSetRemoveAsync(
scheduledKey,
serializedEnvelope);
}
_logger.LogError(
ex,
"Error processing scheduled message in {ScheduledKey}",
scheduledKey);
// Remove the corrupted message from the scheduled set
await database.SortedSetRemoveAsync(scheduledKey, serializedEnvelope);
}
}

return count;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error moving scheduled messages from {ScheduledKey} to stream {StreamKey}",
_logger.LogError(ex, "Error moving scheduled messages from {ScheduledKey} to stream {StreamKey}",
scheduledKey, _endpoint.StreamKey);
throw;
}
Expand Down
Loading