diff --git a/Netorrent.Example/Program.cs b/Netorrent.Example/Program.cs index 5d4b2b42..f86ffb9c 100644 --- a/Netorrent.Example/Program.cs +++ b/Netorrent.Example/Program.cs @@ -236,6 +236,7 @@ await AnsiConsole RemainingStyle = new Style(Color.Grey), }, new PercentageColumn(), + new TransferSpeedColumn(), new RemainingTimeColumn(), new SpinnerColumn() ) diff --git a/Netorrent.Tests.Integration/Torrents/TorrentTests.cs b/Netorrent.Tests.Integration/Torrents/TorrentTests.cs index 8bf8d841..b1d0402e 100644 --- a/Netorrent.Tests.Integration/Torrents/TorrentTests.cs +++ b/Netorrent.Tests.Integration/Torrents/TorrentTests.cs @@ -454,7 +454,7 @@ CancellationToken cancellationToken var seederTorrent = await seeder.CreateTorrentAsync( path, _fixture.AnnounceUrl, - [.. _fixture.AnnounceUrls] + [_fixture.AnnounceUrls] ); yield return (seederTorrent, seeder); diff --git a/Netorrent.Tests/Fakes/FakePeerConnection.cs b/Netorrent.Tests/Fakes/FakePeerConnection.cs index 19f851d5..31752d69 100644 --- a/Netorrent.Tests/Fakes/FakePeerConnection.cs +++ b/Netorrent.Tests/Fakes/FakePeerConnection.cs @@ -59,7 +59,7 @@ public ulong DecrementRequestedBlock() public ulong DecrementUploadRequested() => _uploadRequestedCount--; - public ValueTask DisposeAsync() => ValueTask.CompletedTask; + public async ValueTask DisposeAsync() => SentBlocks.OnCompleted(); public ulong IncrementRequestedBlock() { diff --git a/Netorrent.Tests/Fakes/FakePiecePicker.cs b/Netorrent.Tests/Fakes/FakePiecePicker.cs index ddc7f49d..1f6b1755 100644 --- a/Netorrent.Tests/Fakes/FakePiecePicker.cs +++ b/Netorrent.Tests/Fakes/FakePiecePicker.cs @@ -9,20 +9,15 @@ internal class FakePiecePicker : IPiecePicker { public int BlockSize => 16 * 1024; - public void CompletePiece(int index) - { - throw new NotImplementedException(); - } + public bool IsEndGame => throw new NotImplementedException(); - public void CompleteRequestBlock(RequestBlock requestBlock) + public void CompletePiece(int index) { throw new NotImplementedException(); } public void DecreaseRarity(int index) { } - public ValueTask DisposeAsync() => ValueTask.CompletedTask; - public long GetBitfieldSize() { throw new NotImplementedException(); @@ -55,12 +50,10 @@ public IEnumerable GetTimeoutRequestBlocks() public void IncreaseRarity(int index) { } - public void SetBlockToPending(RequestBlock requestBlock) - { - throw new NotImplementedException(); - } - - public void SetBlockToRequested(RequestBlock requestBlock, IPeerConnection peerConnection) + public bool TryGetRequestBlock( + IPeerConnection peerConnection, + [NotNullWhen(true)] out RequestBlock? requestBlock + ) { throw new NotImplementedException(); } diff --git a/Netorrent.Tests/Netorrent.Tests.csproj b/Netorrent.Tests/Netorrent.Tests.csproj index 0314e0c8..7558e417 100644 --- a/Netorrent.Tests/Netorrent.Tests.csproj +++ b/Netorrent.Tests/Netorrent.Tests.csproj @@ -16,7 +16,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Netorrent.Tests/P2P/UploadSchedulerTests.cs b/Netorrent.Tests/P2P/UploadSchedulerTests.cs index fbf4f85a..f5ff39a3 100644 --- a/Netorrent.Tests/P2P/UploadSchedulerTests.cs +++ b/Netorrent.Tests/P2P/UploadSchedulerTests.cs @@ -13,7 +13,7 @@ public class UploadSchedulerTests { //TODO use matrix to create multiple peers [Test] - public async Task Should_Upload_To_Peer(CancellationToken cancellationToken) + public async Task Should_Upload_Block_To_Peer(CancellationToken cancellationToken) { var logger = NullLogger.Instance; var bitfield = new Bitfield(5, true); @@ -42,4 +42,35 @@ public async Task Should_Upload_To_Peer(CancellationToken cancellationToken) block.Payload.Length.ShouldBe(0); chokeState.ShouldBe(false); } + + [Test] + public async Task Should_Cancel_Block_To_Peer(CancellationToken cancellationToken) + { + var logger = NullLogger.Instance; + var bitfield = new Bitfield(5, true); + var peerConnection = new FakePeerConnection(bitfield); + await using var uploadScheduler = new UploadScheduler( + new Dictionary() + { + [peerConnection.PeerEndpoint] = peerConnection, + }, + new FakePieceStorage(), + bitfield, + new DataStatistics(10), + logger + ); + var requestBlock = new RequestBlock(0, 0, 0); + var amChokingTask = peerConnection.AmChoking.FirstAsync(i => i == false, cancellationToken); + var isEmptyTask = peerConnection.SentBlocks.IsEmptyAsync(cancellationToken); + peerConnection.PeerInterested.Value = true; + requestBlock.RequestedFrom.Add(peerConnection); + var uploadTask = uploadScheduler.StartAsync(cancellationToken); + var chokeState = await amChokingTask; + await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken); + uploadScheduler.CancelRequest(requestBlock); + await Task.Delay(1000, cancellationToken); + await peerConnection.DisposeAsync(); + (await isEmptyTask).ShouldBe(true); + chokeState.ShouldBe(false); + } } diff --git a/Netorrent.Tests/PublicApi/ApiTest.My_API_Has_No_Changes.approved.txt b/Netorrent.Tests/PublicApi/ApiTest.My_API_Has_No_Changes.approved.txt index aa18629e..a7deef76 100644 --- a/Netorrent.Tests/PublicApi/ApiTest.My_API_Has_No_Changes.approved.txt +++ b/Netorrent.Tests/PublicApi/ApiTest.My_API_Has_No_Changes.approved.txt @@ -207,16 +207,16 @@ namespace Netorrent.TorrentFile.FileStructure } public class MetaInfo : System.IEquatable { - public MetaInfo(Netorrent.TorrentFile.FileStructure.Info Info, string Announce, System.Collections.Generic.List? AnnounceList = null, long? CreationDate = default, string? Comment = null, string? CreatedBy = null, string? Encoding = null, string? Title = null, System.Collections.Generic.List? UrlList = null) { } + public MetaInfo(Netorrent.TorrentFile.FileStructure.Info Info, string Announce, System.Collections.Generic.IReadOnlyList? AnnounceList = null, long? CreationDate = default, string? Comment = null, string? CreatedBy = null, string? Encoding = null, string? Title = null, System.Collections.Generic.IReadOnlyList? UrlList = null) { } public string Announce { get; init; } - public System.Collections.Generic.List? AnnounceList { get; init; } + public System.Collections.Generic.IReadOnlyList? AnnounceList { get; init; } public string? Comment { get; init; } public string? CreatedBy { get; init; } public long? CreationDate { get; init; } public string? Encoding { get; init; } public Netorrent.TorrentFile.FileStructure.Info Info { get; init; } public string? Title { get; init; } - public System.Collections.Generic.List? UrlList { get; init; } + public System.Collections.Generic.IReadOnlyList? UrlList { get; init; } public Netorrent.Bencoding.Structs.BDictionary ToBDictionary() { } } } @@ -265,7 +265,7 @@ namespace Netorrent.TorrentFile public sealed class TorrentClient : System.IAsyncDisposable { public TorrentClient(System.Func? action = null) { } - public System.Threading.Tasks.ValueTask CreateTorrentAsync(string path, string announceUrl, System.Collections.Generic.List? announceUrls = null, System.Collections.Generic.List? webUrls = null, int pieceLength = 262144, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.ValueTask CreateTorrentAsync(string path, string announceUrl, System.Collections.Generic.List? announceUrls = null, System.Collections.Generic.List? webUrls = null, int pieceLength = 262144, System.Threading.CancellationToken cancellationToken = default) { } public System.Threading.Tasks.ValueTask DisposeAsync() { } public Netorrent.TorrentFile.Torrent LoadTorrent(Netorrent.TorrentFile.FileStructure.MetaInfo metaInfo, string outputDirectory, int[]? downloadedPieces = null) { } public System.Threading.Tasks.ValueTask LoadTorrentAsync(string path, string outputDirectory, int[]? downloadedPieces = null, System.Threading.CancellationToken cancellationToken = default) { } diff --git a/Netorrent.Tests/Torrents/TorrentFileTests.cs b/Netorrent.Tests/Torrents/TorrentFileTests.cs index 1dc9840d..239673d3 100644 --- a/Netorrent.Tests/Torrents/TorrentFileTests.cs +++ b/Netorrent.Tests/Torrents/TorrentFileTests.cs @@ -64,7 +64,9 @@ public async Task Should_Create_Torrent_File_From_Directory(CancellationToken ca await using var torrent = await torrentClient.CreateTorrentAsync( "Data/MultifileTest", "http://test.com", - ["http://test.com"], + [ + ["http://test.com"], + ], ["http://test.com"], cancellationToken: cancellationToken ); @@ -90,7 +92,9 @@ public async Task Should_Create_Torrent_File_From_File(CancellationToken cancell await using var torrent = await torrentClient.CreateTorrentAsync( "Data/MultifileTest/test.txt", "http://test.com", - ["http://test.com"], + [ + ["http://test.com"], + ], ["http://test.com"], cancellationToken: cancellationToken ); @@ -112,7 +116,9 @@ await torrentClient .CreateTorrentAsync( "Data/ASdasd", "http://test.com", - ["http://test.com"], + [ + ["http://test.com"], + ], ["http://test.com"], cancellationToken: cancellationToken ) @@ -128,7 +134,9 @@ await torrentClient .CreateTorrentAsync( "Data/MultifileTest/adasdasd.txt", "http://test.com", - ["http://test.com"], + [ + ["http://test.com"], + ], ["http://test.com"], cancellationToken: cancellationToken ) @@ -146,7 +154,9 @@ public async Task Should_Verify_File(string path, CancellationToken cancellation await using var torrent = await torrentClient.CreateTorrentAsync( path, "http://test.com", - ["http://test.com"], + [ + ["http://test.com"], + ], ["http://test.com"], pieceLength: pieceLength, cancellationToken: cancellationToken @@ -177,7 +187,9 @@ CancellationToken cancellationToken await using var torrent = await torrentClient.CreateTorrentAsync( path, "http://test.com", - ["http://test.com"], + [ + ["http://test.com"], + ], ["http://test.com"], pieceLength: pieceLength, cancellationToken: cancellationToken diff --git a/Netorrent.Tests/Tracker/TrackerTests.cs b/Netorrent.Tests/Tracker/TrackerTests.cs index 665d538e..fa1909a8 100644 --- a/Netorrent.Tests/Tracker/TrackerTests.cs +++ b/Netorrent.Tests/Tracker/TrackerTests.cs @@ -3,7 +3,9 @@ using System.Threading.Channels; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Netorrent.Exceptions; using Netorrent.Extensions; +using Netorrent.P2P.Messages; using Netorrent.Tests.Extensions; using Netorrent.Tests.Fakes; using Netorrent.TorrentFile.Options; @@ -63,15 +65,14 @@ public async Task Should_get_peers_from_udp_tracker(CancellationToken cancellati ); var udptracker = new UdpTracker( + new Bitfield(5), udptrackerManager, 1, new(3), new(), ctx.Channel.Writer, new byte[20], - "null", - new(IPAddress.Loopback, 1), - ctx.Logger + new(IPAddress.Loopback, 1) ); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); @@ -96,13 +97,13 @@ public async Task Should_get_peers_from_http_tracker(CancellationToken cancellat var ctx = CreateDefaultContext(); var httpTracker = new HttpTracker( + new Bitfield(5), 1, new Statistics.DataStatistics(3), new FakeHttpTrackerHandler(ctx.Ips, ctx.Interval), new(), new byte[20], "null", - ctx.Logger, ctx.Channel ); @@ -128,29 +129,20 @@ public async Task Should_not_get_peers_from_http_tracker(CancellationToken cance var ctx = CreateDefaultContext(); var httpTracker = new HttpTracker( + new Bitfield(5), 1, new Statistics.DataStatistics(3), new FakeHttpTrackerHandler(ctx.Ips, ctx.Interval, new Exception()), new(), new byte[20], "null", - ctx.Logger, ctx.Channel ); - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - var trackerTask = httpTracker.StartAsync(cts.Token).AsTask(); - - await Task.Delay(5.Seconds, cancellationToken); - cts.Cancel(); - ctx.Channel.Writer.TryComplete(); - - var ipendpoints = await ctx - .Channel.Reader.ReadAllAsync(cancellationToken) - .ToArrayAsync(cancellationToken: cancellationToken) - .AsTask(); - - ipendpoints.Length.ShouldBe(0); + await httpTracker + .StartAsync(cancellationToken) + .AsTask() + .ShouldThrowAsync(); } [Test] @@ -165,30 +157,74 @@ public async Task Should_not_get_peers_from_udp_tracker(CancellationToken cancel ); var udptracker = new UdpTracker( + new Bitfield(5), udptrackerManager, 1, new(3), new(), ctx.Channel.Writer, new byte[20], - "null", - new(IPAddress.Loopback, 1), - ctx.Logger + new(IPAddress.Loopback, 1) ); - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - var trackerTask = udptracker.StartAsync(cts.Token).AsTask(); + await udptracker + .StartAsync(cancellationToken) + .AsTask() + .ShouldThrowAsync(); + } - await Task.Delay(5.Seconds, cancellationToken); - cts.Cancel(); - ctx.Channel.Writer.TryComplete(); + [Test] + public async Task Should_not_get_peers_from_tracker_client(CancellationToken cancellationToken) + { + var ctx = CreateDefaultContext(); - var ipendpoints = await ctx - .Channel.Reader.ReadAllAsync(cancellationToken) - .ToArrayAsync(cancellationToken: cancellationToken) - .AsTask(); + await using var udptrackerManagerv4 = new FakeUdpTrackerTransactionManager( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetwork)], + ctx.Interval, + new Exception() + ); + + await using var udptrackerManagerv6 = new FakeUdpTrackerTransactionManager( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetworkV6)], + ctx.Interval, + new Exception() + ); - ipendpoints.Length.ShouldBe(0); + using var httpTrackerHandlerv4 = new FakeHttpTrackerHandler( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetwork)], + ctx.Interval, + new Exception() + ); + using var httpTrackerHandlerv6 = new FakeHttpTrackerHandler( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetworkV6)], + ctx.Interval, + new Exception() + ); + + var trackerHandlers = new TrackerHandlers( + httpTrackerHandlerv4, + udptrackerManagerv4, + httpTrackerHandlerv6, + udptrackerManagerv6 + ); + + await using var trackerClient = new TrackerClient( + new Bitfield(5), + trackerHandlers, + UsedTrackers.Http | UsedTrackers.Udp, + 1, + new(3), + new(), + ctx.Channel, + [ + ["udp://localhost:1", "https://localhost:2"], + ["http://localhost:3", "aaaa://localhost:4"], + ], + new byte[20], + ctx.Logger + ); + + await trackerClient.StartAsync(cancellationToken).ShouldNotThrowAsync(); } [Test] @@ -196,21 +232,34 @@ public async Task Should_get_peers_from_tracker_client(CancellationToken cancell { var ctx = CreateDefaultContext(); - await using var udptrackerManager = new FakeUdpTrackerTransactionManager( - ctx.Ips, + await using var udptrackerManagerv4 = new FakeUdpTrackerTransactionManager( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetwork)], + ctx.Interval + ); + + await using var udptrackerManagerv6 = new FakeUdpTrackerTransactionManager( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetworkV6)], ctx.Interval ); - var httpTrackerHandler = new FakeHttpTrackerHandler(ctx.Ips, ctx.Interval); + using var httpTrackerHandlerv4 = new FakeHttpTrackerHandler( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetwork)], + ctx.Interval + ); + using var httpTrackerHandlerv6 = new FakeHttpTrackerHandler( + [.. ctx.Ips.Where(i => i.AddressFamily == AddressFamily.InterNetworkV6)], + ctx.Interval + ); var trackerHandlers = new TrackerHandlers( - httpTrackerHandler, - udptrackerManager, - httpTrackerHandler, - udptrackerManager + httpTrackerHandlerv4, + udptrackerManagerv4, + httpTrackerHandlerv6, + udptrackerManagerv6 ); await using var trackerClient = new TrackerClient( + new Bitfield(5), trackerHandlers, UsedTrackers.Http | UsedTrackers.Udp, 1, @@ -218,10 +267,8 @@ public async Task Should_get_peers_from_tracker_client(CancellationToken cancell new(), ctx.Channel, [ - "udp://localhost:1", - "https://localhost:2", - "http://localhost:3", - "aaaa://localhost:4", + ["udp://localhost:1", "https://localhost:2"], + ["http://localhost:3", "aaaa://localhost:4"], ], new byte[20], ctx.Logger @@ -239,6 +286,7 @@ public async Task Should_get_peers_from_tracker_client(CancellationToken cancell IPEndPoint[] resultIps = [.. ctx.Ips, .. ctx.Ips, .. ctx.Ips, .. ctx.Ips]; ipendpoints.Length.ShouldBe(ctx.Ips.Length * 4); - ipendpoints.ShouldBe(resultIps); + var sorted = ipendpoints.OrderBy(i => i.ToString()).ToArray(); + sorted.ShouldBe([.. resultIps.OrderBy(i => i.ToString())]); } } diff --git a/Netorrent/Exceptions/AnnounceException.cs b/Netorrent/Exceptions/AnnounceException.cs new file mode 100644 index 00000000..f50cf0f1 --- /dev/null +++ b/Netorrent/Exceptions/AnnounceException.cs @@ -0,0 +1,3 @@ +namespace Netorrent.Exceptions; + +internal class AnnounceException(string? message = null) : Exception(message); diff --git a/Netorrent/P2P/Download/IPiecePicker.cs b/Netorrent/P2P/Download/IPiecePicker.cs index ed34f211..6471579c 100644 --- a/Netorrent/P2P/Download/IPiecePicker.cs +++ b/Netorrent/P2P/Download/IPiecePicker.cs @@ -1,24 +1,27 @@ using System.Diagnostics.CodeAnalysis; using Netorrent.P2P.Messages; +using R3; namespace Netorrent.P2P.Download; -internal interface IPiecePicker : IAsyncDisposable +internal interface IPiecePicker { int BlockSize { get; } + bool IsEndGame { get; } - void CompletePiece(int index); - void CompleteRequestBlock(RequestBlock requestBlock); + void IncreaseRarity(int index); void DecreaseRarity(int index); - RequestBlock? GetBlock(Bitfield bitfield); + void CompletePiece(int index); + bool TryGetRequestBlock( + IPeerConnection peerConnection, + [NotNullWhen(true)] out RequestBlock? requestBlock + ); + int GetBlockCountByPieceIndex(int pieceIndex); int GetPieceSize(int pieceIndex); long GetBitfieldSize(); RequestBlock GetRequestBlockByBlockIndex(int pieceIndex, int blockIndex); IEnumerable GetTimeoutRequestBlocks(); - void IncreaseRarity(int index); - void SetBlockToPending(RequestBlock requestBlock); - void SetBlockToRequested(RequestBlock requestBlock, IPeerConnection peerConnection); bool TryGetRequestedBlock( Block receiveBlock, [NotNullWhen(true)] out RequestBlock? requestBlock diff --git a/Netorrent/P2P/Download/PiecePicker.cs b/Netorrent/P2P/Download/PiecePicker.cs index 272ef65a..d740d912 100644 --- a/Netorrent/P2P/Download/PiecePicker.cs +++ b/Netorrent/P2P/Download/PiecePicker.cs @@ -1,4 +1,4 @@ -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.CodeAnalysis; using Netorrent.Extensions; using Netorrent.P2P.Messages; using ZLinq; @@ -8,11 +8,12 @@ namespace Netorrent.P2P.Download; internal class PiecePicker(Bitfield myBitfield, int blockSize, int pieceLenght, long totalSize) : IPiecePicker { - public const int TimeoutSeconds = 10; - private readonly int[] _pieceRarity = new int[myBitfield.Length]; private readonly Dictionary _requestBlocks = []; + private readonly HashSet _requestedIndexes = []; + private bool _isEndGame = false; public int BlockSize => blockSize; + public bool IsEndGame => _isEndGame; public void IncreaseRarity(int index) { @@ -24,15 +25,9 @@ public void DecreaseRarity(int index) Interlocked.Decrement(ref _pieceRarity[index]); } - public void CompleteRequestBlock(RequestBlock requestBlock) - { - requestBlock.State = RequestBlockState.Completed; - requestBlock.RequestedAt = null; - requestBlock.RequestedFrom.Clear(); - } - public void CompletePiece(int index) { + _requestedIndexes.Remove(index); _requestBlocks.Remove(index); } @@ -63,28 +58,56 @@ public bool TryGetRequestedBlock( return false; } - public RequestBlock? GetBlock(Bitfield bitfield) + public bool TryGetRequestBlock( + IPeerConnection peerConnection, + [NotNullWhen(true)] out RequestBlock? requestBlock + ) { - HashSet excludedIndices = []; + if (peerConnection.PeerBitField is null) + { + requestBlock = null; + return false; + } - foreach (var requestBlock in _requestBlocks.Values.AsValueEnumerable().SelectMany(i => i)) + foreach (var item in _requestBlocks.Values.AsValueEnumerable().SelectMany(i => i)) { - excludedIndices.Add(requestBlock.Index); + _requestedIndexes.Add(item.Index); if ( - requestBlock.State == RequestBlockState.Pending - && bitfield.HasPiece(requestBlock.Index) + item is { State: RequestBlockState.Pending or RequestBlockState.EndgameRequested } + && peerConnection.PeerBitField.HasPiece(item.Index) + && !item.RequestedFrom.Contains(peerConnection) ) { - return requestBlock; + requestBlock = item; + return true; + } + } + + bool hasUnrequestedPiece = false; + + for (int i = 0; i < myBitfield.Length; i++) + { + if (myBitfield.HasPiece(i)) + { + continue; + } + + if (!_requestedIndexes.Contains(i)) + { + hasUnrequestedPiece = true; + break; } } - var piece = GetPiece(bitfield, excludedIndices); + _isEndGame = !hasUnrequestedPiece; + + var piece = GetPiece(peerConnection.PeerBitField, _requestedIndexes); if (piece is null) { - return null; + requestBlock = null; + return false; } var blockCount = GetBlockCountByPieceIndex(piece.Value); @@ -100,36 +123,23 @@ public bool TryGetRequestedBlock( } } - return requestBlocks[0]; + requestBlock = requestBlocks[0]; + return true; } public IEnumerable GetTimeoutRequestBlocks() { - var timeout = TimeoutSeconds.Seconds; var now = DateTime.UtcNow; return _requestBlocks .Values.SelectMany(i => i) .Where(i => - i?.State == RequestBlockState.Requested - && i.RequestedAt is not null - && (now - i.RequestedAt.Value) > timeout + i is { State: RequestBlockState.EndgameRequested or RequestBlockState.Requested } + && i.TimeoutAt is not null + && now > i.TimeoutAt.Value ); } - public void SetBlockToPending(RequestBlock requestBlock) - { - requestBlock.State = RequestBlockState.Pending; - requestBlock.RequestedAt = null; - } - - public void SetBlockToRequested(RequestBlock requestBlock, IPeerConnection peerConnection) - { - requestBlock.State = RequestBlockState.Requested; - requestBlock.RequestedFrom.Add(peerConnection); - requestBlock.RequestedAt = DateTimeOffset.UtcNow; - } - private int? GetPiece(Bitfield peerBitfield, HashSet excluded) { var posiblePieces = new List(myBitfield.Length); @@ -172,6 +182,8 @@ public int GetBlockCountByPieceIndex(int pieceIndex) return blockCount; } + //TODO Move this to bitfield class? + public int GetPieceSize(int pieceIndex) { ArgumentOutOfRangeException.ThrowIfNegative(pieceIndex); @@ -223,6 +235,4 @@ public long GetBitfieldSize() } return total; } - - public async ValueTask DisposeAsync() { } } diff --git a/Netorrent/P2P/Download/RequestScheduler.cs b/Netorrent/P2P/Download/RequestScheduler.cs index 461e49ad..9c5e7f1d 100644 --- a/Netorrent/P2P/Download/RequestScheduler.cs +++ b/Netorrent/P2P/Download/RequestScheduler.cs @@ -1,4 +1,4 @@ -using System.Threading.Channels; +using System.Threading.Channels; using Microsoft.Extensions.Logging; using Netorrent.Extensions; using Netorrent.IO; @@ -46,8 +46,10 @@ private async Task ScheduleTimeoutsBlocksAsync(CancellationToken cancellationTok { while (true) { - _downloadMessageChannel.Writer.TryWrite(_timeoutMessage); - await Task.Delay(10.Seconds, cancellationToken).ConfigureAwait(false); + await _downloadMessageChannel + .Writer.WriteAsync(_timeoutMessage, cancellationToken) + .ConfigureAwait(false); + await Task.Delay(1.Seconds, cancellationToken).ConfigureAwait(false); } } @@ -75,7 +77,14 @@ var downloadMessage in _downloadMessageChannel if (downloadMessage is DownloadMessage.ScheduleMessage scheduleMessage) { - ScheduleRequests(scheduleMessage.PeerConnection); + if (piecePicker.IsEndGame) + { + ScheduleRequests(); + } + else + { + ScheduleRequests(scheduleMessage.PeerConnection); + } continue; } } @@ -105,8 +114,10 @@ private async ValueTask ProcessBlockAsync(Block block, CancellationToken cancell } block.FromPeer.PeerRequestWindow.ReceivedBlock(block.FromPeer.DownloadTracker.Speed.Bps); - piecePicker.CompleteRequestBlock(requestedBlock); pieceBuffer.AddBlock(block); + requestedBlock.State = RequestBlockState.Completed; + requestedBlock.TimeoutAt = null; + requestedBlock.RequestedFrom.Clear(); TryRequest(block.FromPeer); if (!pieceBuffer.IsComplete) @@ -149,7 +160,8 @@ private void CheckTimeout() var currentPeers = peers.Values.AsValueEnumerable(); foreach (var requestBlock in piecePicker.GetTimeoutRequestBlocks()) { - piecePicker.SetBlockToPending(requestBlock); //TODO set ALL request blocks by lastRequestedFrom requester to pending as they are all more likely to be timed out + requestBlock.State = RequestBlockState.Pending; + requestBlock.TimeoutAt = null; //TODO set ALL request blocks by lastRequestedFrom requester to pending as they are all more likely to be timed out var freePeer = currentPeers .Where(i => !i.PeerChoking.CurrentValue) @@ -184,28 +196,38 @@ private async Task WarmupAsync(CancellationToken cancellationToken) } } - private void ScheduleRequests(IPeerConnection peerConnection) + private void ScheduleRequests() { - if (peerConnection.PeerBitField is null) + var freePeers = peers + .Values.AsValueEnumerable() + .Where(i => !i.PeerChoking.CurrentValue) + .Where(i => i.AmInterested.CurrentValue); + + foreach (var peer in freePeers) { - return; + ScheduleRequests(peer); } + } + private void ScheduleRequests(IPeerConnection peerConnection) + { while ( peerConnection.RequestedBlocksCount < peerConnection.PeerRequestWindow.MaxInFlightRequests ) { - var requestBlock = piecePicker.GetBlock(peerConnection.PeerBitField); - - if (requestBlock is null) + if (!piecePicker.TryGetRequestBlock(peerConnection, out var requestBlock)) { return; } if (peerConnection.TrySendRequest(requestBlock)) { - piecePicker.SetBlockToRequested(requestBlock, peerConnection); + requestBlock.State = piecePicker.IsEndGame + ? RequestBlockState.EndgameRequested + : RequestBlockState.Requested; + requestBlock.TimeoutAt = CalculateTimeout(peerConnection, requestBlock.Length); + requestBlock.RequestedFrom.Add(peerConnection); peerConnection.IncrementRequestedBlock(); } else @@ -236,6 +258,19 @@ public void TryRequest(IPeerConnection peerConnection) } } + private static DateTimeOffset CalculateTimeout(IPeerConnection peerConnection, int blockLength) + { + var speedBps = peerConnection.DownloadTracker.Speed.Bps; + if (speedBps <= 0) + { + return DateTimeOffset.UtcNow + 30.Seconds; + } + + var estimatedSeconds = blockLength / speedBps; + var timeoutSeconds = estimatedSeconds * 3 + 2; + return DateTimeOffset.UtcNow + (Math.Min(timeoutSeconds, 60)).Seconds; + } + public async ValueTask ReceiveBlockAsync(Block block, CancellationToken cancellationToken) { try @@ -287,7 +322,6 @@ public async ValueTask DisposeAsync() catch { } await DrainChannelsAsync().ConfigureAwait(false); - await piecePicker.DisposeAsync().ConfigureAwait(false); _cts?.Dispose(); } } diff --git a/Netorrent/P2P/Messages/Bitfield.cs b/Netorrent/P2P/Messages/Bitfield.cs index 0deaa103..c5818022 100644 --- a/Netorrent/P2P/Messages/Bitfield.cs +++ b/Netorrent/P2P/Messages/Bitfield.cs @@ -36,7 +36,7 @@ internal Bitfield(ReadOnlySpan bytes, int pieceCount) } } - public bool HasPiece(int index) => index < _bits.Length && _bits[index]; + public bool HasPiece(int index) => _bits[index]; internal void SetPiece(int index) { diff --git a/Netorrent/P2P/Messages/RequestBlock.cs b/Netorrent/P2P/Messages/RequestBlock.cs index cd32a8df..953b54dd 100644 --- a/Netorrent/P2P/Messages/RequestBlock.cs +++ b/Netorrent/P2P/Messages/RequestBlock.cs @@ -1,9 +1,10 @@ -namespace Netorrent.P2P.Messages; +namespace Netorrent.P2P.Messages; enum RequestBlockState { Pending, Requested, + EndgameRequested, Cancelled, Completed, } @@ -15,7 +16,7 @@ internal class RequestBlock(int index, int begin, int length) public readonly int Length = length; public RequestBlockState State { get; set; } = RequestBlockState.Pending; public List RequestedFrom { get; set; } = []; - public DateTimeOffset? RequestedAt { get; set; } + public DateTimeOffset? TimeoutAt { get; set; } public static bool operator ==(RequestBlock left, RequestBlock right) => left.Equals(right); @@ -26,11 +27,12 @@ public override bool Equals(object? obj) return obj is RequestBlock request && Index == request.Index && Begin == request.Begin - && Length == request.Length; + && Length == request.Length + && RequestedFrom.SequenceEqual(request.RequestedFrom); } public override int GetHashCode() { - return HashCode.Combine(Index, Begin, Length); + return HashCode.Combine(Index, Begin, Length, RequestedFrom); } } diff --git a/Netorrent/P2P/PeerConnection.cs b/Netorrent/P2P/PeerConnection.cs index fb4c6136..0d12bc85 100644 --- a/Netorrent/P2P/PeerConnection.cs +++ b/Netorrent/P2P/PeerConnection.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; using System.Buffers.Binary; using Netorrent.Extensions; using Netorrent.IO; @@ -352,7 +352,7 @@ CancellationToken cancellationToken var request = new RequestBlock(index, begin, length) { - RequestedAt = DateTimeOffset.UtcNow, + TimeoutAt = null, RequestedFrom = [this], }; await uploadScheduler.AddRequestAsync(request, cancellationToken).ConfigureAwait(false); @@ -383,7 +383,7 @@ private void ReceiveCancel(Message message) var begin = BinaryPrimitives.ReadInt32BigEndian(span[4..8]); var length = BinaryPrimitives.ReadInt32BigEndian(span[8..12]); - var request = new RequestBlock(index, begin, length); + var request = new RequestBlock(index, begin, length) { RequestedFrom = [this] }; uploadScheduler.CancelRequest(request); } diff --git a/Netorrent/P2P/Tcp/TcpPeersListener.cs b/Netorrent/P2P/Tcp/TcpPeersListener.cs index 5bfcec22..7b041f4b 100644 --- a/Netorrent/P2P/Tcp/TcpPeersListener.cs +++ b/Netorrent/P2P/Tcp/TcpPeersListener.cs @@ -18,7 +18,7 @@ ILogger logger { private readonly ConcurrentDictionary _peersClientByInfoHash = new(); private readonly Channel _incomingConnections = Channel.CreateBounded( - 128 + new BoundedChannelOptions(128) { SingleWriter = false, SingleReader = true } ); public int Port => ((IPEndPoint)tcpListeners[0].LocalEndpoint).Port; diff --git a/Netorrent/P2P/Upload/UploadScheduler.cs b/Netorrent/P2P/Upload/UploadScheduler.cs index 53dbe5eb..c8d915c1 100644 --- a/Netorrent/P2P/Upload/UploadScheduler.cs +++ b/Netorrent/P2P/Upload/UploadScheduler.cs @@ -25,7 +25,8 @@ ILogger logger Channel.CreateBounded( new BoundedChannelOptions(128) { SingleWriter = false, SingleReader = true } ); - + private readonly Lock _cancelLock = new(); + private readonly HashSet _requests = []; private static readonly UploadMessage.CheckRoundMessage _checkRoundMessage = new(); private readonly TimeSpan _interval = 10.Seconds; @@ -105,15 +106,21 @@ private async ValueTask ProcessRequestAsync( CancellationToken cancellationToken ) { - if ( - requestBlock.State == RequestBlockState.Cancelled - || requestBlock.RequestedFrom.Count == 0 - ) + var peer = requestBlock.RequestedFrom[0]; + + lock (_cancelLock) { - return; + if ( + _requests.TryGetValue(requestBlock, out var actualValue) + && actualValue.State == RequestBlockState.Cancelled + ) + { + _requests.Remove(actualValue); + peer.DecrementUploadRequested(); + return; + } } - var peer = requestBlock.RequestedFrom[0]; var pieceData = await pieceStorage .ReadAsync( requestBlock.Index, @@ -148,6 +155,7 @@ CancellationToken cancellationToken } finally { + _requests.Remove(requestBlock); peer.DecrementUploadRequested(); } } @@ -283,25 +291,35 @@ CancellationToken cancellationToken return; } - from.IncrementUploadRequested(); - await _uploadMessagesChannel - .Writer.WriteAsync(new UploadMessage.RequestBlockMessage(request), cancellationToken) - .ConfigureAwait(false); + if (_requests.Add(request)) + { + from.IncrementUploadRequested(); + + await _uploadMessagesChannel + .Writer.WriteAsync( + new UploadMessage.RequestBlockMessage(request), + cancellationToken + ) + .ConfigureAwait(false); + } } - //TODO properly implement cancellation - public void CancelRequest(RequestBlock request) + public void CancelRequest(RequestBlock cancelled) { - var from = request.RequestedFrom[0]; - - request.State = RequestBlockState.Cancelled; - from.DecrementUploadRequested(); + lock (_cancelLock) + { + if (_requests.TryGetValue(cancelled, out var requestBlock)) + { + cancelled.State = RequestBlockState.Cancelled; + } + } } private async ValueTask DrainChannelsAsync() { await foreach (var _ in _uploadMessagesChannel.Reader.ReadAllAsync().ConfigureAwait(false)) { } + _requests.Clear(); } public async ValueTask DisposeAsync() diff --git a/Netorrent/TorrentFile/FileStructure/MetaInfo.cs b/Netorrent/TorrentFile/FileStructure/MetaInfo.cs index b121ead8..8f916d13 100644 --- a/Netorrent/TorrentFile/FileStructure/MetaInfo.cs +++ b/Netorrent/TorrentFile/FileStructure/MetaInfo.cs @@ -5,13 +5,13 @@ namespace Netorrent.TorrentFile.FileStructure; public record MetaInfo( Info Info, string Announce, - List? AnnounceList = null, + IReadOnlyList? AnnounceList = null, long? CreationDate = null, string? Comment = null, string? CreatedBy = null, string? Encoding = null, string? Title = null, - List? UrlList = null + IReadOnlyList? UrlList = null ) { public BDictionary ToBDictionary() @@ -23,16 +23,16 @@ public BDictionary ToBDictionary() root.Elements["announce"] = new BString(Announce); } - if (AnnounceList != null && AnnounceList.Count != 0) + if (AnnounceList is not null && AnnounceList.Count != 0) { - var innerTier = new BList([ - .. AnnounceList.Select(u => (IBencodingNode)new BString(u)), + root.Elements["announce-list"] = new BList([ + .. AnnounceList.Select(u => new BList([ + .. u.Select(i => (IBencodingNode)new BString(i)), + ])), ]); - var outer = new BList([innerTier]); - root.Elements["announce-list"] = outer; } - if (UrlList != null && UrlList.Count != 0) + if (UrlList is not null && UrlList.Count != 0) { root.Elements["url-list"] = new BList([ .. UrlList.Select(u => (IBencodingNode)new BString(u)), diff --git a/Netorrent/TorrentFile/Torrent.cs b/Netorrent/TorrentFile/Torrent.cs index d2ff4d27..83a555fa 100644 --- a/Netorrent/TorrentFile/Torrent.cs +++ b/Netorrent/TorrentFile/Torrent.cs @@ -109,13 +109,18 @@ IReadOnlySet downloadedPieces torrentClientOptions.Logger ); _trackerClient = new TrackerClient( + _myBitfield, trackerHandlers, torrentClientOptions.UsedTrackers, peersListener.Port, dataStatistics, peerId, trackersChannel.Writer, - [metaInfo.Announce, .. metaInfo.AnnounceList ?? []], + metaInfo.AnnounceList?.Select(i => i.ToArray()).ToList() //Don't modify the original announce list + ?? + [ + [metaInfo.Announce], + ], metaInfo.Info.InfoHash, torrentClientOptions.Logger ); @@ -265,24 +270,42 @@ public async ValueTask CheckAsync(CancellationToken cancellationToken = default) var piecesChannel = Channel.CreateBounded<(int PieceIndex, RentedArray Piece)>( new BoundedChannelOptions(channelSize) { SingleReader = true, SingleWriter = true } ); + var indexChannel = Channel.CreateBounded( + new BoundedChannelOptions(channelSize) { SingleReader = true, SingleWriter = false } + ); var processPiecesTask = ProcessPiecesAsync(); + var processIndexesTask = ProcessIndicesAsync(); var getPiecesTask = GetPiecesAsync(); - await Task.WhenAll(processPiecesTask, getPiecesTask).ConfigureAwait(false); + await Task.WhenAll(processPiecesTask, getPiecesTask, processIndexesTask) + .ConfigureAwait(false); Statistics.Data.SetVerifiedBytes(_piecePicker.GetBitfieldSize()); + async Task ProcessIndicesAsync() + { + await foreach (var index in indexChannel.Reader.ReadAllAsync(cancellationToken)) + { + _myBitfield.SetPiece(index); + } + } + async Task ProcessPiecesAsync() { await foreach ( var items in piecesChannel.Reader.ReadAllAsync(cancellationToken).Chunk(16) ) { - Parallel.ForEach( - items, - (item) => VerifyPiece(item.PieceIndex, item.Piece, cancellationToken) - ); + await Parallel + .ForEachAsync( + items, + cancellationToken: cancellationToken, + (item, ct) => VerifyPieceAsync(item.PieceIndex, pieceData: item.Piece, ct) + ) + .ConfigureAwait(false); } + + indexChannel.Writer.TryComplete(); } async Task GetPiecesAsync() @@ -360,7 +383,7 @@ await piecesChannel piecesChannel.Writer.TryComplete(); } - void VerifyPiece( + async ValueTask VerifyPieceAsync( int pieceIndex, RentedArray pieceData, CancellationToken cancellationToken @@ -374,7 +397,9 @@ CancellationToken cancellationToken if (hasPiece) { - _myBitfield.SetPiece(pieceIndex); + await indexChannel + .Writer.WriteAsync(pieceIndex, cancellationToken) + .ConfigureAwait(false); } Statistics.Check.AddCheckedPiece(); @@ -388,9 +413,9 @@ public async ValueTask DisposeAsync() { _disposed = true; await StopAndWaitToFinishAsync().ConfigureAwait(false); - _pieceStorage.Dispose(); await _peersClient.DisposeAsync().ConfigureAwait(false); await _trackerClient.DisposeAsync().ConfigureAwait(false); + _pieceStorage.Dispose(); Completion.Dispose(); _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; diff --git a/Netorrent/TorrentFile/TorrentClient.cs b/Netorrent/TorrentFile/TorrentClient.cs index 066d2be7..67e14192 100644 --- a/Netorrent/TorrentFile/TorrentClient.cs +++ b/Netorrent/TorrentFile/TorrentClient.cs @@ -180,7 +180,7 @@ public Torrent LoadTorrent( public async ValueTask CreateTorrentAsync( string path, string announceUrl, - List? announceUrls = null, + List? announceUrls = null, List? webUrls = null, int pieceLength = 256 * 1024, // 256 KB default CancellationToken cancellationToken = default @@ -211,7 +211,7 @@ public async ValueTask CreateTorrentAsync( internal static async ValueTask CreateMetaInfoFromPathAsync( string path, string announceUrl, - List? announceUrls, + List? announceUrls, List? webUrls, int pieceLength, CancellationToken cancellationToken = default @@ -252,7 +252,7 @@ static bool IsValidUrl(string? url, bool allowHttp = true) throw new ArgumentException($"Invalid announce URL: '{announceUrl}'"); } - string[] allUrls = [.. announceUrls ?? [], .. webUrls ?? []]; + string[] allUrls = [.. announceUrls?.SelectMany(i => i) ?? [], .. webUrls ?? []]; foreach (string url in allUrls) { if (!IsValidUrl(url)) @@ -461,8 +461,7 @@ internal static MetaInfo ParseMetaInfo(BDictionary dictionary) .Elements.GetValueOrDefault("announce-list") ?.As() ?.Elements.Select(i => i.As()!.Value.Elements) - .SelectMany(i => i) - .Select(i => i.As()!.Value) + ?.Select(i => i.Select(i => i.As()!.Value.Data).ToArray()) .ToList(); var urlList = dictionary .Elements.GetValueOrDefault("url-list") diff --git a/Netorrent/Tracker/Http/HttpTracker.cs b/Netorrent/Tracker/Http/HttpTracker.cs index 5dd077e6..737300ad 100644 --- a/Netorrent/Tracker/Http/HttpTracker.cs +++ b/Netorrent/Tracker/Http/HttpTracker.cs @@ -1,34 +1,44 @@ using System.Net; -using System.Net.Sockets; using System.Threading.Channels; -using Microsoft.Extensions.Logging; +using Netorrent.Exceptions; using Netorrent.Extensions; using Netorrent.P2P.Messages; using Netorrent.Statistics; using Netorrent.TorrentFile.FileStructure; +using R3; namespace Netorrent.Tracker.Http; internal class HttpTracker( + Bitfield myBitfield, int port, DataStatistics transfer, IHttpTrackerHandler httpTrackerHandler, PeerId peerId, InfoHash infoHash, string announceUrl, - ILogger logger, ChannelWriter channelWriter ) : ITracker { public async ValueTask StartAsync(CancellationToken cancellationToken) { - var response = await TryAnnounceAsync(Events.Started, cancellationToken) + var response = await AnnounceAsync( + myBitfield.IsComplete ? Events.Completed : Events.Started, + cancellationToken + ) .ConfigureAwait(false); - if (response is null) - { - return; - } + using var completeDisposable = myBitfield.StateChanged.SubscribeAwait( + async (value, ct) => + { + if (myBitfield.IsComplete) + { + response = await AnnounceAsync(Events.Completed, cancellationToken: ct) + .ConfigureAwait(false); + } + }, + configureAwait: false + ); foreach (var iPEndPoint in response.Peers) { @@ -39,21 +49,10 @@ public async ValueTask StartAsync(CancellationToken cancellationToken) { var interval = response.Interval.Seconds; - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation("Waiting {seconds} seconds", interval.TotalSeconds); - } - await Task.Delay(interval, cancellationToken).ConfigureAwait(false); - var newResponse = await TryAnnounceAsync(cancellationToken: cancellationToken) + var newResponse = await AnnounceAsync(cancellationToken: cancellationToken) .ConfigureAwait(false); - - if (newResponse is null) - { - continue; - } - response = newResponse; foreach (var iPEndPoint in response.Peers) @@ -63,16 +62,11 @@ public async ValueTask StartAsync(CancellationToken cancellationToken) } } - private async Task TryAnnounceAsync( + private async Task AnnounceAsync( string? @event = null, CancellationToken cancellationToken = default ) { - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation("Announcing to {url}", announceUrl); - } - try { var request = new HttpTrackerRequest( @@ -85,7 +79,7 @@ public async ValueTask StartAsync(CancellationToken cancellationToken) true, false, @event, - 50 + 200 ); return await httpTrackerHandler @@ -94,17 +88,12 @@ public async ValueTask StartAsync(CancellationToken cancellationToken) } catch (Exception ex) { - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug(ex, "Couldn't announce to {trackerUrl}", announceUrl); - } - - return null; + throw new AnnounceException(ex.Message); } } public async ValueTask StopAsync(CancellationToken cancellationToken) { - await TryAnnounceAsync(Events.Stopped, cancellationToken).ConfigureAwait(false); + await AnnounceAsync(Events.Stopped, cancellationToken).ConfigureAwait(false); } } diff --git a/Netorrent/Tracker/TrackerClient.cs b/Netorrent/Tracker/TrackerClient.cs index 9e92ae03..d6b1a9b2 100644 --- a/Netorrent/Tracker/TrackerClient.cs +++ b/Netorrent/Tracker/TrackerClient.cs @@ -1,8 +1,10 @@ -using System.Net; +using System; +using System.Net; using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Threading.Channels; using Microsoft.Extensions.Logging; +using Netorrent.Exceptions; using Netorrent.Extensions; using Netorrent.P2P.Messages; using Netorrent.Statistics; @@ -15,54 +17,93 @@ namespace Netorrent.Tracker; internal class TrackerClient( + Bitfield myBitfield, TrackerHandlers trackerHandlers, UsedTrackers usedTrackers, int port, DataStatistics transferStatistics, PeerId peerId, ChannelWriter trackersChannel, - string[] announceList, + List announceList, InfoHash infoHash, ILogger logger ) : IAsyncDisposable { public async Task StartAsync(CancellationToken cancellationToken) { - var urls = - announceList - .Where(url => !string.IsNullOrWhiteSpace(url)) - .Distinct(StringComparer.OrdinalIgnoreCase) - ?? []; - - List trackerTasks = []; - List trackers = []; - - //The trackers should not fail by them self - //They finish successfully because of dns problems, udp timeouts, etc. - try + foreach (var urls in announceList) { + var snapshot = urls.AsValueEnumerable().Shuffle().ToArray(); + await foreach ( - var tracker in CreateTrackers(urls, cancellationToken) + var (url, (Ipv4, Ipv6)) in CreateTrackers(snapshot, cancellationToken) .WithCancellation(cancellationToken) .ConfigureAwait(false) ) { - trackers.Add(tracker); - trackerTasks.Add(tracker.StartAsync(cancellationToken).AsTask()); + try + { + Task?[] tasks = + [ + Ipv4?.StartAsync(cancellationToken).AsTask(), + Ipv6?.StartAsync(cancellationToken).AsTask(), + ]; + + await Task.WhenAll(tasks.Where(i => i is not null).Cast()) + .ConfigureAwait(false); + } + catch (AnnounceException ex) + { + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError(ex, "Error Announcing"); + } + } + catch (OperationCanceledException oce) + when (oce.CancellationToken == cancellationToken) + { + try + { + using var ct = new CancellationTokenSource(5.Seconds); + if (Ipv4 is not null) + { + await Ipv4.StopAsync(ct.Token).ConfigureAwait(false); + } + if (Ipv6 is not null) + { + await Ipv6.StopAsync(ct.Token).ConfigureAwait(false); + } + + Promote(urls, url); + } + catch (Exception stopEx) + { + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError(stopEx, "Error stopping"); + } + } + + return; + } } - - await Task.WhenAll(trackerTasks).ConfigureAwait(false); - } - finally - { - using var cts = new CancellationTokenSource(5.Seconds); - var trackerDisposeTasks = trackers.Select(i => i.StopAsync(cts.Token).AsTask()); - await Task.WhenAll(trackerDisposeTasks).ConfigureAwait(false); } } - private async IAsyncEnumerable CreateTrackers( - IEnumerable urls, + private static void Promote(string[] tier, string winner) + { + var idx = Array.IndexOf(tier, winner); + if (idx <= 0) + return; + + // Swap to front + var first = tier[0]; + tier[0] = winner; + tier[idx] = first; + } + + private async IAsyncEnumerable<(string url, (ITracker? Ipv4, ITracker? Ipv6))> CreateTrackers( + string[] urls, [EnumeratorCancellation] CancellationToken cancellationToken ) { @@ -87,19 +128,11 @@ await CreateHttpTrackersAsync(uri, cancellationToken).ConfigureAwait(false), _ => LogUnknownTracker(url), }; - foreach (var tracker in trackers) - { - if (tracker is null) - { - continue; - } - - yield return tracker; - } + yield return (url, trackers); } } - private async ValueTask CreateHttpTrackersAsync( + private async ValueTask<(HttpTracker? Ipv4, HttpTracker? Ipv6)> CreateHttpTrackersAsync( Uri uri, CancellationToken cancellationToken ) @@ -108,16 +141,19 @@ CancellationToken cancellationToken var (ipv4, ipv6) = await Dns.GetHostAdressesOrEmptyAsync(uri, cancellationToken) .ConfigureAwait(false); + HttpTracker? trackerv4 = null; + HttpTracker? trackerv6 = null; + if (trackerHandlers.HttpTrackerHandlerIpv4 is not null && ipv4 is not null) { - var trackerv4 = new HttpTracker( + trackerv4 = new HttpTracker( + myBitfield, port, transferStatistics, trackerHandlers.HttpTrackerHandlerIpv4, peerId, infoHash, uri.OriginalString, - logger, trackersChannel ); httpsTrackers.Add(trackerv4); @@ -125,23 +161,23 @@ CancellationToken cancellationToken if (trackerHandlers.HttpTrackerHandlerIpv6 is not null && ipv6 is not null) { - var trackerv6 = new HttpTracker( + trackerv6 = new HttpTracker( + myBitfield, port, transferStatistics, trackerHandlers.HttpTrackerHandlerIpv6, peerId, infoHash, uri.OriginalString, - logger, trackersChannel ); httpsTrackers.Add(trackerv6); } - return [.. httpsTrackers]; + return (trackerv4, trackerv6); } - private async ValueTask CreateUdpTrackersAsync( + private async ValueTask<(UdpTracker? Ipv4, UdpTracker? Ipv6)> CreateUdpTrackersAsync( Uri uri, CancellationToken cancellationToken ) @@ -150,19 +186,21 @@ CancellationToken cancellationToken var (ipv4, ipv6) = await Dns.GetHostAdressesOrEmptyAsync(uri, cancellationToken) .ConfigureAwait(false); + UdpTracker? trackerv4 = null; + UdpTracker? trackerv6 = null; + if (trackerHandlers.UdpTrackerHandlerIpv4 is not null && ipv4 is not null && uri.Port > 0) { var ipEndpoint = new IPEndPoint(ipv4, uri.Port); - var trackerv4 = new UdpTracker( + trackerv4 = new UdpTracker( + myBitfield, trackerHandlers.UdpTrackerHandlerIpv4, port, transferStatistics, peerId, trackersChannel, infoHash, - uri.OriginalString, - ipEndpoint, - logger + ipEndpoint ); udpTrackers.Add(trackerv4); } @@ -170,31 +208,30 @@ CancellationToken cancellationToken if (trackerHandlers.UdpTrackerHandlerIpv6 is not null && ipv6 is not null && uri.Port > 0) { var ipEndpoint = new IPEndPoint(ipv6, uri.Port); - var trackerv6 = new UdpTracker( + trackerv6 = new UdpTracker( + myBitfield, trackerHandlers.UdpTrackerHandlerIpv6, port, transferStatistics, peerId, trackersChannel, infoHash, - uri.OriginalString, - ipEndpoint, - logger + ipEndpoint ); udpTrackers.Add(trackerv6); } - return [.. udpTrackers]; + return (trackerv4, trackerv6); } - private ITracker[] LogUnknownTracker(string scheme) + private (ITracker? Ipv4, ITracker? Ipv6) LogUnknownTracker(string scheme) { if (logger.IsEnabled(LogLevel.Debug)) { logger.LogDebug("Unknown {scheme} tracker", scheme); } - return []; + return (null, null); } public async ValueTask DisposeAsync() diff --git a/Netorrent/Tracker/Udp/UdpTracker.cs b/Netorrent/Tracker/Udp/UdpTracker.cs index 2990b953..75fe5e29 100644 --- a/Netorrent/Tracker/Udp/UdpTracker.cs +++ b/Netorrent/Tracker/Udp/UdpTracker.cs @@ -1,25 +1,25 @@ using System.Net; using System.Threading.Channels; -using Microsoft.Extensions.Logging; +using Netorrent.Exceptions; using Netorrent.Extensions; using Netorrent.P2P.Messages; using Netorrent.Statistics; using Netorrent.TorrentFile.FileStructure; using Netorrent.Tracker.Udp.Request; using Netorrent.Tracker.Udp.Response; +using R3; namespace Netorrent.Tracker.Udp; internal class UdpTracker( + Bitfield myBitfield, IUdpTrackerHandler udpTrackerHandler, int port, DataStatistics transfer, PeerId peerId, ChannelWriter channelWriter, InfoHash infoHash, - string announceUrl, - IPEndPoint iPEndPoint, - ILogger logger + IPEndPoint iPEndPoint ) : ITracker { private UdpTrackerResponse? _lastResponse; @@ -27,22 +27,30 @@ ILogger logger public async ValueTask StartAsync(CancellationToken cancellationToken) { - if (await TryConnectAsync(iPEndPoint, cancellationToken).ConfigureAwait(false) is null) - { - return; - } + await ConnectAsync(iPEndPoint, cancellationToken).ConfigureAwait(false); - _lastResponse = await TryAnnounceAsync( + _lastResponse = await AnnounceAsync( iPEndPoint, - @event: Events.Started, + @event: myBitfield.IsComplete ? Events.Completed : Events.Started, cancellationToken: cancellationToken ) .ConfigureAwait(false); - if (_lastResponse is null) - { - return; - } + using var completeDisposable = myBitfield.StateChanged.SubscribeAwait( + async (value, ct) => + { + if (myBitfield.IsComplete) + { + _lastResponse = await AnnounceAsync( + iPEndPoint, + @event: Events.Completed, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + } + }, + configureAwait: false + ); foreach (var peer in _lastResponse.Peers) { @@ -54,19 +62,9 @@ public async ValueTask StartAsync(CancellationToken cancellationToken) var interval = _lastResponse.Interval.Seconds; await Task.Delay(interval, cancellationToken).ConfigureAwait(false); - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation("Waiting {seconds} seconds", interval); - } - - var newResponse = await TryAnnounceAsync(iPEndPoint, null, cancellationToken) + var newResponse = await AnnounceAsync(iPEndPoint, null, cancellationToken) .ConfigureAwait(false); - if (newResponse is null) - { - continue; - } - _lastResponse = newResponse; foreach (var peer in _lastResponse.Peers) @@ -76,7 +74,7 @@ public async ValueTask StartAsync(CancellationToken cancellationToken) } } - public async Task TryConnectAsync( + public async Task ConnectAsync( IPEndPoint iPEndPoint, CancellationToken cancellationToken ) @@ -89,16 +87,11 @@ CancellationToken cancellationToken } catch (Exception ex) { - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug(ex, "Couldn't connect to {trackerUrl}", announceUrl); - } - - return null; + throw new AnnounceException(ex.Message); } } - public async Task TryAnnounceAsync( + public async Task AnnounceAsync( IPEndPoint iPEndPoint, string? @event, CancellationToken cancellationToken @@ -106,11 +99,6 @@ CancellationToken cancellationToken { try { - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation("Announcing to {url}", announceUrl); - } - var connectionId = udpTrackerHandler.GetConnectionIdOrNull(_trackerId); if (connectionId is null || udpTrackerHandler.IsOutdated(connectionId.Value)) @@ -133,7 +121,7 @@ CancellationToken cancellationToken (ushort)port, ConnectionId: connectionId.Value, TransactionId: udpTrackerHandler.MakeTransactionId(), - NumWant: 50 + NumWant: 200 ); return await udpTrackerHandler @@ -142,12 +130,7 @@ CancellationToken cancellationToken } catch (Exception ex) { - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug(ex, "Couldn't announce to {trackerUrl}", announceUrl); - } - - return null; + throw new AnnounceException(ex.Message); } } @@ -156,7 +139,7 @@ public async ValueTask StopAsync(CancellationToken cancellationToken) if (iPEndPoint is not null && _lastResponse is not null) { //Udp tracker don't respond to stop so there is no point in awaiting as it will never complete - _ = TryAnnounceAsync(iPEndPoint, Events.Stopped, cancellationToken); + _ = AnnounceAsync(iPEndPoint, Events.Stopped, cancellationToken); } } } diff --git a/Netorrent/Tracker/Udp/UdpTrackerHandler.cs b/Netorrent/Tracker/Udp/UdpTrackerHandler.cs index 145b978e..58561461 100644 --- a/Netorrent/Tracker/Udp/UdpTrackerHandler.cs +++ b/Netorrent/Tracker/Udp/UdpTrackerHandler.cs @@ -174,6 +174,10 @@ await _udpClient .ConfigureAwait(false); transaction.RetryCount++; var seconds = _retryDelay * (transaction.RetryCount + 1); + if (seconds > 60.Seconds) + { + seconds = 60.Seconds; + } transaction.NextRetryTime = DateTime.UtcNow + seconds; } }