diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs index 65512eeb6..bb66f093c 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs @@ -14,7 +14,7 @@ public async Task ClearAllAsync(CancellationToken cancellationToken) // Shouldn't really get called at runtime, so we're doing it crudely var nodes = await LoadAllNodesAsync(cancellationToken); using var session = _store.OpenAsyncSession(); - + foreach (var node in nodes) { session.Delete(node); @@ -23,7 +23,7 @@ public async Task ClearAllAsync(CancellationToken cancellationToken) session.Delete(AgentAssignment.ToId(agent)); } } - + await session.SaveChangesAsync(cancellationToken); } @@ -33,26 +33,26 @@ public async Task PersistAsync(WolverineNode node, CancellationToken cancel { TransactionMode = TransactionMode.ClusterWide }); - + var sequence = await session.LoadAsync(NodeSequence.SequenceId, cancellationToken); sequence ??= new NodeSequence(); node.AssignedNodeNumber = ++sequence.Count; - + await session.StoreAsync(sequence, cancellationToken); await session.StoreAsync(node, cancellationToken); await session.SaveChangesAsync(cancellationToken); - - return node.AssignedNodeNumber; + + return node.AssignedNodeNumber; } public async Task DeleteAsync(Guid nodeId, int assignedNodeNumber) { using var session = _store.OpenAsyncSession(); session.Delete(nodeId.ToString()); - + await session.SaveChangesAsync(); - + // Actually okay for these to be eventually consistent var query = new IndexQuery @@ -60,9 +60,9 @@ public async Task DeleteAsync(Guid nodeId, int assignedNodeNumber) Query = $"from AgentAssignments a where a.NodeId = $node", WaitForNonStaleResults = true, WaitForNonStaleResultsTimeout = 5.Seconds(), - QueryParameters = new(){{"node", nodeId}} + QueryParameters = new() { { "node", nodeId } } }; - + var op = await _store.Operations.SendAsync( new DeleteByQueryOperation(query)); await op.WaitForCompletionAsync(); @@ -82,25 +82,67 @@ public async Task> LoadAllNodesAsync(CancellationTo .Query() .Customize(x => x.WaitForNonStaleResults()) .ToListAsync(token: cancellationToken); - + foreach (var node in answer) { node.ActiveAgents.Clear(); node.ActiveAgents.AddRange(assignments.Where(x => x.NodeId == node.NodeId).Select(x => x.AgentUri)); } - + return answer; } - public Task PersistAgentRestrictionsAsync(IReadOnlyList restrictions, + /// + /// RavenDB persistence model for agent restrictions, required since RavenDB is built around string identifiers. + /// + public record WolverineAgentRestriction(string Id, Uri AgentUri, AgentRestrictionType Type, int NodeNumber); + + public async Task PersistAgentRestrictionsAsync(IReadOnlyList restrictions, CancellationToken cancellationToken) { - throw new NotImplementedException(); + using var session = _store.OpenAsyncSession(new SessionOptions + { + TransactionMode = TransactionMode.ClusterWide + }); + + foreach (var restriction in restrictions) + { + if (restriction.Type == AgentRestrictionType.None) + { + session.Delete(restriction.Id.ToString()); + } + else + { + var persistedRestriction = new WolverineAgentRestriction( + Id: restriction.Id.ToString(), + AgentUri: restriction.AgentUri, + Type: restriction.Type, + NodeNumber: restriction.NodeNumber); + await session.StoreAsync(persistedRestriction, changeVector: null, id: persistedRestriction.Id, cancellationToken); + } + } + + await session.SaveChangesAsync(cancellationToken); } - public Task LoadNodeAgentStateAsync(CancellationToken cancellationToken) + public async Task LoadNodeAgentStateAsync(CancellationToken cancellationToken) { - throw new NotImplementedException(); + using var session = _store.OpenAsyncSession(); + var nodes = await session + .Query() + .Customize(x => x.WaitForNonStaleResults()) + .ToListAsync(token: cancellationToken); + + var restrictions = await session + .Query() + .Customize(x => x.WaitForNonStaleResults()) + .ToListAsync(token: cancellationToken); + + var agentRestrictions = restrictions.Select( + x => new AgentRestriction(Guid.Parse(x.Id), x.AgentUri, x.Type, x.NodeNumber) + ); + + return new NodeAgentState(nodes, new AgentRestrictions([.. agentRestrictions])); } public async Task AssignAgentsAsync(Guid nodeId, IReadOnlyList agents, CancellationToken cancellationToken) @@ -111,7 +153,7 @@ public async Task AssignAgentsAsync(Guid nodeId, IReadOnlyList agents, Canc var agentAssignment = new AgentAssignment(agent, nodeId); await session.StoreAsync(agentAssignment, cancellationToken); } - + await session.SaveChangesAsync(token: cancellationToken); } @@ -119,7 +161,7 @@ public async Task RemoveAssignmentAsync(Guid nodeId, Uri agentUri, CancellationT { using var session = _store.OpenAsyncSession(); session.Delete(AgentAssignment.ToId(agentUri)); - + await session.SaveChangesAsync(token: cancellationToken); } @@ -129,7 +171,7 @@ public async Task AddAssignmentAsync(Guid nodeId, Uri agentUri, CancellationToke var agentAssignment = new AgentAssignment(agentUri, nodeId); await session.StoreAsync(agentAssignment, agentAssignment.Id, cancellationToken); - + await session.SaveChangesAsync(token: cancellationToken); } @@ -165,7 +207,7 @@ public async Task LogRecordsAsync(params NodeRecord[] records) { await session.StoreAsync(record); } - + await session.SaveChangesAsync(); } @@ -183,7 +225,7 @@ public async Task> FetchRecentRecordsAsync(int count) public class NodeSequence { public static readonly string SequenceId = "nodes/sequence"; - + public string Id { get; set; } = SequenceId; public int Count { get; set; } }