Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions src/SharpCompress/Common/Tar/TarFilePart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal override Stream GetCompressedStream()
if (_seekableStream != null)
{
_seekableStream.Position = Header.DataStartPosition ?? 0;
return new TarReadOnlySubStream(_seekableStream, Header.Size, false);
return new TarReadOnlySubStream(_seekableStream, Header.Size);
}
return Header.PackedStream.NotNull();
}
Expand All @@ -36,14 +36,8 @@ internal override Stream GetCompressedStream()
{
if (_seekableStream != null)
{
var useSyncOverAsync = false;
#if LEGACY_DOTNET
useSyncOverAsync = true;
#endif
_seekableStream.Position = Header.DataStartPosition ?? 0;
return new ValueTask<Stream?>(
new TarReadOnlySubStream(_seekableStream, Header.Size, useSyncOverAsync)
);
return new ValueTask<Stream?>(new TarReadOnlySubStream(_seekableStream, Header.Size));
}
return new ValueTask<Stream?>(Header.PackedStream.NotNull());
}
Expand Down
10 changes: 1 addition & 9 deletions src/SharpCompress/Common/Tar/TarHeaderFactory.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,7 @@ IArchiveEncoding archiveEncoding
break;
case StreamingMode.Streaming:
{
var useSyncOverAsync = false;
#if LEGACY_DOTNET
useSyncOverAsync = true;
#endif
header.PackedStream = new TarReadOnlySubStream(
stream,
header.Size,
useSyncOverAsync
);
header.PackedStream = new TarReadOnlySubStream(stream, header.Size);
}
break;
default:
Expand Down
6 changes: 1 addition & 5 deletions src/SharpCompress/Common/Tar/TarHeaderFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ IArchiveEncoding archiveEncoding
break;
case StreamingMode.Streaming:
{
header.PackedStream = new TarReadOnlySubStream(
stream,
header.Size,
false
);
header.PackedStream = new TarReadOnlySubStream(stream, header.Size);
}
break;
default:
Expand Down
114 changes: 88 additions & 26 deletions src/SharpCompress/Common/Tar/TarReadOnlySubStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.IO;
using System.Threading.Tasks;

namespace SharpCompress.Common.Tar;

Expand All @@ -8,9 +9,10 @@ internal class TarReadOnlySubStream : Stream
private readonly Stream _stream;

private bool _isDisposed;
private bool _isPositionedAtNextEntry;
private long _amountRead;

public TarReadOnlySubStream(Stream stream, long bytesToRead, bool useSyncOverAsyncDispose)
public TarReadOnlySubStream(Stream stream, long bytesToRead)
{
_stream = stream;
BytesLeftToRead = bytesToRead;
Expand All @@ -27,27 +29,17 @@ protected override void Dispose(bool disposing)
_isDisposed = true;
if (disposing)
{
// Ensure we read all remaining blocks for this entry.
_stream.Skip(BytesLeftToRead);
_amountRead += BytesLeftToRead;

// If the last block wasn't a full 512 bytes, skip the remaining padding bytes.
var bytesInLastBlock = _amountRead % 512;

if (bytesInLastBlock != 0)
if (Utility.UseSyncOverAsyncDispose())
{
if (Utility.UseSyncOverAsyncDispose())
{
#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
#pragma warning disable CA2012
_stream.SkipAsync(512 - bytesInLastBlock).GetAwaiter().GetResult();
AdvanceToNextHeaderAsync().GetAwaiter().GetResult();
#pragma warning restore CA2012
#pragma warning restore VSTHRD002 // Avoid problematic synchronous waits
}
else
{
_stream.Skip(512 - bytesInLastBlock);
}
}
else
{
AdvanceToNextHeader();
}
}
base.Dispose(disposing);
Expand All @@ -63,24 +55,62 @@ public override async System.Threading.Tasks.ValueTask DisposeAsync()
}

_isDisposed = true;
// Ensure we read all remaining blocks for this entry.
await _stream.SkipAsync(BytesLeftToRead).ConfigureAwait(false);
_amountRead += BytesLeftToRead;
await AdvanceToNextHeaderAsync().ConfigureAwait(false);

GC.SuppressFinalize(this);
await base.DisposeAsync().ConfigureAwait(false);
}
#endif

private long BytesLeftToRead { get; set; }

private void AdvanceToNextHeader()
{
if (_isPositionedAtNextEntry)
{
return;
}

if (BytesLeftToRead > 0)
{
_stream.Skip(BytesLeftToRead);
_amountRead += BytesLeftToRead;
BytesLeftToRead = 0;
}

// If the last block wasn't a full 512 bytes, skip the remaining padding bytes.
// Tar entry data is padded to 512-byte blocks, so callers that read to EOF
// should still leave the shared archive stream positioned at the next header.
var bytesInLastBlock = _amountRead % 512;
if (bytesInLastBlock != 0)
{
_stream.Skip(512 - bytesInLastBlock);
}

_isPositionedAtNextEntry = true;
}

private async ValueTask AdvanceToNextHeaderAsync()
{
if (_isPositionedAtNextEntry)
{
return;
}

if (BytesLeftToRead > 0)
{
await _stream.SkipAsync(BytesLeftToRead).ConfigureAwait(false);
_amountRead += BytesLeftToRead;
BytesLeftToRead = 0;
}

var bytesInLastBlock = _amountRead % 512;
if (bytesInLastBlock != 0)
{
await _stream.SkipAsync(512 - bytesInLastBlock).ConfigureAwait(false);
}

GC.SuppressFinalize(this);
await base.DisposeAsync().ConfigureAwait(false);
_isPositionedAtNextEntry = true;
}
#endif

private long BytesLeftToRead { get; set; }

public override bool CanRead => true;

Expand All @@ -104,6 +134,11 @@ public override long Position

public override int Read(byte[] buffer, int offset, int count)
{
if (BytesLeftToRead <= 0)
{
AdvanceToNextHeader();
return 0;
}
if (BytesLeftToRead < count)
{
count = (int)BytesLeftToRead;
Expand All @@ -113,6 +148,10 @@ public override int Read(byte[] buffer, int offset, int count)
{
BytesLeftToRead -= read;
_amountRead += read;
if (BytesLeftToRead == 0)
{
AdvanceToNextHeader();
}
}
return read;
}
Expand All @@ -121,13 +160,18 @@ public override int ReadByte()
{
if (BytesLeftToRead <= 0)
{
AdvanceToNextHeader();
return -1;
}
var value = _stream.ReadByte();
if (value != -1)
{
--BytesLeftToRead;
++_amountRead;
if (BytesLeftToRead == 0)
{
AdvanceToNextHeader();
}
}
return value;
}
Expand All @@ -139,6 +183,11 @@ public override async System.Threading.Tasks.Task<int> ReadAsync(
System.Threading.CancellationToken cancellationToken
)
{
if (BytesLeftToRead <= 0)
{
await AdvanceToNextHeaderAsync().ConfigureAwait(false);
return 0;
}
if (BytesLeftToRead < count)
{
count = (int)BytesLeftToRead;
Expand All @@ -150,6 +199,10 @@ System.Threading.CancellationToken cancellationToken
{
BytesLeftToRead -= read;
_amountRead += read;
if (BytesLeftToRead == 0)
{
await AdvanceToNextHeaderAsync().ConfigureAwait(false);
}
}
return read;
}
Expand All @@ -160,6 +213,11 @@ public override async System.Threading.Tasks.ValueTask<int> ReadAsync(
System.Threading.CancellationToken cancellationToken = default
)
{
if (BytesLeftToRead <= 0)
{
await AdvanceToNextHeaderAsync().ConfigureAwait(false);
return 0;
}
if (BytesLeftToRead < buffer.Length)
{
buffer = buffer.Slice(0, (int)BytesLeftToRead);
Expand All @@ -169,6 +227,10 @@ public override async System.Threading.Tasks.ValueTask<int> ReadAsync(
{
BytesLeftToRead -= read;
_amountRead += read;
if (BytesLeftToRead == 0)
{
await AdvanceToNextHeaderAsync().ConfigureAwait(false);
}
}
return read;
}
Expand Down
62 changes: 62 additions & 0 deletions tests/SharpCompress.Test/Tar/TarArchiveAsyncTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
Expand Down Expand Up @@ -416,4 +417,65 @@ public async ValueTask Tar_PaxGlobalHeader_Link_Archive_Async()
Assert.Equal(5100, localOverrideLink.GroupId);
Assert.Equal(Convert.ToInt64("777", 8), localOverrideLink.Mode);
}

[Fact]
public async ValueTask Tar_Read_One_At_A_Time_Without_Disposing_Entry_Stream_Async()
{
var archiveEncoding = new ArchiveEncoding { Default = Encoding.UTF8 };
var tarWriterOptions = new TarWriterOptions(CompressionType.None, true)
{
ArchiveEncoding = archiveEncoding,
};
var testBytes = Encoding.UTF8.GetBytes("This is a test.");

using var memoryStream = new MemoryStream();
using (var tarWriter = new TarWriter(memoryStream, tarWriterOptions))
using (var testFileStream = new MemoryStream(testBytes))
{
await tarWriter.WriteAsync("file0.txt", testFileStream, null);
testFileStream.Position = 0;
await tarWriter.WriteAsync("file1.txt", testFileStream, null);
tarWriter.WriteDirectory("folder0", null);
testFileStream.Position = 0;
await tarWriter.WriteAsync("folder0/file_in_folder0.txt", testFileStream, null);
}

memoryStream.Position = 0;

var entryKeys = new List<string?>();
var openEntryStreams = new List<Stream>();

await using (
var archive = await TarArchive.OpenAsyncArchive(
new AsyncOnlyStream(memoryStream),
ReaderOptions.ForExternalStream
)
)
{
await foreach (var entry in archive.EntriesAsync)
{
entryKeys.Add(entry.Key);
if (entry.IsDirectory)
{
continue;
}

var tarEntryStream = await entry.OpenEntryStreamAsync();
openEntryStreams.Add(tarEntryStream);

using var testFileStream = new MemoryStream();
await tarEntryStream.CopyToAsync(testFileStream);
Assert.Equal(testBytes.Length, testFileStream.Length);
}

Assert.Equal(4, await archive.EntriesAsync.CountAsync());
}

openEntryStreams.ForEach(stream => stream.Dispose());

Assert.Equal(
["file0.txt", "file1.txt", "folder0/", "folder0/file_in_folder0.txt"],
entryKeys
);
}
}
Loading
Loading