Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Netorrent.Tests/Fakes/FakePeerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ulong DecrementRequestedBlock()

public ulong DecrementUploadRequested() => _uploadRequestedCount--;

public ValueTask DisposeAsync() => ValueTask.CompletedTask;
public async ValueTask DisposeAsync() => SentBlocks.OnCompleted();

public ulong IncrementRequestedBlock()
{
Expand Down
33 changes: 32 additions & 1 deletion Netorrent.Tests/P2P/UploadSchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class UploadSchedulerTests
{
//TODO use matrix to create multiple peers
[Test]
public async Task Should_Upload_To_Peer(CancellationToken cancellationToken)
public async Task Should_Upload_Block_To_Peer(CancellationToken cancellationToken)
{
var logger = NullLogger.Instance;
var bitfield = new Bitfield(5, true);
Expand Down Expand Up @@ -42,4 +42,35 @@ public async Task Should_Upload_To_Peer(CancellationToken cancellationToken)
block.Payload.Length.ShouldBe(0);
chokeState.ShouldBe(false);
}

[Test]
public async Task Should_Cancel_Block_To_Peer(CancellationToken cancellationToken)
{
var logger = NullLogger.Instance;
var bitfield = new Bitfield(5, true);
var peerConnection = new FakePeerConnection(bitfield);
await using var uploadScheduler = new UploadScheduler(
new Dictionary<PeerEndpoint, IPeerConnection>()
{
[peerConnection.PeerEndpoint] = peerConnection,
},
new FakePieceStorage(),
bitfield,
new DataStatistics(10),
logger
);
var requestBlock = new RequestBlock(0, 0, 0);
var amChokingTask = peerConnection.AmChoking.FirstAsync(i => i == false, cancellationToken);
var isEmptyTask = peerConnection.SentBlocks.IsEmptyAsync(cancellationToken);
peerConnection.PeerInterested.Value = true;
requestBlock.RequestedFrom.Add(peerConnection);
var uploadTask = uploadScheduler.StartAsync(cancellationToken);
var chokeState = await amChokingTask;
await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken);
uploadScheduler.CancelRequest(requestBlock);
await Task.Delay(1000, cancellationToken);
await peerConnection.DisposeAsync();
(await isEmptyTask).ShouldBe(true);
chokeState.ShouldBe(false);
}
}
4 changes: 3 additions & 1 deletion Netorrent/P2P/Download/RequestScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ private async Task ScheduleTimeoutsBlocksAsync(CancellationToken cancellationTok
{
while (true)
{
_downloadMessageChannel.Writer.TryWrite(_timeoutMessage);
await _downloadMessageChannel
.Writer.WriteAsync(_timeoutMessage, cancellationToken)
.ConfigureAwait(false);
await Task.Delay(10.Seconds, cancellationToken).ConfigureAwait(false);
}
}
Expand Down
5 changes: 3 additions & 2 deletions Netorrent/P2P/Messages/RequestBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ public override bool Equals(object? obj)
return obj is RequestBlock request
&& Index == request.Index
&& Begin == request.Begin
&& Length == request.Length;
&& Length == request.Length
&& RequestedFrom.SequenceEqual(request.RequestedFrom);
}

public override int GetHashCode()
{
return HashCode.Combine(Index, Begin, Length);
return HashCode.Combine(Index, Begin, Length, RequestedFrom);
}
}
2 changes: 1 addition & 1 deletion Netorrent/P2P/PeerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private void ReceiveCancel(Message message)
var begin = BinaryPrimitives.ReadInt32BigEndian(span[4..8]);
var length = BinaryPrimitives.ReadInt32BigEndian(span[8..12]);

var request = new RequestBlock(index, begin, length);
var request = new RequestBlock(index, begin, length) { RequestedFrom = [this] };
uploadScheduler.CancelRequest(request);
}

Expand Down
52 changes: 35 additions & 17 deletions Netorrent/P2P/Upload/UploadScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ ILogger logger
Channel.CreateBounded<UploadMessage>(
new BoundedChannelOptions(128) { SingleWriter = false, SingleReader = true }
);

private readonly Lock _cancelLock = new();
private readonly HashSet<RequestBlock> _requests = [];
private static readonly UploadMessage.CheckRoundMessage _checkRoundMessage = new();
private readonly TimeSpan _interval = 10.Seconds;

Expand Down Expand Up @@ -105,15 +106,21 @@ private async ValueTask ProcessRequestAsync(
CancellationToken cancellationToken
)
{
if (
requestBlock.State == RequestBlockState.Cancelled
|| requestBlock.RequestedFrom.Count == 0
)
var peer = requestBlock.RequestedFrom[0];

lock (_cancelLock)
{
return;
if (
_requests.TryGetValue(requestBlock, out var actualValue)
&& actualValue.State == RequestBlockState.Cancelled
)
{
_requests.Remove(actualValue);
peer.DecrementUploadRequested();
return;
}
}

var peer = requestBlock.RequestedFrom[0];
var pieceData = await pieceStorage
.ReadAsync(
requestBlock.Index,
Expand Down Expand Up @@ -148,6 +155,7 @@ CancellationToken cancellationToken
}
finally
{
_requests.Remove(requestBlock);
peer.DecrementUploadRequested();
}
}
Expand Down Expand Up @@ -283,25 +291,35 @@ CancellationToken cancellationToken
return;
}

from.IncrementUploadRequested();
await _uploadMessagesChannel
.Writer.WriteAsync(new UploadMessage.RequestBlockMessage(request), cancellationToken)
.ConfigureAwait(false);
if (_requests.Add(request))
{
from.IncrementUploadRequested();

await _uploadMessagesChannel
.Writer.WriteAsync(
new UploadMessage.RequestBlockMessage(request),
cancellationToken
)
.ConfigureAwait(false);
}
}

//TODO properly implement cancellation
public void CancelRequest(RequestBlock request)
public void CancelRequest(RequestBlock cancelled)
{
var from = request.RequestedFrom[0];

request.State = RequestBlockState.Cancelled;
from.DecrementUploadRequested();
lock (_cancelLock)
{
if (_requests.TryGetValue(cancelled, out var requestBlock))
{
cancelled.State = RequestBlockState.Cancelled;
}
}
}

private async ValueTask DrainChannelsAsync()
{
await foreach (var _ in _uploadMessagesChannel.Reader.ReadAllAsync().ConfigureAwait(false))
{ }
_requests.Clear();
}

public async ValueTask DisposeAsync()
Expand Down