diff --git a/Netorrent.Tests.Integration/Torrents/TorrentTests.cs b/Netorrent.Tests.Integration/Torrents/TorrentTests.cs index a408f559..719534ba 100644 --- a/Netorrent.Tests.Integration/Torrents/TorrentTests.cs +++ b/Netorrent.Tests.Integration/Torrents/TorrentTests.cs @@ -10,7 +10,7 @@ namespace Netorrent.Tests.Integration.Torrents; [ClassDataSource(Shared = SharedType.PerClass)] -[Timeout(2 * 60_000)] +[Timeout(4 * 60_000)] public class TorrentTests(OpenTrackerFixture fixture) { private readonly OpenTrackerFixture _fixture = fixture; @@ -80,8 +80,8 @@ public async Task Should_Download_Torrent_With_Different_Ips_And_Trackers( UsedAddressProtocol.Ipv4 | UsedAddressProtocol.Ipv6 )] UsedAddressProtocol usedAdressProtocol, - [Matrix(3)] int seedersCount, - [Matrix(12)] int leechersCount, + [Matrix(4)] int seedersCount, + [Matrix(30)] int leechersCount, CancellationToken cancellationToken ) { diff --git a/Netorrent.Tests/Fakes/FakePeerConnection.cs b/Netorrent.Tests/Fakes/FakePeerConnection.cs index 3c75f2ce..79618d81 100644 --- a/Netorrent.Tests/Fakes/FakePeerConnection.cs +++ b/Netorrent.Tests/Fakes/FakePeerConnection.cs @@ -1,25 +1,26 @@ +using Netorrent.Extensions; using Netorrent.P2P; using Netorrent.P2P.Download; using Netorrent.P2P.Measurement; using Netorrent.P2P.Messages; using R3; -internal class FakePeerConnection : IPeerConnection +internal class FakePeerConnection(Bitfield myBitfield) : IPeerConnection { public Subject SentBlocks = new(); - public ReactiveProperty AmChoking => new(true); + public SynchronizedReactiveProperty AmChoking { get; } = new(true); - public ReactiveProperty AmInterested => new(false); + public SynchronizedReactiveProperty AmInterested { get; } = new(false); - public ReactiveProperty PeerChoking => new(true); + public SynchronizedReactiveProperty PeerChoking { get; } = new(true); - public ReactiveProperty PeerInterested => new(false); + public SynchronizedReactiveProperty PeerInterested { get; } = new(false); public TimeSpan ConnectionDuration => throw new NotImplementedException(); - public SpeedTracker DownloadSpeedTracker => throw new NotImplementedException(); + public SpeedTracker DownloadTracker => new(); - public SpeedTracker UploadSpeedTracker => throw new NotImplementedException(); + public SpeedTracker UploadTracker => new(); public PeerEndpoint PeerEndpoint => throw new NotImplementedException(); @@ -27,12 +28,24 @@ internal class FakePeerConnection : IPeerConnection public int UploadRequestedBlocksCount => _uploadRequestedCount; - public Bitfield MyBitField => throw new NotImplementedException(); + public Bitfield MyBitField => myBitfield; public Bitfield? PeerBitField => throw new NotImplementedException(); public PeerRequestWindow PeerRequestWindow => throw new NotImplementedException(); + ReadOnlyReactiveProperty IPeerConnection.AmChoking => AmChoking; + + ReadOnlyReactiveProperty IPeerConnection.AmInterested => AmInterested; + + ReadOnlyReactiveProperty IPeerConnection.PeerChoking => PeerChoking; + + ReadOnlyReactiveProperty IPeerConnection.PeerInterested => PeerInterested; + + public TimeSpan TimeSinceReceivedBlock => 0.Seconds; + + public TimeSpan TimeSinceSentBlock => 0.Seconds; + private int _uploadRequestedCount = 0; public int DecrementRequestedBlock() @@ -72,9 +85,13 @@ public bool TrySendRequest(RequestBlock nextBlock) throw new NotImplementedException(); } - public bool TrySendUnchoked() + public async ValueTask UnchokeAsync(CancellationToken cancellationToken) { AmChoking.Value = false; - return true; + } + + public async ValueTask ChokeAsync(CancellationToken cancellationToken) + { + AmChoking.Value = true; } } diff --git a/Netorrent.Tests/Fakes/FakeUploadScheduler.cs b/Netorrent.Tests/Fakes/FakeUploadScheduler.cs index 44630f99..ff01a6b3 100644 --- a/Netorrent.Tests/Fakes/FakeUploadScheduler.cs +++ b/Netorrent.Tests/Fakes/FakeUploadScheduler.cs @@ -6,6 +6,11 @@ namespace Netorrent.Tests.Fakes; internal class FakeUploadScheduler : IUploadScheduler { + public async ValueTask AddPeerAsync( + IPeerConnection peerConnection, + CancellationToken cancellationToken + ) { } + public ValueTask AddRequestAsync(RequestBlock request, CancellationToken cancellationToken) { throw new NotImplementedException(); @@ -18,18 +23,10 @@ public void CancelRequest(RequestBlock request) public ValueTask DisposeAsync() => ValueTask.CompletedTask; - public ValueTask FreeSlotAsync( + public async ValueTask RemovePeerAsync( IPeerConnection peerConnection, CancellationToken cancellationToken - ) => ValueTask.CompletedTask; - - public ValueTask RequestSlotAsync( - IPeerConnection peerConnection, - CancellationToken cancellationToken - ) - { - throw new NotImplementedException(); - } + ) { } public Task StartAsync(CancellationToken cancellationToken) => Task.Delay(-1, cancellationToken); diff --git a/Netorrent.Tests/P2P/PeerConectionTests.cs b/Netorrent.Tests/P2P/PeerConectionTests.cs index 3ef17441..cda707d9 100644 --- a/Netorrent.Tests/P2P/PeerConectionTests.cs +++ b/Netorrent.Tests/P2P/PeerConectionTests.cs @@ -55,7 +55,7 @@ public async Task Should_Send_Unchoke(CancellationToken cancellationToken) _ = peerConnection.StartAsync(cancellationToken); var statesTask = peerConnection.AmChoking.Take(2).ToListAsync(cancellationToken); - peerConnection.TrySendUnchoked(); + await peerConnection.UnchokeAsync(cancellationToken); var states = await statesTask; states.First().ShouldBeTrue(); diff --git a/Netorrent.Tests/P2P/UploadSchedulerTests.cs b/Netorrent.Tests/P2P/UploadSchedulerTests.cs index 4b3336ef..23a3cbfa 100644 --- a/Netorrent.Tests/P2P/UploadSchedulerTests.cs +++ b/Netorrent.Tests/P2P/UploadSchedulerTests.cs @@ -15,24 +15,27 @@ public class UploadSchedulerTests public async Task Should_Upload_To_Peer(CancellationToken cancellationToken) { var logger = NullLogger.Instance; - await using var peerConnection = new FakePeerConnection(); + var bitfield = new Bitfield(5, true); + await using var peerConnection = new FakePeerConnection(bitfield); await using var uploadScheduler = new UploadScheduler( new FakePieceStorage(), - new Bitfield(5, true), + bitfield, new TransferStatistics(10), logger ); var requestBlock = new RequestBlock(0, 0, 0); - var amChokingTask = peerConnection.AmChoking.FirstAsync(cancellationToken); + var amChokingTask = peerConnection.AmChoking.FirstAsync(i => i == false, cancellationToken); var blockTask = peerConnection.SentBlocks.FirstAsync(cancellationToken); + peerConnection.PeerInterested.Value = true; requestBlock.RequestedFrom.Add(peerConnection); - await uploadScheduler.RequestSlotAsync(peerConnection, cancellationToken); - await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken); + await uploadScheduler.AddPeerAsync(peerConnection, cancellationToken); var uploadTask = uploadScheduler.StartAsync(cancellationToken); + var chokeState = await amChokingTask; + await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken); var block = await blockTask; block.Index.ShouldBe(0); block.Begin.ShouldBe(0); block.Payload.Length.ShouldBe(0); - (await amChokingTask).ShouldBe(true); + chokeState.ShouldBe(false); } } diff --git a/Netorrent/Extensions/SemaphoreSlimExtensions.cs b/Netorrent/Extensions/SemaphoreSlimExtensions.cs new file mode 100644 index 00000000..ad592379 --- /dev/null +++ b/Netorrent/Extensions/SemaphoreSlimExtensions.cs @@ -0,0 +1,15 @@ +using Netorrent.Other; + +namespace Netorrent.Extensions; + +public static class SemaphoreSlimExtensions +{ + extension(SemaphoreSlim semaphoreSlim) + { + public async Task LockAsync(CancellationToken cancellationToken) + { + await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false); + return new SemaphoreSlimDisposable(semaphoreSlim); + } + } +} diff --git a/Netorrent/Other/SemaphoreSlimDisposable.cs b/Netorrent/Other/SemaphoreSlimDisposable.cs new file mode 100644 index 00000000..96169634 --- /dev/null +++ b/Netorrent/Other/SemaphoreSlimDisposable.cs @@ -0,0 +1,6 @@ +namespace Netorrent.Other; + +public readonly struct SemaphoreSlimDisposable(SemaphoreSlim semaphoreSlim) : IDisposable +{ + public void Dispose() => semaphoreSlim.Release(); +} diff --git a/Netorrent/P2P/Download/PeerRequestWindow.cs b/Netorrent/P2P/Download/PeerRequestWindow.cs index 00dc2295..6ae9336e 100644 --- a/Netorrent/P2P/Download/PeerRequestWindow.cs +++ b/Netorrent/P2P/Download/PeerRequestWindow.cs @@ -20,7 +20,7 @@ private void CalculateWindow(long bytesPerSecond) public Timer StartSampling(TimeSpan period, SpeedTracker speedTracker) { return new Timer( - _ => CalculateWindow((long)speedTracker.CurrentBps.Bps), + _ => CalculateWindow((long)speedTracker.Speed.Bps), null, TimeSpan.Zero, period diff --git a/Netorrent/P2P/Download/RequestScheduler.cs b/Netorrent/P2P/Download/RequestScheduler.cs index 93e3f737..5b10751e 100644 --- a/Netorrent/P2P/Download/RequestScheduler.cs +++ b/Netorrent/P2P/Download/RequestScheduler.cs @@ -18,8 +18,6 @@ ILogger logger ) : IRequestScheduler { const int MinPeersForRarity = 6; - const int MinPeers = 6; - const int MaxPeers = 10; private readonly Channel _receiveBlocksChannel = Channel.CreateBounded( new BoundedChannelOptions(256) { SingleWriter = false, SingleReader = true } @@ -30,10 +28,8 @@ ILogger logger ); private readonly HashSet _activePeers = []; - private readonly List _interestedPeers = []; private readonly Dictionary _pieceBuffers = []; private readonly Lock _activePeersLock = new(); - private int _maxCurrentPeers = MinPeers; private CancellationTokenSource? _cts; private Task? _runningTask; private bool _disposed; @@ -183,7 +179,7 @@ private async Task WarmupAsync(CancellationToken cancellationToken) lock (_activePeersLock) { minPeersReady = _activePeers.Count(i => - i.AmInterested.Value && !i.PeerChoking.Value + i.AmInterested.CurrentValue && !i.PeerChoking.CurrentValue ); } @@ -236,25 +232,13 @@ public async ValueTask RequestSlotAsync( CancellationToken cancellationToken ) { - bool shouldEnqueue; - lock (_activePeersLock) { - if (_activePeers.Count >= _maxCurrentPeers) - { - _interestedPeers.Add(peerConnection); - return; - } _activePeers.Add(peerConnection); - shouldEnqueue = true; - } - - if (shouldEnqueue) - { - await _slotsChannel - .Writer.WriteAsync(peerConnection, cancellationToken) - .ConfigureAwait(false); } + await _slotsChannel + .Writer.WriteAsync(peerConnection, cancellationToken) + .ConfigureAwait(false); } public async ValueTask FreeSlotAsync( @@ -266,15 +250,7 @@ CancellationToken cancellationToken lock (_activePeersLock) { - _interestedPeers.Remove(peerConnection); _activePeers.Remove(peerConnection); - - if (_interestedPeers.Count > 0) - { - nextPeer = _interestedPeers[0]; - _interestedPeers.RemoveAt(0); - _activePeers.Add(nextPeer); - } } if (nextPeer is not null) diff --git a/Netorrent/P2P/IPeerConnection.cs b/Netorrent/P2P/IPeerConnection.cs index 8bcdf32c..57d63188 100644 --- a/Netorrent/P2P/IPeerConnection.cs +++ b/Netorrent/P2P/IPeerConnection.cs @@ -7,19 +7,21 @@ namespace Netorrent.P2P; internal interface IPeerConnection : IAsyncDisposable { - ReactiveProperty AmChoking { get; } - ReactiveProperty AmInterested { get; } - ReactiveProperty PeerChoking { get; } - ReactiveProperty PeerInterested { get; } + ReadOnlyReactiveProperty AmChoking { get; } + ReadOnlyReactiveProperty AmInterested { get; } + ReadOnlyReactiveProperty PeerChoking { get; } + ReadOnlyReactiveProperty PeerInterested { get; } TimeSpan ConnectionDuration { get; } - SpeedTracker DownloadSpeedTracker { get; } - SpeedTracker UploadSpeedTracker { get; } + SpeedTracker DownloadTracker { get; } + SpeedTracker UploadTracker { get; } Bitfield MyBitField { get; } Bitfield? PeerBitField { get; } PeerEndpoint PeerEndpoint { get; } PeerRequestWindow PeerRequestWindow { get; } int RequestedBlocksCount { get; } int UploadRequestedBlocksCount { get; } + TimeSpan TimeSinceReceivedBlock { get; } + TimeSpan TimeSinceSentBlock { get; } int DecrementRequestedBlock(); int DecrementUploadRequested(); @@ -28,6 +30,7 @@ internal interface IPeerConnection : IAsyncDisposable bool TrySendBlock(Block block); bool TrySendCancel(RequestBlock request); bool TrySendRequest(RequestBlock nextBlock); - bool TrySendUnchoked(); + ValueTask UnchokeAsync(CancellationToken cancellationToken); + ValueTask ChokeAsync(CancellationToken cancellationToken); Task StartAsync(CancellationToken cancellationToken); } diff --git a/Netorrent/P2P/Measurement/DownloadSpeed.cs b/Netorrent/P2P/Measurement/DownloadSpeed.cs index ff1038fe..d2192a01 100644 --- a/Netorrent/P2P/Measurement/DownloadSpeed.cs +++ b/Netorrent/P2P/Measurement/DownloadSpeed.cs @@ -7,12 +7,11 @@ /// Mbps, and Gbps. The struct supports arithmetic operations and implicit conversion from a double value representing /// bits per second. /// The download speed in bits per second (bps). -public readonly struct DownloadSpeed(double bps) +public readonly record struct DownloadSpeed(double Bps) { - public double Bps => bps; - public double Kbps => bps / 1_000d; - public double Mbps => bps / 1_000_000d; - public double Gbps => bps / 1_000_000_000d; + public double Kbps => Bps / 1_000d; + public double Mbps => Bps / 1_000_000d; + public double Gbps => Bps / 1_000_000_000d; public static implicit operator DownloadSpeed(double bps) => new(bps); @@ -22,14 +21,18 @@ public readonly struct DownloadSpeed(double bps) public static DownloadSpeed operator -(DownloadSpeed left, DownloadSpeed right) => new(left.Bps - right.Bps); + public static bool operator >(DownloadSpeed left, DownloadSpeed right) => left.Bps > right.Bps; + + public static bool operator <(DownloadSpeed left, DownloadSpeed right) => left.Bps < right.Bps; + public override string ToString() { - if (bps >= 1_000_000_000) + if (Bps >= 1_000_000_000) return $"{Gbps:F2}/Gbps"; - if (bps >= 1_000_000) + if (Bps >= 1_000_000) return $"{Mbps:F2}/Mbps"; - if (bps >= 1_000) + if (Bps >= 1_000) return $"{Kbps:F2}/Kbps"; - return $"{bps:F2}/bps"; + return $"{Bps:F2}/bps"; } } diff --git a/Netorrent/P2P/Measurement/SpeedTracker.cs b/Netorrent/P2P/Measurement/SpeedTracker.cs index 4f989670..962c2cde 100644 --- a/Netorrent/P2P/Measurement/SpeedTracker.cs +++ b/Netorrent/P2P/Measurement/SpeedTracker.cs @@ -17,7 +17,7 @@ public sealed class SpeedTracker(double alpha = 0.3) private readonly double _alpha = alpha; public ByteSize TotalBytes => Interlocked.Read(ref _totalBytes); - public DownloadSpeed CurrentBps => Volatile.Read(ref _currentBps); + public DownloadSpeed Speed => Volatile.Read(ref _currentBps); internal Timer StartSampling(TimeSpan period) { diff --git a/Netorrent/P2P/PeerConnection.cs b/Netorrent/P2P/PeerConnection.cs index 201bef7b..5635babc 100644 --- a/Netorrent/P2P/PeerConnection.cs +++ b/Netorrent/P2P/PeerConnection.cs @@ -25,32 +25,45 @@ internal class PeerConnection( bool peerInterested = false ) : IPeerConnection { + private readonly SynchronizedReactiveProperty _amChoking = new(amChoking); + private readonly SynchronizedReactiveProperty _amInterested = new(amInterested); + private readonly SynchronizedReactiveProperty _peerChoking = new(peerChoking); + private readonly SynchronizedReactiveProperty _peerInterested = new(peerInterested); private readonly Lock _stateLock = new(); private DateTimeOffset _lastSentMessageTime; private DateTimeOffset _lastReceivedMessageTime; + private DateTimeOffset _lastSentBlock; + private DateTimeOffset _lastReceivedBlock; private CancellationTokenSource? _cancellationTokenSource; private DateTimeOffset _startedConnectionTime; private Task? _runTask; private bool _disposed; - public SpeedTracker DownloadSpeedTracker { get; } = new(); - public SpeedTracker UploadSpeedTracker { get; } = new(); + public SpeedTracker DownloadTracker { get; } = new(); + public SpeedTracker UploadTracker { get; } = new(); public Bitfield MyBitField { get; } = myBitField; - public ReactiveProperty AmChoking { get; private set; } = new(amChoking); - public ReactiveProperty AmInterested { get; private set; } = new(amInterested); - public ReactiveProperty PeerChoking { get; private set; } = new(peerChoking); - public ReactiveProperty PeerInterested { get; private set; } = new(peerInterested); public Bitfield? PeerBitField { get; private set; } public PeerEndpoint PeerEndpoint { get; } = peerEndpoint; public PeerRequestWindow PeerRequestWindow { get; } = peerRequestWindow; + public TimeSpan TimeSinceReceivedBlock => DateTimeOffset.UtcNow - _lastReceivedBlock; + public TimeSpan TimeSinceSentBlock => DateTimeOffset.UtcNow - _lastSentBlock; + private int _requestedBlocksCount; private int _uploadRequestedCount; public int RequestedBlocksCount => Volatile.Read(ref _requestedBlocksCount); public int UploadRequestedBlocksCount => Volatile.Read(ref _uploadRequestedCount); - public TimeSpan ConnectionDuration => DateTime.UtcNow - _startedConnectionTime; + public TimeSpan ConnectionDuration => DateTimeOffset.UtcNow - _startedConnectionTime; + + public ReadOnlyReactiveProperty AmChoking => _amChoking; + + public ReadOnlyReactiveProperty AmInterested => _amInterested; + + public ReadOnlyReactiveProperty PeerChoking => _peerChoking; + + public ReadOnlyReactiveProperty PeerInterested => _peerInterested; public Task StartAsync(CancellationToken cancellationToken) { @@ -70,20 +83,21 @@ public Task StartAsync(CancellationToken cancellationToken) private async Task RunAsync(CancellationTokenSource cancellationTokenSource) { await SendBitfieldAsync(MyBitField, cancellationTokenSource.Token).ConfigureAwait(false); + await uploadScheduler.AddPeerAsync(this, cancellationTokenSource.Token); using var stateChangedDisposable = MyBitField.StateChanged.SubscribeAwait( async (i, ct) => await SendHaveAsync(i, ct), configureAwait: false ); - await using var downloadTimer = DownloadSpeedTracker + await using var downloadTimer = DownloadTracker .StartSampling(500.Milliseconds) .ConfigureAwait(false); - await using var uploadTimer = UploadSpeedTracker + await using var uploadTimer = UploadTracker .StartSampling(500.Milliseconds) .ConfigureAwait(false); await using var requestWindowTimer = PeerRequestWindow - .StartSampling(500.Milliseconds, DownloadSpeedTracker) + .StartSampling(500.Milliseconds, DownloadTracker) .ConfigureAwait(false); try @@ -214,27 +228,25 @@ CancellationToken cancellationToken private async ValueTask ReceiveInterestedAsync(CancellationToken cancellationToken) { - if (!PeerInterested.Value) + if (!_peerInterested.Value) { - PeerInterested.Value = true; - await uploadScheduler.RequestSlotAsync(this, cancellationToken).ConfigureAwait(false); + _peerInterested.Value = true; } } private async ValueTask ReceiveNotInterestedAsync(CancellationToken cancellationToken) { - if (PeerInterested.Value) + if (_peerInterested.Value) { - PeerInterested.Value = false; - await SendChokedAsync(cancellationToken).ConfigureAwait(false); + _peerInterested.Value = false; } } private async ValueTask ReceiveChokeAsync(CancellationToken cancellationToken) { - if (!PeerChoking.Value) + if (!_peerChoking.Value) { - PeerChoking.Value = true; + _peerChoking.Value = true; await requestScheduler.FreeSlotAsync(this, cancellationToken).ConfigureAwait(false); } } @@ -257,9 +269,9 @@ private async ValueTask ReceiveHaveAsync(Message message, CancellationToken canc private async ValueTask ReceiveUnchokeAsync(CancellationToken cancellationToken) { - if (PeerChoking.Value) + if (_peerChoking.Value) { - PeerChoking.Value = false; + _peerChoking.Value = false; await requestScheduler.RequestSlotAsync(this, cancellationToken).ConfigureAwait(false); } } @@ -269,7 +281,7 @@ private async ValueTask ReceiveRequestAsync( CancellationToken cancellationToken ) { - if (AmChoking.Value) + if (_amChoking.Value) { return; } @@ -308,7 +320,8 @@ public bool TrySendBlock(Block block) var pieceMessage = Message.CreatePiece(block.Index, block.Begin, block.Payload); if (TryWriteMessage(pieceMessage)) { - UploadSpeedTracker.AddBytes(block.Payload.Length); + UploadTracker.AddBytes(block.Payload.Length); + _lastSentBlock = DateTimeOffset.UtcNow; return true; } return false; @@ -328,7 +341,8 @@ private async ValueTask ReceiveBlockAsync(Message message, CancellationToken can span[8..].CopyTo(rented.Memory.Span); var block = new Block(index, begin, rented, this); await requestScheduler.ReceiveBlockAsync(block, cancellationToken).ConfigureAwait(false); - DownloadSpeedTracker.AddBytes(payloadLength); + DownloadTracker.AddBytes(payloadLength); + _lastReceivedBlock = DateTimeOffset.UtcNow; } private void ReceiveCancel(Message message) @@ -361,9 +375,9 @@ private async ValueTask CheckInterestAsync(CancellationToken cancellationToken) lock (_stateLock) { - if (interest != AmInterested.Value) + if (interest != _amInterested.Value) { - AmInterested.Value = interest; + _amInterested.Value = interest; valueChanged = true; } } @@ -378,33 +392,30 @@ private async ValueTask CheckInterestAsync(CancellationToken cancellationToken) } if (interest) { - AmInterested.Value = interest; + _amInterested.Value = interest; var message = Message.CreateInterested(); await WriteMessageAsync(message, cancellationToken).ConfigureAwait(false); } } } - private async ValueTask SendChokedAsync(CancellationToken cancellationToken) + public async ValueTask UnchokeAsync(CancellationToken cancellationToken) { - if (AmChoking.Value != true) + if (_amChoking.Value) { - AmChoking.Value = true; - var message = Message.CreateChoke(); - await WriteMessageAsync(message, cancellationToken).ConfigureAwait(false); - await uploadScheduler.FreeSlotAsync(this, cancellationToken).ConfigureAwait(false); + _amChoking.Value = false; + await WriteMessageAsync(Message.CreateUnchoke(), cancellationToken) + .ConfigureAwait(false); } } - public bool TrySendUnchoked() + public async ValueTask ChokeAsync(CancellationToken cancellationToken) { - if (AmChoking.Value != false) + if (!_amChoking.Value) { - AmChoking.Value = false; - var message = Message.CreateUnchoke(); - return TryWriteMessage(message); + _amChoking.Value = true; + await WriteMessageAsync(Message.CreateChoke(), cancellationToken).ConfigureAwait(false); } - return false; } private async ValueTask SendBitfieldAsync( @@ -478,7 +489,7 @@ public async ValueTask DisposeAsync() UnregisterPieces(PeerBitField); } - await uploadScheduler.FreeSlotAsync(this, default).ConfigureAwait(false); + await uploadScheduler.RemovePeerAsync(this, default).ConfigureAwait(false); await requestScheduler.FreeSlotAsync(this, default).ConfigureAwait(false); _cancellationTokenSource?.Cancel(); diff --git a/Netorrent/P2P/Upload/IUploadScheduler.cs b/Netorrent/P2P/Upload/IUploadScheduler.cs index f4b5ce86..a1ca8f6b 100644 --- a/Netorrent/P2P/Upload/IUploadScheduler.cs +++ b/Netorrent/P2P/Upload/IUploadScheduler.cs @@ -7,6 +7,6 @@ internal interface IUploadScheduler : IAsyncDisposable Task StartAsync(CancellationToken cancellationToken); ValueTask AddRequestAsync(RequestBlock request, CancellationToken cancellationToken); void CancelRequest(RequestBlock request); - ValueTask RequestSlotAsync(IPeerConnection peerConnection, CancellationToken cancellationToken); - ValueTask FreeSlotAsync(IPeerConnection peerConnection, CancellationToken cancellationToken); + ValueTask AddPeerAsync(IPeerConnection peerConnection, CancellationToken cancellationToken); + ValueTask RemovePeerAsync(IPeerConnection peerConnection, CancellationToken cancellationToken); } diff --git a/Netorrent/P2P/Upload/UploadScheduler.cs b/Netorrent/P2P/Upload/UploadScheduler.cs index 9e74629a..fa1fd280 100644 --- a/Netorrent/P2P/Upload/UploadScheduler.cs +++ b/Netorrent/P2P/Upload/UploadScheduler.cs @@ -1,14 +1,19 @@ -using System.Net.NetworkInformation; +using System.Threading; using System.Threading.Channels; using Microsoft.Extensions.Logging; -using Microsoft.VisualBasic; using Netorrent.Extensions; using Netorrent.IO; using Netorrent.P2P.Messages; using Netorrent.Statistics; +using Netorrent.TorrentFile; +using R3; +using ZLinq; namespace Netorrent.P2P.Upload; +//https://www.bittorrent.org/beps/bep_0003.html +//https://read.seas.harvard.edu/~kohler/pubs/legout07clustering.pdf Section 2.3 + internal class UploadScheduler( IPieceStorage pieceStorage, Bitfield bitfield, @@ -16,16 +21,19 @@ internal class UploadScheduler( ILogger logger ) : IUploadScheduler { - const int MaxUnchokedPeers = 4; + const int MaxActivePeers = 4; //TODO Add an option for this to be changed private readonly Channel _pendingRequests = Channel.CreateBounded( new BoundedChannelOptions(256) { SingleWriter = false, SingleReader = true } ); - private readonly List _interestedPeers = []; - private readonly HashSet _unchokedPeers = []; - private readonly Lock _unchokedSlotsLock = new(); + private readonly Dictionary _interestedDisposables = []; + private readonly HashSet _connectedPeers = []; + private readonly HashSet _activePeers = []; + private readonly SemaphoreSlim _semaphore = new(1); + private readonly TimeSpan _interval = 10.Seconds; + private byte _round = 1; private CancellationTokenSource? _cts; private Task? _runningTask; private bool _disposed; @@ -37,10 +45,45 @@ public Task StartAsync(CancellationToken cancellationToken) _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _runningTask = _cts.CancelOnFirstCompletionAndAwaitAllAsync([ ProcessRequestsAsync(_cts.Token), + ProcessRegularChokingAsync(_cts.Token), ]); return _runningTask; } + public async Task ProcessRegularChokingAsync(CancellationToken cancellationToken) + { + while (true) + { + await RunRoundAsync(cancellationToken).ConfigureAwait(false); + await Task.Delay(_interval, cancellationToken).ConfigureAwait(false); + } + } + + private async Task RunRoundAsync(CancellationToken cancellationToken) + { + using (await _semaphore.LockAsync(cancellationToken).ConfigureAwait(false)) + { + try + { + await RegularChokeAsync(cancellationToken).ConfigureAwait(false); + _round++; + + if (_round == 3) + { + await OptimisticChokeAsync(cancellationToken).ConfigureAwait(false); + _round = 1; + } + } + catch (Exception ex) + { + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError(ex, "Error running round {round}", _round); + } + } + } + } + public async Task ProcessRequestsAsync(CancellationToken cancellationToken) { await foreach ( @@ -76,7 +119,7 @@ var requestBlock in _pendingRequests if (peer.TrySendBlock(block)) { - transfer.AddUploadedBytes(block.Payload.Length); //TODO Move this to message stream after data if flushed + transfer.AddUploadedBytes(block.Payload.Length); //TODO Move this to message stream after data if flushed ? } } catch (Exception ex) @@ -100,51 +143,159 @@ var requestBlock in _pendingRequests } } - public async ValueTask RequestSlotAsync( - IPeerConnection peerConnection, - CancellationToken cancellationToken - ) + private async ValueTask OptimisticChokeAsync(CancellationToken cancellationToken) { - bool canUnchoke; + var peerToUnchoke = _connectedPeers + .AsValueEnumerable() + .Where(i => i.AmChoking.CurrentValue) + .Where(i => i.PeerInterested.CurrentValue) + .Shuffle() + .FirstOrDefault(); + + if (peerToUnchoke is null) + { + return; + } - lock (_unchokedSlotsLock) + if (_activePeers.Count == MaxActivePeers) { - if (_unchokedPeers.Count >= MaxUnchokedPeers) + var worstPeer = _activePeers + .AsValueEnumerable() + .OrderBy(i => + i.MyBitField.IsComplete + ? i.UploadTracker.Speed.Bps + : i.DownloadTracker.Speed.Bps + ) + .First(); + + if (_activePeers.Remove(worstPeer)) { - _interestedPeers.Add(peerConnection); - return; + await worstPeer.ChokeAsync(cancellationToken).ConfigureAwait(false); } + } + + await peerToUnchoke.UnchokeAsync(cancellationToken).ConfigureAwait(false); + + if (!peerToUnchoke.AmChoking.CurrentValue && peerToUnchoke.PeerInterested.CurrentValue) + { + _activePeers.Add(peerToUnchoke); + } + } + + //TODO implement new choke algorithm as seeder to avoid free riders + private async ValueTask RegularChokeAsync(CancellationToken cancellationToken) + { + var bestPeersByDownload = _connectedPeers + .AsValueEnumerable() + .OrderByDescending(i => + i.MyBitField.IsComplete ? i.UploadTracker.Speed.Bps : i.DownloadTracker.Speed.Bps + ) + .ToArray(); + + var toUnchokeCount = MaxActivePeers - 1; + var activePeersCount = 0; - canUnchoke = true; + foreach (var peer in bestPeersByDownload) + { + //Choke and ignore peers that were not active in the last 30 seconds + if (peer.TimeSinceReceivedBlock >= 30.Seconds) + { + _activePeers.Remove(peer); + await peer.ChokeAsync(cancellationToken).ConfigureAwait(false); + continue; + } - if (canUnchoke && peerConnection.TrySendUnchoked()) + //If the peer is already as active downloader we do nothing + if (_activePeers.Contains(peer)) { - _unchokedPeers.Add(peerConnection); + activePeersCount++; + continue; } + + //Peer can be unchoked + if (activePeersCount < toUnchokeCount) + { + await peer.UnchokeAsync(cancellationToken).ConfigureAwait(false); + } + //Peer can't be added so it's choked and removed if it's an active downloader + else + { + await peer.ChokeAsync(cancellationToken).ConfigureAwait(false); + _activePeers.Remove(peer); + } + + //Peer is interested and unchoked (we add it) + if (!peer.AmChoking.CurrentValue && peer.PeerInterested.CurrentValue) + { + _activePeers.Add(peer); + activePeersCount++; + } + } + } + + private async ValueTask CheckRoundAsync( + IPeerConnection peerConnection, + CancellationToken cancellationToken + ) + { + if (peerConnection.PeerInterested.CurrentValue && !peerConnection.AmChoking.CurrentValue) + { + await RunRoundAsync(cancellationToken).ConfigureAwait(false); } } - public async ValueTask FreeSlotAsync( + public async ValueTask AddPeerAsync( IPeerConnection peerConnection, CancellationToken cancellationToken ) { - IPeerConnection? nextPeer = null; + var subscription = peerConnection.PeerInterested.SubscribeAwait( + (_, c) => CheckRoundAsync(peerConnection, c), + configureAwait: false + ); - lock (_unchokedSlotsLock) + using (await _semaphore.LockAsync(cancellationToken).ConfigureAwait(false)) { - _interestedPeers.Remove(peerConnection); - _unchokedPeers.Remove(peerConnection); - if (_interestedPeers.Count > 0) + if (_connectedPeers.Add(peerConnection)) { - nextPeer = _interestedPeers[0]; - _interestedPeers.RemoveAt(0); + _interestedDisposables[peerConnection] = subscription; + } + else + { + subscription.Dispose(); } } + } - if (nextPeer?.TrySendUnchoked() == true) + public async ValueTask RemovePeerAsync( + IPeerConnection peerConnection, + CancellationToken cancellationToken + ) + { + using (await _semaphore.LockAsync(cancellationToken).ConfigureAwait(false)) { - _unchokedPeers.Add(nextPeer); + if (_connectedPeers.Remove(peerConnection)) + { + if (_interestedDisposables.TryGetValue(peerConnection, out var disposable)) + { + disposable.Dispose(); + _interestedDisposables.Remove(peerConnection); + } + _activePeers.Remove(peerConnection); + } + } + + await CheckRoundAsync(peerConnection, cancellationToken).ConfigureAwait(false); + try + { + await peerConnection.ChokeAsync(cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError(ex, "Error unchoking {peer}", peerConnection.PeerEndpoint); + } } } @@ -155,21 +306,22 @@ CancellationToken cancellationToken { var from = request.RequestedFrom[0]; - lock (_unchokedSlotsLock) + if (!bitfield.HasPiece(request.Index)) { - if (!bitfield.HasPiece(request.Index)) + if (logger.IsEnabled(LogLevel.Information)) { - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation( - "Peer {peer} requested a block we don't have", - from.PeerEndpoint.PeerId - ); - } - - return; + logger.LogInformation( + "Peer {peer} requested a block we don't have", + from.PeerEndpoint.PeerId + ); } - if (!_unchokedPeers.Contains(from)) + + return; + } + using (await _semaphore.LockAsync(cancellationToken).ConfigureAwait(false)) + { + //TODO penalize? + if (!_connectedPeers.TryGetValue(from, out var peer) || peer.AmChoking.CurrentValue) { if (logger.IsEnabled(LogLevel.Information)) { @@ -184,6 +336,7 @@ CancellationToken cancellationToken await _pendingRequests.Writer.WriteAsync(request, cancellationToken).ConfigureAwait(false); } + //TODO properly implement cancellation public void CancelRequest(RequestBlock request) { var from = request.RequestedFrom[0]; @@ -194,6 +347,13 @@ public void CancelRequest(RequestBlock request) private async ValueTask DrainChannelsAsync() { + using (await _semaphore.LockAsync(default).ConfigureAwait(false)) + { + foreach (var item in _interestedDisposables.Values) + { + item.Dispose(); + } + } await foreach (var _ in _pendingRequests.Reader.ReadAllAsync().ConfigureAwait(false)) { } } @@ -214,6 +374,7 @@ public async ValueTask DisposeAsync() await DrainChannelsAsync().ConfigureAwait(false); _cts?.Dispose(); + _semaphore.Dispose(); } } } diff --git a/Netorrent/Statistics/PeerStatistics.cs b/Netorrent/Statistics/PeerStatistics.cs index 43ff8a23..505e2c94 100644 --- a/Netorrent/Statistics/PeerStatistics.cs +++ b/Netorrent/Statistics/PeerStatistics.cs @@ -20,11 +20,10 @@ private ValueEnumerable< > PeersNotChocking => _peersClient .ActivePeers.Values.AsValueEnumerable() - .Where(i => !i.PeerChoking.Value && i.AmInterested.Value); + .Where(i => !i.PeerChoking.CurrentValue && i.AmInterested.CurrentValue); public int ActivePeers => PeersNotChocking.Count(); public int TotalPeers => _peersClient.ActivePeers.Values.AsValueEnumerable().Count(); - public DownloadSpeed DownloadSpeed => - PeersNotChocking.Sum(p => p.DownloadSpeedTracker.CurrentBps.Bps); + public DownloadSpeed DownloadSpeed => PeersNotChocking.Sum(p => p.DownloadTracker.Speed.Bps); }