Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Netorrent.Tests.Integration/Torrents/TorrentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace Netorrent.Tests.Integration.Torrents;

[ClassDataSource<OpenTrackerFixture>(Shared = SharedType.PerClass)]
[Timeout(2 * 60_000)]
[Timeout(4 * 60_000)]
public class TorrentTests(OpenTrackerFixture fixture)
{
private readonly OpenTrackerFixture _fixture = fixture;
Expand Down Expand Up @@ -80,8 +80,8 @@ public async Task Should_Download_Torrent_With_Different_Ips_And_Trackers(
UsedAddressProtocol.Ipv4 | UsedAddressProtocol.Ipv6
)]
UsedAddressProtocol usedAdressProtocol,
[Matrix(3)] int seedersCount,
[Matrix(12)] int leechersCount,
[Matrix(4)] int seedersCount,
[Matrix(30)] int leechersCount,
CancellationToken cancellationToken
)
{
Expand Down
37 changes: 27 additions & 10 deletions Netorrent.Tests/Fakes/FakePeerConnection.cs
Original file line number Diff line number Diff line change
@@ -1,38 +1,51 @@
using Netorrent.Extensions;
using Netorrent.P2P;
using Netorrent.P2P.Download;
using Netorrent.P2P.Measurement;
using Netorrent.P2P.Messages;
using R3;

internal class FakePeerConnection : IPeerConnection
internal class FakePeerConnection(Bitfield myBitfield) : IPeerConnection
{
public Subject<Block> SentBlocks = new();
public ReactiveProperty<bool> AmChoking => new(true);
public SynchronizedReactiveProperty<bool> AmChoking { get; } = new(true);

public ReactiveProperty<bool> AmInterested => new(false);
public SynchronizedReactiveProperty<bool> AmInterested { get; } = new(false);

public ReactiveProperty<bool> PeerChoking => new(true);
public SynchronizedReactiveProperty<bool> PeerChoking { get; } = new(true);

public ReactiveProperty<bool> PeerInterested => new(false);
public SynchronizedReactiveProperty<bool> PeerInterested { get; } = new(false);

public TimeSpan ConnectionDuration => throw new NotImplementedException();

public SpeedTracker DownloadSpeedTracker => throw new NotImplementedException();
public SpeedTracker DownloadTracker => new();

public SpeedTracker UploadSpeedTracker => throw new NotImplementedException();
public SpeedTracker UploadTracker => new();

public PeerEndpoint PeerEndpoint => throw new NotImplementedException();

public int RequestedBlocksCount => throw new NotImplementedException();

public int UploadRequestedBlocksCount => _uploadRequestedCount;

public Bitfield MyBitField => throw new NotImplementedException();
public Bitfield MyBitField => myBitfield;

public Bitfield? PeerBitField => throw new NotImplementedException();

public PeerRequestWindow PeerRequestWindow => throw new NotImplementedException();

ReadOnlyReactiveProperty<bool> IPeerConnection.AmChoking => AmChoking;

ReadOnlyReactiveProperty<bool> IPeerConnection.AmInterested => AmInterested;

ReadOnlyReactiveProperty<bool> IPeerConnection.PeerChoking => PeerChoking;

ReadOnlyReactiveProperty<bool> IPeerConnection.PeerInterested => PeerInterested;

public TimeSpan TimeSinceReceivedBlock => 0.Seconds;

public TimeSpan TimeSinceSentBlock => 0.Seconds;

private int _uploadRequestedCount = 0;

public int DecrementRequestedBlock()
Expand Down Expand Up @@ -72,9 +85,13 @@ public bool TrySendRequest(RequestBlock nextBlock)
throw new NotImplementedException();
}

public bool TrySendUnchoked()
public async ValueTask UnchokeAsync(CancellationToken cancellationToken)
{
AmChoking.Value = false;
return true;
}

public async ValueTask ChokeAsync(CancellationToken cancellationToken)
{
AmChoking.Value = true;
}
}
17 changes: 7 additions & 10 deletions Netorrent.Tests/Fakes/FakeUploadScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ namespace Netorrent.Tests.Fakes;

internal class FakeUploadScheduler : IUploadScheduler
{
public async ValueTask AddPeerAsync(
IPeerConnection peerConnection,
CancellationToken cancellationToken
) { }

public ValueTask AddRequestAsync(RequestBlock request, CancellationToken cancellationToken)
{
throw new NotImplementedException();
Expand All @@ -18,18 +23,10 @@ public void CancelRequest(RequestBlock request)

public ValueTask DisposeAsync() => ValueTask.CompletedTask;

public ValueTask FreeSlotAsync(
public async ValueTask RemovePeerAsync(
IPeerConnection peerConnection,
CancellationToken cancellationToken
) => ValueTask.CompletedTask;

public ValueTask RequestSlotAsync(
IPeerConnection peerConnection,
CancellationToken cancellationToken
)
{
throw new NotImplementedException();
}
) { }

public Task StartAsync(CancellationToken cancellationToken) =>
Task.Delay(-1, cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion Netorrent.Tests/P2P/PeerConectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public async Task Should_Send_Unchoke(CancellationToken cancellationToken)

_ = peerConnection.StartAsync(cancellationToken);
var statesTask = peerConnection.AmChoking.Take(2).ToListAsync(cancellationToken);
peerConnection.TrySendUnchoked();
await peerConnection.UnchokeAsync(cancellationToken);
var states = await statesTask;

states.First().ShouldBeTrue();
Expand Down
15 changes: 9 additions & 6 deletions Netorrent.Tests/P2P/UploadSchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@ public class UploadSchedulerTests
public async Task Should_Upload_To_Peer(CancellationToken cancellationToken)
{
var logger = NullLogger.Instance;
await using var peerConnection = new FakePeerConnection();
var bitfield = new Bitfield(5, true);
await using var peerConnection = new FakePeerConnection(bitfield);
await using var uploadScheduler = new UploadScheduler(
new FakePieceStorage(),
new Bitfield(5, true),
bitfield,
new TransferStatistics(10),
logger
);
var requestBlock = new RequestBlock(0, 0, 0);
var amChokingTask = peerConnection.AmChoking.FirstAsync(cancellationToken);
var amChokingTask = peerConnection.AmChoking.FirstAsync(i => i == false, cancellationToken);
var blockTask = peerConnection.SentBlocks.FirstAsync(cancellationToken);
peerConnection.PeerInterested.Value = true;
requestBlock.RequestedFrom.Add(peerConnection);
await uploadScheduler.RequestSlotAsync(peerConnection, cancellationToken);
await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken);
await uploadScheduler.AddPeerAsync(peerConnection, cancellationToken);
var uploadTask = uploadScheduler.StartAsync(cancellationToken);
var chokeState = await amChokingTask;
await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken);
var block = await blockTask;
block.Index.ShouldBe(0);
block.Begin.ShouldBe(0);
block.Payload.Length.ShouldBe(0);
(await amChokingTask).ShouldBe(true);
chokeState.ShouldBe(false);
}
}
15 changes: 15 additions & 0 deletions Netorrent/Extensions/SemaphoreSlimExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Netorrent.Other;

namespace Netorrent.Extensions;

public static class SemaphoreSlimExtensions
{
extension(SemaphoreSlim semaphoreSlim)
{
public async Task<SemaphoreSlimDisposable> LockAsync(CancellationToken cancellationToken)
{
await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
return new SemaphoreSlimDisposable(semaphoreSlim);
}
}
}
6 changes: 6 additions & 0 deletions Netorrent/Other/SemaphoreSlimDisposable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Netorrent.Other;

public readonly struct SemaphoreSlimDisposable(SemaphoreSlim semaphoreSlim) : IDisposable
{
public void Dispose() => semaphoreSlim.Release();
}
2 changes: 1 addition & 1 deletion Netorrent/P2P/Download/PeerRequestWindow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ private void CalculateWindow(long bytesPerSecond)
public Timer StartSampling(TimeSpan period, SpeedTracker speedTracker)
{
return new Timer(
_ => CalculateWindow((long)speedTracker.CurrentBps.Bps),
_ => CalculateWindow((long)speedTracker.Speed.Bps),
null,
TimeSpan.Zero,
period
Expand Down
32 changes: 4 additions & 28 deletions Netorrent/P2P/Download/RequestScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ ILogger logger
) : IRequestScheduler
{
const int MinPeersForRarity = 6;
const int MinPeers = 6;
const int MaxPeers = 10;

private readonly Channel<Block> _receiveBlocksChannel = Channel.CreateBounded<Block>(
new BoundedChannelOptions(256) { SingleWriter = false, SingleReader = true }
Expand All @@ -30,10 +28,8 @@ ILogger logger
);

private readonly HashSet<IPeerConnection> _activePeers = [];
private readonly List<IPeerConnection> _interestedPeers = [];
private readonly Dictionary<int, PieceBuffer> _pieceBuffers = [];
private readonly Lock _activePeersLock = new();
private int _maxCurrentPeers = MinPeers;
private CancellationTokenSource? _cts;
private Task? _runningTask;
private bool _disposed;
Expand Down Expand Up @@ -183,7 +179,7 @@ private async Task WarmupAsync(CancellationToken cancellationToken)
lock (_activePeersLock)
{
minPeersReady = _activePeers.Count(i =>
i.AmInterested.Value && !i.PeerChoking.Value
i.AmInterested.CurrentValue && !i.PeerChoking.CurrentValue
);
}

Expand Down Expand Up @@ -236,25 +232,13 @@ public async ValueTask RequestSlotAsync(
CancellationToken cancellationToken
)
{
bool shouldEnqueue;

lock (_activePeersLock)
{
if (_activePeers.Count >= _maxCurrentPeers)
{
_interestedPeers.Add(peerConnection);
return;
}
_activePeers.Add(peerConnection);
shouldEnqueue = true;
}

if (shouldEnqueue)
{
await _slotsChannel
.Writer.WriteAsync(peerConnection, cancellationToken)
.ConfigureAwait(false);
}
await _slotsChannel
.Writer.WriteAsync(peerConnection, cancellationToken)
.ConfigureAwait(false);
}

public async ValueTask FreeSlotAsync(
Expand All @@ -266,15 +250,7 @@ CancellationToken cancellationToken

lock (_activePeersLock)
{
_interestedPeers.Remove(peerConnection);
_activePeers.Remove(peerConnection);

if (_interestedPeers.Count > 0)
{
nextPeer = _interestedPeers[0];
_interestedPeers.RemoveAt(0);
_activePeers.Add(nextPeer);
}
}

if (nextPeer is not null)
Expand Down
17 changes: 10 additions & 7 deletions Netorrent/P2P/IPeerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ namespace Netorrent.P2P;

internal interface IPeerConnection : IAsyncDisposable
{
ReactiveProperty<bool> AmChoking { get; }
ReactiveProperty<bool> AmInterested { get; }
ReactiveProperty<bool> PeerChoking { get; }
ReactiveProperty<bool> PeerInterested { get; }
ReadOnlyReactiveProperty<bool> AmChoking { get; }
ReadOnlyReactiveProperty<bool> AmInterested { get; }
ReadOnlyReactiveProperty<bool> PeerChoking { get; }
ReadOnlyReactiveProperty<bool> PeerInterested { get; }
TimeSpan ConnectionDuration { get; }
SpeedTracker DownloadSpeedTracker { get; }
SpeedTracker UploadSpeedTracker { get; }
SpeedTracker DownloadTracker { get; }
SpeedTracker UploadTracker { get; }
Bitfield MyBitField { get; }
Bitfield? PeerBitField { get; }
PeerEndpoint PeerEndpoint { get; }
PeerRequestWindow PeerRequestWindow { get; }
int RequestedBlocksCount { get; }
int UploadRequestedBlocksCount { get; }
TimeSpan TimeSinceReceivedBlock { get; }
TimeSpan TimeSinceSentBlock { get; }

int DecrementRequestedBlock();
int DecrementUploadRequested();
Expand All @@ -28,6 +30,7 @@ internal interface IPeerConnection : IAsyncDisposable
bool TrySendBlock(Block block);
bool TrySendCancel(RequestBlock request);
bool TrySendRequest(RequestBlock nextBlock);
bool TrySendUnchoked();
ValueTask UnchokeAsync(CancellationToken cancellationToken);
ValueTask ChokeAsync(CancellationToken cancellationToken);
Task StartAsync(CancellationToken cancellationToken);
}
21 changes: 12 additions & 9 deletions Netorrent/P2P/Measurement/DownloadSpeed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
/// Mbps, and Gbps. The struct supports arithmetic operations and implicit conversion from a double value representing
/// bits per second.</remarks>
/// <param name="bps">The download speed in bits per second (bps).</param>
public readonly struct DownloadSpeed(double bps)
public readonly record struct DownloadSpeed(double Bps)
{
public double Bps => bps;
public double Kbps => bps / 1_000d;
public double Mbps => bps / 1_000_000d;
public double Gbps => bps / 1_000_000_000d;
public double Kbps => Bps / 1_000d;
public double Mbps => Bps / 1_000_000d;
public double Gbps => Bps / 1_000_000_000d;

public static implicit operator DownloadSpeed(double bps) => new(bps);

Expand All @@ -22,14 +21,18 @@ public readonly struct DownloadSpeed(double bps)
public static DownloadSpeed operator -(DownloadSpeed left, DownloadSpeed right) =>
new(left.Bps - right.Bps);

public static bool operator >(DownloadSpeed left, DownloadSpeed right) => left.Bps > right.Bps;

public static bool operator <(DownloadSpeed left, DownloadSpeed right) => left.Bps < right.Bps;

public override string ToString()
{
if (bps >= 1_000_000_000)
if (Bps >= 1_000_000_000)
return $"{Gbps:F2}/Gbps";
if (bps >= 1_000_000)
if (Bps >= 1_000_000)
return $"{Mbps:F2}/Mbps";
if (bps >= 1_000)
if (Bps >= 1_000)
return $"{Kbps:F2}/Kbps";
return $"{bps:F2}/bps";
return $"{Bps:F2}/bps";
}
}
2 changes: 1 addition & 1 deletion Netorrent/P2P/Measurement/SpeedTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public sealed class SpeedTracker(double alpha = 0.3)
private readonly double _alpha = alpha;

public ByteSize TotalBytes => Interlocked.Read(ref _totalBytes);
public DownloadSpeed CurrentBps => Volatile.Read(ref _currentBps);
public DownloadSpeed Speed => Volatile.Read(ref _currentBps);

internal Timer StartSampling(TimeSpan period)
{
Expand Down
Loading