From 8286c5938e052f9607bf08757e5e5a4ec24b2244 Mon Sep 17 00:00:00 2001 From: xBaank Date: Mon, 19 Jan 2026 19:13:48 +0100 Subject: [PATCH 1/2] Implement cancellation handling on upload --- Netorrent/P2P/Download/RequestScheduler.cs | 4 +- Netorrent/P2P/Messages/RequestBlock.cs | 5 ++- Netorrent/P2P/PeerConnection.cs | 2 +- Netorrent/P2P/Upload/UploadScheduler.cs | 52 +++++++++++++++------- 4 files changed, 42 insertions(+), 21 deletions(-) diff --git a/Netorrent/P2P/Download/RequestScheduler.cs b/Netorrent/P2P/Download/RequestScheduler.cs index 461e49ad..ffc56b2d 100644 --- a/Netorrent/P2P/Download/RequestScheduler.cs +++ b/Netorrent/P2P/Download/RequestScheduler.cs @@ -46,7 +46,9 @@ private async Task ScheduleTimeoutsBlocksAsync(CancellationToken cancellationTok { while (true) { - _downloadMessageChannel.Writer.TryWrite(_timeoutMessage); + await _downloadMessageChannel + .Writer.WriteAsync(_timeoutMessage, cancellationToken) + .ConfigureAwait(false); await Task.Delay(10.Seconds, cancellationToken).ConfigureAwait(false); } } diff --git a/Netorrent/P2P/Messages/RequestBlock.cs b/Netorrent/P2P/Messages/RequestBlock.cs index cd32a8df..4dfa8860 100644 --- a/Netorrent/P2P/Messages/RequestBlock.cs +++ b/Netorrent/P2P/Messages/RequestBlock.cs @@ -26,11 +26,12 @@ public override bool Equals(object? obj) return obj is RequestBlock request && Index == request.Index && Begin == request.Begin - && Length == request.Length; + && Length == request.Length + && RequestedFrom.SequenceEqual(request.RequestedFrom); } public override int GetHashCode() { - return HashCode.Combine(Index, Begin, Length); + return HashCode.Combine(Index, Begin, Length, RequestedFrom); } } diff --git a/Netorrent/P2P/PeerConnection.cs b/Netorrent/P2P/PeerConnection.cs index fb4c6136..80936edf 100644 --- a/Netorrent/P2P/PeerConnection.cs +++ b/Netorrent/P2P/PeerConnection.cs @@ -383,7 +383,7 @@ private void ReceiveCancel(Message message) var begin = BinaryPrimitives.ReadInt32BigEndian(span[4..8]); var length = BinaryPrimitives.ReadInt32BigEndian(span[8..12]); - var request = new RequestBlock(index, begin, length); + var request = new RequestBlock(index, begin, length) { RequestedFrom = [this] }; uploadScheduler.CancelRequest(request); } diff --git a/Netorrent/P2P/Upload/UploadScheduler.cs b/Netorrent/P2P/Upload/UploadScheduler.cs index 53dbe5eb..c8d915c1 100644 --- a/Netorrent/P2P/Upload/UploadScheduler.cs +++ b/Netorrent/P2P/Upload/UploadScheduler.cs @@ -25,7 +25,8 @@ ILogger logger Channel.CreateBounded( new BoundedChannelOptions(128) { SingleWriter = false, SingleReader = true } ); - + private readonly Lock _cancelLock = new(); + private readonly HashSet _requests = []; private static readonly UploadMessage.CheckRoundMessage _checkRoundMessage = new(); private readonly TimeSpan _interval = 10.Seconds; @@ -105,15 +106,21 @@ private async ValueTask ProcessRequestAsync( CancellationToken cancellationToken ) { - if ( - requestBlock.State == RequestBlockState.Cancelled - || requestBlock.RequestedFrom.Count == 0 - ) + var peer = requestBlock.RequestedFrom[0]; + + lock (_cancelLock) { - return; + if ( + _requests.TryGetValue(requestBlock, out var actualValue) + && actualValue.State == RequestBlockState.Cancelled + ) + { + _requests.Remove(actualValue); + peer.DecrementUploadRequested(); + return; + } } - var peer = requestBlock.RequestedFrom[0]; var pieceData = await pieceStorage .ReadAsync( requestBlock.Index, @@ -148,6 +155,7 @@ CancellationToken cancellationToken } finally { + _requests.Remove(requestBlock); peer.DecrementUploadRequested(); } } @@ -283,25 +291,35 @@ CancellationToken cancellationToken return; } - from.IncrementUploadRequested(); - await _uploadMessagesChannel - .Writer.WriteAsync(new UploadMessage.RequestBlockMessage(request), cancellationToken) - .ConfigureAwait(false); + if (_requests.Add(request)) + { + from.IncrementUploadRequested(); + + await _uploadMessagesChannel + .Writer.WriteAsync( + new UploadMessage.RequestBlockMessage(request), + cancellationToken + ) + .ConfigureAwait(false); + } } - //TODO properly implement cancellation - public void CancelRequest(RequestBlock request) + public void CancelRequest(RequestBlock cancelled) { - var from = request.RequestedFrom[0]; - - request.State = RequestBlockState.Cancelled; - from.DecrementUploadRequested(); + lock (_cancelLock) + { + if (_requests.TryGetValue(cancelled, out var requestBlock)) + { + cancelled.State = RequestBlockState.Cancelled; + } + } } private async ValueTask DrainChannelsAsync() { await foreach (var _ in _uploadMessagesChannel.Reader.ReadAllAsync().ConfigureAwait(false)) { } + _requests.Clear(); } public async ValueTask DisposeAsync() From 05334c6d8735645b8d689e95e77f9717f74b6bdf Mon Sep 17 00:00:00 2001 From: xBaank Date: Mon, 19 Jan 2026 19:35:26 +0100 Subject: [PATCH 2/2] Add test --- Netorrent.Tests/Fakes/FakePeerConnection.cs | 2 +- Netorrent.Tests/P2P/UploadSchedulerTests.cs | 33 ++++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/Netorrent.Tests/Fakes/FakePeerConnection.cs b/Netorrent.Tests/Fakes/FakePeerConnection.cs index 19f851d5..31752d69 100644 --- a/Netorrent.Tests/Fakes/FakePeerConnection.cs +++ b/Netorrent.Tests/Fakes/FakePeerConnection.cs @@ -59,7 +59,7 @@ public ulong DecrementRequestedBlock() public ulong DecrementUploadRequested() => _uploadRequestedCount--; - public ValueTask DisposeAsync() => ValueTask.CompletedTask; + public async ValueTask DisposeAsync() => SentBlocks.OnCompleted(); public ulong IncrementRequestedBlock() { diff --git a/Netorrent.Tests/P2P/UploadSchedulerTests.cs b/Netorrent.Tests/P2P/UploadSchedulerTests.cs index fbf4f85a..f5ff39a3 100644 --- a/Netorrent.Tests/P2P/UploadSchedulerTests.cs +++ b/Netorrent.Tests/P2P/UploadSchedulerTests.cs @@ -13,7 +13,7 @@ public class UploadSchedulerTests { //TODO use matrix to create multiple peers [Test] - public async Task Should_Upload_To_Peer(CancellationToken cancellationToken) + public async Task Should_Upload_Block_To_Peer(CancellationToken cancellationToken) { var logger = NullLogger.Instance; var bitfield = new Bitfield(5, true); @@ -42,4 +42,35 @@ public async Task Should_Upload_To_Peer(CancellationToken cancellationToken) block.Payload.Length.ShouldBe(0); chokeState.ShouldBe(false); } + + [Test] + public async Task Should_Cancel_Block_To_Peer(CancellationToken cancellationToken) + { + var logger = NullLogger.Instance; + var bitfield = new Bitfield(5, true); + var peerConnection = new FakePeerConnection(bitfield); + await using var uploadScheduler = new UploadScheduler( + new Dictionary() + { + [peerConnection.PeerEndpoint] = peerConnection, + }, + new FakePieceStorage(), + bitfield, + new DataStatistics(10), + logger + ); + var requestBlock = new RequestBlock(0, 0, 0); + var amChokingTask = peerConnection.AmChoking.FirstAsync(i => i == false, cancellationToken); + var isEmptyTask = peerConnection.SentBlocks.IsEmptyAsync(cancellationToken); + peerConnection.PeerInterested.Value = true; + requestBlock.RequestedFrom.Add(peerConnection); + var uploadTask = uploadScheduler.StartAsync(cancellationToken); + var chokeState = await amChokingTask; + await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken); + uploadScheduler.CancelRequest(requestBlock); + await Task.Delay(1000, cancellationToken); + await peerConnection.DisposeAsync(); + (await isEmptyTask).ShouldBe(true); + chokeState.ShouldBe(false); + } }