Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public async Task ClearAllAsync(CancellationToken cancellationToken)
// Shouldn't really get called at runtime, so we're doing it crudely
var nodes = await LoadAllNodesAsync(cancellationToken);
using var session = _store.OpenAsyncSession();

foreach (var node in nodes)
{
session.Delete(node);
Expand All @@ -23,7 +23,7 @@ public async Task ClearAllAsync(CancellationToken cancellationToken)
session.Delete(AgentAssignment.ToId(agent));
}
}

await session.SaveChangesAsync(cancellationToken);
}

Expand All @@ -33,36 +33,36 @@ public async Task<int> PersistAsync(WolverineNode node, CancellationToken cancel
{
TransactionMode = TransactionMode.ClusterWide
});

var sequence = await session.LoadAsync<NodeSequence>(NodeSequence.SequenceId, cancellationToken);
sequence ??= new NodeSequence();

node.AssignedNodeNumber = ++sequence.Count;

await session.StoreAsync(sequence, cancellationToken);
await session.StoreAsync(node, cancellationToken);
await session.SaveChangesAsync(cancellationToken);
return node.AssignedNodeNumber;

return node.AssignedNodeNumber;
}

public async Task DeleteAsync(Guid nodeId, int assignedNodeNumber)
{
using var session = _store.OpenAsyncSession();
session.Delete(nodeId.ToString());

await session.SaveChangesAsync();

// Actually okay for these to be eventually consistent

var query = new IndexQuery
{
Query = $"from AgentAssignments a where a.NodeId = $node",
WaitForNonStaleResults = true,
WaitForNonStaleResultsTimeout = 5.Seconds(),
QueryParameters = new(){{"node", nodeId}}
QueryParameters = new() { { "node", nodeId } }
};

var op = await _store.Operations.SendAsync(
new DeleteByQueryOperation(query));
await op.WaitForCompletionAsync();
Expand All @@ -82,25 +82,67 @@ public async Task<IReadOnlyList<WolverineNode>> LoadAllNodesAsync(CancellationTo
.Query<AgentAssignment>()
.Customize(x => x.WaitForNonStaleResults())
.ToListAsync(token: cancellationToken);

foreach (var node in answer)
{
node.ActiveAgents.Clear();
node.ActiveAgents.AddRange(assignments.Where(x => x.NodeId == node.NodeId).Select(x => x.AgentUri));
}

return answer;
}

public Task PersistAgentRestrictionsAsync(IReadOnlyList<AgentRestriction> restrictions,
/// <summary>
/// RavenDB persistence model for agent restrictions, required since RavenDB is built around string identifiers.
/// </summary>
public record WolverineAgentRestriction(string Id, Uri AgentUri, AgentRestrictionType Type, int NodeNumber);

public async Task PersistAgentRestrictionsAsync(IReadOnlyList<AgentRestriction> restrictions,
CancellationToken cancellationToken)
{
throw new NotImplementedException();
using var session = _store.OpenAsyncSession(new SessionOptions
{
TransactionMode = TransactionMode.ClusterWide
});

foreach (var restriction in restrictions)
{
if (restriction.Type == AgentRestrictionType.None)
{
session.Delete(restriction.Id.ToString());
}
else
{
var persistedRestriction = new WolverineAgentRestriction(
Id: restriction.Id.ToString(),
AgentUri: restriction.AgentUri,
Type: restriction.Type,
NodeNumber: restriction.NodeNumber);
await session.StoreAsync(persistedRestriction, changeVector: null, id: persistedRestriction.Id, cancellationToken);
}
}

await session.SaveChangesAsync(cancellationToken);
}

public Task<NodeAgentState> LoadNodeAgentStateAsync(CancellationToken cancellationToken)
public async Task<NodeAgentState> LoadNodeAgentStateAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
using var session = _store.OpenAsyncSession();
var nodes = await session
.Query<WolverineNode>()
.Customize(x => x.WaitForNonStaleResults())
.ToListAsync(token: cancellationToken);

var restrictions = await session
.Query<WolverineAgentRestriction>()
.Customize(x => x.WaitForNonStaleResults())
.ToListAsync(token: cancellationToken);

var agentRestrictions = restrictions.Select(
x => new AgentRestriction(Guid.Parse(x.Id), x.AgentUri, x.Type, x.NodeNumber)
);

return new NodeAgentState(nodes, new AgentRestrictions([.. agentRestrictions]));
}

public async Task AssignAgentsAsync(Guid nodeId, IReadOnlyList<Uri> agents, CancellationToken cancellationToken)
Expand All @@ -111,15 +153,15 @@ public async Task AssignAgentsAsync(Guid nodeId, IReadOnlyList<Uri> agents, Canc
var agentAssignment = new AgentAssignment(agent, nodeId);
await session.StoreAsync(agentAssignment, cancellationToken);
}

await session.SaveChangesAsync(token: cancellationToken);
}

public async Task RemoveAssignmentAsync(Guid nodeId, Uri agentUri, CancellationToken cancellationToken)
{
using var session = _store.OpenAsyncSession();
session.Delete(AgentAssignment.ToId(agentUri));

await session.SaveChangesAsync(token: cancellationToken);
}

Expand All @@ -129,7 +171,7 @@ public async Task AddAssignmentAsync(Guid nodeId, Uri agentUri, CancellationToke

var agentAssignment = new AgentAssignment(agentUri, nodeId);
await session.StoreAsync(agentAssignment, agentAssignment.Id, cancellationToken);

await session.SaveChangesAsync(token: cancellationToken);
}

Expand Down Expand Up @@ -165,7 +207,7 @@ public async Task LogRecordsAsync(params NodeRecord[] records)
{
await session.StoreAsync(record);
}

await session.SaveChangesAsync();
}

Expand All @@ -183,7 +225,7 @@ public async Task<IReadOnlyList<NodeRecord>> FetchRecentRecordsAsync(int count)
public class NodeSequence
{
public static readonly string SequenceId = "nodes/sequence";

public string Id { get; set; } = SequenceId;
public int Count { get; set; }
}
Loading