Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a few issues with PipeStream #1399

Merged
merged 5 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
413 changes: 118 additions & 295 deletions src/Renci.SshNet/Common/PipeStream.cs

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions src/Renci.SshNet/ScpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public void Upload(Stream source, string path)
using (var channel = Session.CreateChannelSession())
{
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
channel.Closed += (sender, e) => input.Dispose();
channel.Open();

// Pass only the directory part of the path to the server, and use the (hidden) -d option to signal
Expand Down Expand Up @@ -307,6 +308,7 @@ public void Upload(FileInfo fileInfo, string path)
using (var channel = Session.CreateChannelSession())
{
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
channel.Closed += (sender, e) => input.Dispose();
channel.Open();

// Pass only the directory part of the path to the server, and use the (hidden) -d option to signal
Expand Down Expand Up @@ -364,6 +366,7 @@ public void Upload(DirectoryInfo directoryInfo, string path)
using (var channel = Session.CreateChannelSession())
{
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
channel.Closed += (sender, e) => input.Dispose();
channel.Open();

// start copy with the following options:
Expand Down Expand Up @@ -413,6 +416,7 @@ public void Download(string filename, FileInfo fileInfo)
using (var channel = Session.CreateChannelSession())
{
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
channel.Closed += (sender, e) => input.Dispose();
channel.Open();

// Send channel command request
Expand Down Expand Up @@ -459,6 +463,7 @@ public void Download(string directoryName, DirectoryInfo directoryInfo)
using (var channel = Session.CreateChannelSession())
{
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
channel.Closed += (sender, e) => input.Dispose();
channel.Open();

// Send channel command request
Expand Down Expand Up @@ -505,6 +510,7 @@ public void Download(string filename, Stream destination)
using (var channel = Session.CreateChannelSession())
{
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
channel.Closed += (sender, e) => input.Dispose();
channel.Open();

// Send channel command request
Expand Down
4 changes: 1 addition & 3 deletions src/Renci.SshNet/ShellStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,7 @@ public override bool CanWrite
get { return !_disposed; }
}

/// <summary>
/// This method does nothing.
/// </summary>
/// <inheritdoc/>
public override void Flush()
{
ThrowIfDisposed();
Expand Down
107 changes: 47 additions & 60 deletions src/Renci.SshNet/SshCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class SshCommand : IDisposable
private EventWaitHandle _sessionErrorOccuredWaitHandle;
private EventWaitHandle _commandCancelledWaitHandle;
private Exception _exception;
private StringBuilder _result;
private StringBuilder _error;
private string _result;
private string _error;
private bool _hasError;
private bool _isDisposed;
private bool _isCancelled;
Expand Down Expand Up @@ -109,21 +109,22 @@ public string Result
{
get
{
_result ??= new StringBuilder();
if (_result is not null)
{
return _result;
}

if (OutputStream != null && OutputStream.Length > 0)
if (OutputStream is null)
{
using (var sr = new StreamReader(OutputStream,
_encoding,
detectEncodingFromByteOrderMarks: true,
bufferSize: 1024,
leaveOpen: true))
{
_ = _result.Append(sr.ReadToEnd());
}
return string.Empty;
}

return _result.ToString();
using (var sr = new StreamReader(OutputStream,
_encoding,
detectEncodingFromByteOrderMarks: true))
{
return _result = sr.ReadToEnd();
}
}
}

Expand All @@ -134,26 +135,22 @@ public string Error
{
get
{
if (_hasError)
if (_error is not null)
{
return _error;
}

if (ExtendedOutputStream is null || !_hasError)
{
_error ??= new StringBuilder();

if (ExtendedOutputStream != null && ExtendedOutputStream.Length > 0)
{
using (var sr = new StreamReader(ExtendedOutputStream,
_encoding,
detectEncodingFromByteOrderMarks: true,
bufferSize: 1024,
leaveOpen: true))
{
_ = _error.Append(sr.ReadToEnd());
}
}

return _error.ToString();
return string.Empty;
}

return string.Empty;
using (var sr = new StreamReader(ExtendedOutputStream,
_encoding,
detectEncodingFromByteOrderMarks: true))
{
return _error = sr.ReadToEnd();
}
}
}

Expand Down Expand Up @@ -265,26 +262,16 @@ public IAsyncResult BeginExecute(AsyncCallback callback, object state)
throw new ArgumentException("CommandText property is empty.");
}

var outputStream = OutputStream;
if (outputStream is not null)
{
outputStream.Dispose();
OutputStream = null;
}

var extendedOutputStream = ExtendedOutputStream;
if (extendedOutputStream is not null)
{
extendedOutputStream.Dispose();
ExtendedOutputStream = null;
}
OutputStream?.Dispose();
ExtendedOutputStream?.Dispose();

// Initialize output streams
OutputStream = new PipeStream();
ExtendedOutputStream = new PipeStream();

_result = null;
_error = null;
_hasError = false;
_callback = callback;

_channel = CreateChannel();
Expand Down Expand Up @@ -341,13 +328,21 @@ public string EndExecute(IAsyncResult asyncResult)

_inputStream?.Close();

// wait for operation to complete (or time out)
WaitOnHandle(_asyncResult.AsyncWaitHandle);
try
{
// wait for operation to complete (or time out)
WaitOnHandle(_asyncResult.AsyncWaitHandle);
}
finally
{
UnsubscribeFromEventsAndDisposeChannel(_channel);
_channel = null;

UnsubscribeFromEventsAndDisposeChannel(_channel);
_channel = null;
OutputStream?.Dispose();
ExtendedOutputStream?.Dispose();

commandAsyncResult.EndCalled = true;
commandAsyncResult.EndCalled = true;
}

if (!_isCancelled)
{
Expand Down Expand Up @@ -437,8 +432,8 @@ private void Session_ErrorOccured(object sender, ExceptionEventArgs e)

private void SetAsyncComplete()
{
OutputStream?.Flush();
ExtendedOutputStream?.Flush();
OutputStream?.Dispose();
ExtendedOutputStream?.Dispose();

_asyncResult.IsCompleted = true;

Expand Down Expand Up @@ -480,11 +475,7 @@ private void Channel_RequestReceived(object sender, ChannelRequestEventArgs e)

private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEventArgs e)
{
if (ExtendedOutputStream != null)
{
ExtendedOutputStream.Write(e.Data, 0, e.Data.Length);
ExtendedOutputStream.Flush();
}
ExtendedOutputStream?.Write(e.Data, 0, e.Data.Length);

if (e.DataTypeCode == 1)
{
Expand All @@ -494,11 +485,7 @@ private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEven

private void Channel_DataReceived(object sender, ChannelDataEventArgs e)
{
if (OutputStream != null)
{
OutputStream.Write(e.Data, 0, e.Data.Length);
OutputStream.Flush();
}
OutputStream?.Write(e.Data, 0, e.Data.Length);

if (_asyncResult != null)
{
Expand Down
12 changes: 12 additions & 0 deletions test/Renci.SshNet.IntegrationBenchmarks/SshClientBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ public string RunCommand()
return _sshClient!.RunCommand("echo $'test !@#$%^&*()_+{}:,./<>[];\\|'").Result;
}

[Benchmark]
public string RunBigCommand()
{
using var command = _sshClient!.CreateCommand("head -c 10000000 /dev/urandom | base64"); // 10MB of data please

var asyncResult = command.BeginExecute();

command.OutputStream.CopyTo(Stream.Null);

return command.EndExecute(asyncResult);
}

[Benchmark]
public string ShellStreamReadLine()
{
Expand Down
22 changes: 22 additions & 0 deletions test/Renci.SshNet.IntegrationTests/SshClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,28 @@ public void Echo_Command_with_all_characters()
Assert.AreEqual("test !@#$%^&*()_+{}:,./<>[];\\|\n", response.Result);
}

[TestMethod]
public void Test_BigCommand()
{
using var command = _sshClient.CreateCommand("head -c 10000000 /dev/urandom | base64"); // 10MB of data please

var asyncResult = command.BeginExecute();

long totalBytesRead = 0;
int bytesRead;
byte[] buffer = new byte[4096];

while ((bytesRead = command.OutputStream.Read(buffer, 0, buffer.Length)) != 0)
{
totalBytesRead += bytesRead;
}

var result = command.EndExecute(asyncResult);

Assert.AreEqual(13_508_775, totalBytesRead);
Assert.AreEqual(0, result.Length);
}

[TestMethod]
public void Send_InputStream_to_Command()
{
Expand Down
24 changes: 12 additions & 12 deletions test/Renci.SshNet.IntegrationTests/SshTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void Ssh_CreateShell()
}

[TestMethod]
public void Ssh_Command_IntermittendOutput_EndExecute()
public void Ssh_Command_IntermittentOutput_EndExecute()
{
const string remoteFile = "/home/sshnet/test.sh";

Expand Down Expand Up @@ -229,16 +229,8 @@ public void Ssh_Command_IntermittendOutput_EndExecute()
}
}

/// <summary>
/// Ignored for now, because:
/// * OutputStream.Read(...) does not block when no data is available
/// * SshCommand.(Begin)Execute consumes *OutputStream*, advancing its position.
///
/// https://github.com/sshnet/SSH.NET/issues/650
/// </summary>
[TestMethod]
[Ignore]
public void Ssh_Command_IntermittendOutput_OutputStream()
public void Ssh_Command_IntermittentOutput_OutputStream()
{
const string remoteFile = "/home/sshnet/test.sh";

Expand Down Expand Up @@ -297,8 +289,16 @@ public void Ssh_Command_IntermittendOutput_OutputStream()

var actualResult = command.EndExecute(asyncResult);

Assert.AreEqual(expectedResult, actualResult);
Assert.AreEqual(expectedResult, command.Result);
// command.Result (also returned from EndExecute) consumes OutputStream,
// which we've already read from, so Result will be empty.
// TODO consider the suggested changes in https://github.com/sshnet/SSH.NET/issues/650

//Assert.AreEqual(expectedResult, actualResult);
//Assert.AreEqual(expectedResult, command.Result);

// For now just assert the current behaviour.
Assert.AreEqual(0, actualResult.Length);
Assert.AreEqual(0, command.Result.Length);
Comment on lines +292 to +301
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was initially planning on getting this PR in, then getting a release out. Now I am thinking of addressing #650 beforehand by taking the break changing string Execute() to void Execute() in order to stop Execute from consuming OutputStream.

That way, we can get the CancelAsync changes (#1345), this PR with its fixes and subtle behaviour changes, and the source-breaking changes wrapped up in one release. Probably easier for consumers.

}
}
finally
Expand Down
10 changes: 9 additions & 1 deletion test/Renci.SshNet.Tests/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ dotnet_diagnostic.MA0110.severity = silent
# https://github.com/meziantou/Meziantou.Analyzer/blob/main/docs/Rules/MA0026.md
dotnet_diagnostic.MA0026.severity = silent

# MA0042: Do not use blocking calls in an async method
# https://github.com/meziantou/Meziantou.Analyzer/blob/main/docs/Rules/MA0042.md
dotnet_diagnostic.MA0042.severity = silent

#### .NET Compiler Platform analysers rules ####

# CA1031: Do not catch general exception types
Expand Down Expand Up @@ -397,4 +401,8 @@ dotnet_diagnostic.IDE0032.severity = silent

# CA1812: Avoid uninstantiated internal classes
# https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca1812
dotnet_diagnostic.CA1812.severity = silent
dotnet_diagnostic.CA1812.severity = silent

# CA1849: Call async methods when in an async method
# https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/CA1849
dotnet_diagnostic.CA1849.severity = silent
Loading