Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,64 +208,72 @@ public async IAsyncEnumerable<Node> DiscoverNodes([EnumeratorCancellation] Cance
{
Channel<Node> discoveredNodesChannel = Channel.CreateBounded<Node>(1);

async Task DiscoverAsync(IEnumerable<IEnr> startingNode, ArrayPoolSpan<byte> nodeId)
async Task DiscoverAsync(IEnumerable<IEnr> startingNode, ArrayPoolSpan<byte> nodeId, bool disposeNodeId = true)
{
using ArrayPoolSpan<byte> _ = nodeId;

static int[] GetDistances(byte[] srcNodeId, in ArrayPoolSpan<byte> destNodeId)
try
{
const int WiderDistanceRange = 3;
static int[] GetDistances(byte[] srcNodeId, in ArrayPoolSpan<byte> destNodeId)
{
const int WiderDistanceRange = 3;

int[] distances = new int[WiderDistanceRange];
distances[0] = TableUtility.Log2Distance(srcNodeId, destNodeId);
int[] distances = new int[WiderDistanceRange];
distances[0] = TableUtility.Log2Distance(srcNodeId, destNodeId);

for (int n = 1, i = 1; n < WiderDistanceRange; i++)
{
if (distances[0] - i > 0)
for (int n = 1, i = 1; n < WiderDistanceRange; i++)
{
distances[n++] = distances[0] - i;
if (distances[0] - i > 0)
{
distances[n++] = distances[0] - i;
}
if (distances[0] + i <= 256)
{
distances[n++] = distances[0] + i;
}
}
if (distances[0] + i <= 256)
{
distances[n++] = distances[0] + i;
}
}

return distances;
}
return distances;
}

Queue<IEnr> nodesToCheck = new(startingNode);
HashSet<IEnr> checkedNodes = [];
Queue<IEnr> nodesToCheck = new(startingNode);
HashSet<IEnr> checkedNodes = [];

while (!token.IsCancellationRequested)
{
if (!nodesToCheck.TryDequeue(out IEnr? newEntry))
while (!token.IsCancellationRequested)
{
return;
}
if (!nodesToCheck.TryDequeue(out IEnr? newEntry))
{
return;
}

if (TryGetNodeFromEnr(newEntry, out Node? node2))
{
await discoveredNodesChannel.Writer.WriteAsync(node2!, token);
if (TryGetNodeFromEnr(newEntry, out Node? node2))
{
await discoveredNodesChannel.Writer.WriteAsync(node2!, token);

if (_logger.IsDebug) _logger.Debug($"A node discovered via discv5: {newEntry} = {node2}.");
if (_logger.IsDebug) _logger.Debug($"A node discovered via discv5: {newEntry} = {node2}.");

_discoveryReport?.NodeFound();
}
_discoveryReport?.NodeFound();
}

if (!checkedNodes.Add(newEntry))
{
continue;
}
if (!checkedNodes.Add(newEntry))
{
continue;
}

foreach (IEnr newEnr in await _discv5Protocol.SendFindNodeAsync(newEntry, GetDistances(newEntry.NodeId, in nodeId)) ?? [])
{
if (!checkedNodes.Contains(newEnr))
foreach (IEnr newEnr in await _discv5Protocol.SendFindNodeAsync(newEntry, GetDistances(newEntry.NodeId, in nodeId)) ?? [])
{
nodesToCheck.Enqueue(newEnr);
if (!checkedNodes.Contains(newEnr))
{
nodesToCheck.Enqueue(newEnr);
}
}
}
}
finally
{
if (disposeNodeId)
{
nodeId.Dispose();
}
}
}

IEnumerable<IEnr> GetStartingNodes() => _discv5Protocol.GetAllNodes;
Expand All @@ -284,7 +292,7 @@ static int[] GetDistances(byte[] srcNodeId, in ArrayPoolSpan<byte> destNodeId)
{
using ArrayPoolList<Task> discoverTasks = new(RandomNodesToLookupCount);

discoverTasks.Add(DiscoverAsync(GetStartingNodes(), selfNodeId));
discoverTasks.Add(DiscoverAsync(GetStartingNodes(), selfNodeId, false));

for (int i = 0; i < RandomNodesToLookupCount; i++)
{
Expand All @@ -294,6 +302,11 @@ static int[] GetDistances(byte[] srcNodeId, in ArrayPoolSpan<byte> destNodeId)
}

await Task.WhenAll(discoverTasks);
await Task.Delay(TimeSpan.FromSeconds(2), token);
}
catch (OperationCanceledException)
{
if (_logger.IsTrace) _logger.Trace($"Discovery has been stopped.");
}
catch (Exception ex)
{
Expand Down