Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ public override async Task<IHost> BuildCleanHost()
return host;
}

[Fact]
public async Task exists_should_account_for_destination_too()
{
var envelope = ObjectMother.Envelope();
envelope.Status = EnvelopeStatus.Incoming;
envelope.SentAt = ((DateTimeOffset)DateTime.Today).ToUniversalTime();

(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeFalse();

await thePersistence.Inbox.StoreIncomingAsync(envelope);

(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeTrue();

var envelope2 = ObjectMother.Envelope();
envelope2.Id = envelope.Id;
envelope2.Destination = new Uri("stub://different");

(await thePersistence.Inbox.ExistsAsync(envelope2, CancellationToken.None)).ShouldBeFalse();
await thePersistence.Inbox.StoreIncomingAsync(envelope2);
(await thePersistence.Inbox.ExistsAsync(envelope2, CancellationToken.None)).ShouldBeTrue();
(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeTrue();
}

[Fact]
public async Task should_have_receive_at_in_primary_keys()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,28 @@ await thePersistence.As<IMessageDatabase>().PollForScheduledMessagesAsync(theRec
stored.OwnerId.ShouldBe(durabilitySettings.AssignedNodeNumber);
stored.Status.ShouldBe(EnvelopeStatus.Incoming);
}

[Fact]
public async Task exists_should_account_for_destination_too()
{
var envelope = ObjectMother.Envelope();
envelope.Status = EnvelopeStatus.Incoming;
envelope.SentAt = ((DateTimeOffset)DateTime.Today).ToUniversalTime();

(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeFalse();

await thePersistence.Inbox.StoreIncomingAsync(envelope);

(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeTrue();

var envelope2 = ObjectMother.Envelope();
envelope2.Id = envelope.Id;
envelope2.Destination = new Uri("stub://different");

(await thePersistence.Inbox.ExistsAsync(envelope2, CancellationToken.None)).ShouldBeFalse();
await thePersistence.Inbox.StoreIncomingAsync(envelope2);
(await thePersistence.Inbox.ExistsAsync(envelope2, CancellationToken.None)).ShouldBeTrue();
(await thePersistence.Inbox.ExistsAsync(envelope, CancellationToken.None)).ShouldBeTrue();
}

}
26 changes: 20 additions & 6 deletions src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,27 @@ public override async Task<bool> ExistsAsync(Envelope envelope, CancellationToke
{
if (HasDisposed) return false;

await using var conn = await NpgsqlDataSource.OpenConnectionAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = :id")
.With("id", envelope.Id)
.ExecuteScalarAsync(cancellation);
if (Durability.MessageIdentity == MessageIdentity.IdOnly)
{
await using var conn = await NpgsqlDataSource.OpenConnectionAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = :id")
.With("id", envelope.Id)
.ExecuteScalarAsync(cancellation);

return ((long)count) > 0;
return ((long)count) > 0;
}
else
{
await using var conn = await NpgsqlDataSource.OpenConnectionAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = :id and {DatabaseConstants.ReceivedAt} = :destination")
.With("id", envelope.Id)
.With("destination", envelope.Destination.ToString())
.ExecuteScalarAsync(cancellation);

return ((long)count) > 0;
}
}

public override void WriteLoadScheduledEnvelopeSql(DbCommandBuilder builder, DateTimeOffset utcNow)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,29 @@ public override async Task<bool> ExistsAsync(Envelope envelope, CancellationToke
{
if (HasDisposed) return false;

await using var conn = CreateConnection();
await conn.OpenAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = @id")
.With("id", envelope.Id)
.ExecuteScalarAsync(cancellation);

return ((int)count) > 0;
if (Durability.MessageIdentity == MessageIdentity.IdOnly)
{
await using var conn = CreateConnection();
await conn.OpenAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = @id")
.With("id", envelope.Id)
.ExecuteScalarAsync(cancellation);

return ((int)count) > 0;
}
else
{
await using var conn = CreateConnection();
await conn.OpenAsync(cancellation);
var count = await conn
.CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = @id and {DatabaseConstants.ReceivedAt} = @destination")
.With("id", envelope.Id)
.With("destination", envelope.Destination.ToString())
.ExecuteScalarAsync(cancellation);

return ((int)count) > 0;
}
}

public override void WriteLoadScheduledEnvelopeSql(DbCommandBuilder builder, DateTimeOffset utcNow)
Expand Down
Loading