Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public Task Send(string data)
_writer.Flush();
}
}
catch (NotSupportedException ex) when (!_writer.BaseStream.CanWrite)
{
// As we are simply creating streams around some stream passed as ctor argument, we
// end up in some unsynchronized behavior where it's possible that the outside stream
// was disposed and we are still trying to write something. In such case we would fail
// with "System.NotSupportedException: Stream does not support writing.".
// To avoid being too generic in that catch, I am checking if the stream is not writable.
EqtTrace.Verbose("LengthPrefixCommunicationChannel.Send: BaseStream is not writable (most likely it was dispose). {0}", ex);
}
catch (Exception ex)
{
EqtTrace.Error("LengthPrefixCommunicationChannel.Send: Error sending data: {0}.", ex);
Expand All @@ -64,24 +73,36 @@ public Task Send(string data)
/// <inheritdoc />
public Task NotifyDataAvailable()
{
// TODO: Review the comment below, because it says something different than what is
// actually happening, and doing what it suggests would potentially lose messages.
// For example in the case where we start testhost process, send it version, and
// it responds, we then replace the handler with a new one, and there is quite a long time
// (tens of milliseconds) when there is no handler present, which would pump the message
// and dump it.
//
// Try read data even if no one is listening to the data stream. Some server
// implementations (like Sockets) depend on the read operation to determine if a
// connection is closed.
if (MessageReceived != null)
try
{
var data = _reader.ReadString();
MessageReceived.SafeInvoke(this, new MessageReceivedEventArgs { Data = data }, "LengthPrefixCommunicationChannel: MessageReceived");
// TODO: Review the comment below, because it says something different than what is
// actually happening, and doing what it suggests would potentially lose messages.
// For example in the case where we start testhost process, send it version, and
// it responds, we then replace the handler with a new one, and there is quite a long time
// (tens of milliseconds) when there is no handler present, which would pump the message
// and dump it.
//
// Try read data even if no one is listening to the data stream. Some server
// implementations (like Sockets) depend on the read operation to determine if a
// connection is closed.
if (MessageReceived != null)
{
var data = _reader.ReadString();
MessageReceived.SafeInvoke(this, new MessageReceivedEventArgs { Data = data }, "LengthPrefixCommunicationChannel: MessageReceived");
}
else
{
EqtTrace.Verbose("LengthPrefixCommunicationChannel.NotifyDataAvailable: New data are waiting to be received, but there is no subscriber to be notified. Not reading them from the stream.");
}
}
else
catch (ObjectDisposedException ex) when (!_reader.BaseStream.CanRead)
{
EqtTrace.Verbose("LengthPrefixCommunicationChannel.NotifyDataAvailable: New data are waiting to be received, but there is no subscriber to be notified. Not reading them from the stream.");
// As we are simply creating streams around some stream passed as ctor argument, we
// end up in some unsynchronized behavior where it's possible that the outside stream
// was disposed and we are still trying to write something. In such case we would fail
// with "System.ObjectDisposedException: Cannot access a closed Stream.".
// To avoid being too generic in that catch, I am checking if the stream is not readable.
EqtTrace.Verbose("LengthPrefixCommunicationChannel.Send: BaseStream was disposed. {0}", ex);
}

return Task.FromResult(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ public async Task SendShouldFlushTheStream()
Assert.AreEqual("a", _reader.ReadString());
}

[TestMethod]
public void SendShouldThrowIfChannelIsDisconnected()
{
_stream.Dispose();

Assert.ThrowsException<CommunicationException>(() => _channel.Send(Dummydata).Wait());
}

[TestMethod]
public async Task MessageReceivedShouldProvideDataOverStream()
{
Expand Down Expand Up @@ -127,6 +119,24 @@ public void DisposeShouldNotCloseTheStream()
Assert.IsTrue(_stream.CanWrite);
}

[TestMethod]
public async Task DoNotFailWhenWritingOnADisposedBaseStream()
{
// Dispose base stream
_stream.Dispose();
await _channel.Send(Dummydata);
}

[TestMethod]
public async Task DoNotFailWhenReadingFromADisposedBaseStream()
{
var data = string.Empty;
_channel.MessageReceived += (sender, messageEventArgs) => data = messageEventArgs.Data;
// Dispose base stream
_stream.Dispose();
await _channel.NotifyDataAvailable();
}

// TODO
// WriteFromMultilpleThreadShouldBeInSequence
private static void SeekToBeginning(Stream stream)
Expand Down