diff --git a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs index a5304df31..32fba6915 100644 --- a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs +++ b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs @@ -419,7 +419,8 @@ public async Task persist_an_incoming_envelope_raw() Status = EnvelopeStatus.Scheduled, Attempts = 2, MessageType = "foo", - ContentType = EnvelopeConstants.JsonContentType + ContentType = EnvelopeConstants.JsonContentType, + Destination = TransportConstants.DurableLocalUri }; var container = Host.Services.GetRequiredService(); @@ -465,7 +466,8 @@ public async Task persist_an_incoming_envelope_mapped() Status = EnvelopeStatus.Scheduled, Attempts = 2, MessageType = "foo", - ContentType = EnvelopeConstants.JsonContentType + ContentType = EnvelopeConstants.JsonContentType, + Destination = TransportConstants.DurableLocalUri }; var container = Host.Services.GetRequiredService(); diff --git a/src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs b/src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs index 9f9bc27c5..f6ccee797 100644 --- a/src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs +++ b/src/Persistence/MartenTests/Bugs/Bug_1175_schema_name_with_queues.cs @@ -52,10 +52,13 @@ public async Task send_messages_with_postgresql_queueing() opts.ListenToPostgresqlQueue("request").MaximumParallelMessages(14, ProcessingOrder.UnOrdered); opts.PublishMessage().ToPostgresqlQueue("response"); - opts.Services.AddMarten(opt => + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + + opts.Services.AddMarten(m => { - opt.Connection(Servers.PostgresConnectionString); - opt.Events.TenancyStyle = TenancyStyle.Conjoined; + m.Connection(Servers.PostgresConnectionString); + m.Events.TenancyStyle = TenancyStyle.Conjoined; + m.DisableNpgsqlLogging = true; }) .UseLightweightSessions() .IntegrateWithWolverine(options => diff --git a/src/Persistence/MartenTests/Bugs/Bug_826_issue_with_paused_listener.cs b/src/Persistence/MartenTests/Bugs/Bug_826_issue_with_paused_listener.cs index f98746a4a..837a60786 100644 --- a/src/Persistence/MartenTests/Bugs/Bug_826_issue_with_paused_listener.cs +++ b/src/Persistence/MartenTests/Bugs/Bug_826_issue_with_paused_listener.cs @@ -56,7 +56,7 @@ public async Task can_resume_listening() await host.TrackActivity() .WaitForMessageToBeReceivedAt(host) - .Timeout(20.Seconds()) + .Timeout(60.Seconds()) .PublishMessageAndWaitAsync(new ThisMeansTrouble()); } } diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs index d0c1a695b..2af2b04d8 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs @@ -234,6 +234,17 @@ await _dataSource.CreateCommand($"update {_nodeTable} set health_check = now() w .With("id", nodeId).ExecuteNonQueryAsync(); } + public async Task MarkHealthCheckAsync(WolverineNode node, CancellationToken token) + { + var count = await _dataSource.CreateCommand($"update {_nodeTable} set health_check = now() where id = :id") + .With("id", node.NodeId).ExecuteNonQueryAsync(token); + + if (count == 0) + { + await PersistAsync(node, token); + } + } + public async Task> LoadAllNodeAssignedIdsAsync() { return await _dataSource.CreateCommand($"select node_number from {_nodeTable}") diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs index aedeb86e4..4de599bf5 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs @@ -20,6 +20,8 @@ public Task ReassignIncomingAsync(int ownerId, IReadOnlyList incoming) { builder.Append($"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = "); builder.AppendParameter(ownerId); + builder.Append($" where {DatabaseConstants.Id} = "); + builder.AppendParameter(envelope.Id); builder.Append($" and {DatabaseConstants.ReceivedAt} = "); builder.AppendParameter(envelope.Destination.ToString()); builder.Append(";"); diff --git a/src/Persistence/Wolverine.RDBMS/Polling/DatabaseBatcher.cs b/src/Persistence/Wolverine.RDBMS/Polling/DatabaseBatcher.cs index 8171a7c07..d967c383f 100644 --- a/src/Persistence/Wolverine.RDBMS/Polling/DatabaseBatcher.cs +++ b/src/Persistence/Wolverine.RDBMS/Polling/DatabaseBatcher.cs @@ -88,6 +88,10 @@ public async Task DrainAsync() _executingBlock.Complete(); await _executingBlock.Completion; } + catch (TaskCanceledException) + { + // it just timed out, let it go + } catch (Exception e) { _logger.LogError(e, "Error trying to drain the current database batcher"); diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs index 07a2894af..8a1271903 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs @@ -166,6 +166,13 @@ public async Task MarkHealthCheckAsync(Guid nodeId) await session.SaveChangesAsync(); } + public async Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken) + { + using var session = _store.OpenAsyncSession(); + session.Advanced.AddOrPatch(node.NodeId.ToString(), node, x => x.LastHealthCheck, DateTimeOffset.UtcNow); + await session.SaveChangesAsync(cancellationToken); + } + public async Task OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime) { using var session = _store.OpenAsyncSession(); diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs index ae321daaa..f560307cc 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerNodePersistence.cs @@ -47,6 +47,15 @@ public async Task PersistAsync(WolverineNode node, CancellationToken cancel await using var conn = new SqlConnection(_settings.ConnectionString); await conn.OpenAsync(cancellationToken); + var raw = await persistNode(conn, node, cancellationToken); + + await conn.CloseAsync(); + + return (int)raw; + } + + private async Task persistNode(SqlConnection conn, WolverineNode node, CancellationToken cancellationToken) + { var strings = node.Capabilities.Select(x => x.ToString()).Join(","); var cmd = conn.CreateCommand($"insert into {_nodeTable} (id, uri, capabilities, description) OUTPUT Inserted.node_number values (@id, @uri, @capabilities, @description) ") @@ -55,10 +64,7 @@ public async Task PersistAsync(WolverineNode node, CancellationToken cancel .With("capabilities", strings); var raw = await cmd.ExecuteScalarAsync(cancellationToken); - - await conn.CloseAsync(); - - return (int)raw; + return raw; } public async Task DeleteAsync(Guid nodeId, int assignedNodeNumber) @@ -160,6 +166,22 @@ await conn.CreateCommand($"update {_nodeTable} set health_check = GETUTCDATE() w await conn.CloseAsync(); } + public async Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(_settings.ConnectionString); + await conn.OpenAsync(cancellationToken); + + var count = await conn.CreateCommand($"update {_nodeTable} set health_check = GETUTCDATE() where id = @id") + .With("id", node.NodeId).ExecuteNonQueryAsync(cancellationToken); + + if (count == 0) + { + await persistNode(conn, node, cancellationToken); + } + + await conn.CloseAsync(); + } + private async Task readNodeAsync(DbDataReader reader) { var node = new WolverineNode diff --git a/src/Testing/Wolverine.ComplianceTests/NodePersistenceCompliance.cs b/src/Testing/Wolverine.ComplianceTests/NodePersistenceCompliance.cs index 0c01e8395..1bc9104e2 100644 --- a/src/Testing/Wolverine.ComplianceTests/NodePersistenceCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/NodePersistenceCompliance.cs @@ -244,9 +244,15 @@ public async Task update_health_check_smoke_test() await _database.Nodes.PersistAsync(node1, CancellationToken.None); await _database.Nodes.PersistAsync(node2, CancellationToken.None); - await _database.Nodes.PersistAsync(node3, CancellationToken.None); + //await _database.Nodes.PersistAsync(node3, CancellationToken.None); + + await _database.Nodes.MarkHealthCheckAsync(node1, CancellationToken.None); + await _database.Nodes.MarkHealthCheckAsync(node2, CancellationToken.None); + await _database.Nodes.MarkHealthCheckAsync(node3, CancellationToken.None); - await _database.Nodes.MarkHealthCheckAsync(node1.NodeId); + // Proving the upsert behavior + var nodes = await _database.Nodes.LoadAllNodesAsync(CancellationToken.None); + nodes.Any(x => x.NodeId == node3.NodeId).ShouldBeTrue(); } } \ No newline at end of file diff --git a/src/Wolverine/Persistence/Durability/NullMessageStore.cs b/src/Wolverine/Persistence/Durability/NullMessageStore.cs index e187aefd8..15b56c6dd 100644 --- a/src/Wolverine/Persistence/Durability/NullMessageStore.cs +++ b/src/Wolverine/Persistence/Durability/NullMessageStore.cs @@ -289,6 +289,11 @@ public Task MarkHealthCheckAsync(Guid nodeId) return Task.CompletedTask; } + public Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + public Task OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime) { return Task.CompletedTask; diff --git a/src/Wolverine/Runtime/Agents/INodeAgentPersistence.cs b/src/Wolverine/Runtime/Agents/INodeAgentPersistence.cs index 4d0f7f201..8f4457e4e 100644 --- a/src/Wolverine/Runtime/Agents/INodeAgentPersistence.cs +++ b/src/Wolverine/Runtime/Agents/INodeAgentPersistence.cs @@ -19,7 +19,12 @@ public interface INodeAgentPersistence [Obsolete("Kill this in 3.0")] Task MarkNodeAsLeaderAsync(Guid? originalLeader, Guid id); Task LoadNodeAsync(Guid nodeId, CancellationToken cancellationToken); + + // TODO -- make this take WolverineNode instead + [Obsolete("Will be removed in 4.0")] Task MarkHealthCheckAsync(Guid nodeId); + + Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken); Task OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime); diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs index 5be29b6b6..6512252ba 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs @@ -10,6 +10,14 @@ public partial class NodeAgentController public async Task EvaluateAssignmentsAsync(IReadOnlyList nodes) { using var activity = WolverineTracing.ActivitySource.StartActivity("wolverine_node_assignments"); + + // Not sure how this *could* happen, but we had a report of it happening in production + // probably because someone messed w/ the database though + if (!nodes.Any()) + { + // At least use the current node + nodes = new List { WolverineNode.For(_runtime.Options) }; + } var grid = new AssignmentGrid(); diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs index 94b9ad848..e25f094a7 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs @@ -33,8 +33,8 @@ public async Task DoHealthChecksAsync() using var activity = WolverineTracing.ActivitySource.StartActivity("wolverine_node_assignments"); - // write health check regardless - await _persistence.MarkHealthCheckAsync(_runtime.Options.UniqueNodeId); + // write health check regardless, and due to GH-1232, pass in the whole node so you can do an upsert + await _persistence.MarkHealthCheckAsync(WolverineNode.For(_runtime.Options), _cancellation.Token); var nodes = await _persistence.LoadAllNodesAsync(_cancellation.Token); diff --git a/src/Wolverine/Runtime/Agents/WolverineNode.cs b/src/Wolverine/Runtime/Agents/WolverineNode.cs index baba7e502..3dac56af0 100644 --- a/src/Wolverine/Runtime/Agents/WolverineNode.cs +++ b/src/Wolverine/Runtime/Agents/WolverineNode.cs @@ -36,7 +36,8 @@ public static WolverineNode For(WolverineOptions options) return new WolverineNode { NodeId = options.UniqueNodeId, - ControlUri = options.Transports.NodeControlEndpoint?.Uri + ControlUri = options.Transports.NodeControlEndpoint?.Uri, + LastHealthCheck = DateTimeOffset.UtcNow }; } diff --git a/src/Wolverine/Runtime/HandlerPipeline.cs b/src/Wolverine/Runtime/HandlerPipeline.cs index e09826b51..f26db8a9a 100644 --- a/src/Wolverine/Runtime/HandlerPipeline.cs +++ b/src/Wolverine/Runtime/HandlerPipeline.cs @@ -81,6 +81,10 @@ public async Task InvokeAsync(Envelope envelope, IChannelCallback channel, Activ var continuation = await executeAsync(context, envelope, activity); await continuation.ExecuteAsync(context, _runtime, DateTimeOffset.Now, activity); } + catch (ObjectDisposedException) + { + // It's shutting down, get out of here + } catch (Exception e) { await channel.CompleteAsync(envelope);