diff --git a/src/Daqifi.Core.Tests/Communication/Consumers/StreamMessageConsumerIntegrationTests.cs b/src/Daqifi.Core.Tests/Communication/Consumers/StreamMessageConsumerIntegrationTests.cs new file mode 100644 index 0000000..0c67f03 --- /dev/null +++ b/src/Daqifi.Core.Tests/Communication/Consumers/StreamMessageConsumerIntegrationTests.cs @@ -0,0 +1,378 @@ +using Daqifi.Core.Communication.Consumers; +using Daqifi.Core.Communication.Messages; +using Google.Protobuf; + +namespace Daqifi.Core.Tests.Communication.Consumers; + +/// +/// Integration tests demonstrating the full message consumer pipeline: +/// Stream -> StreamMessageConsumer -> ProtobufMessageParser -> MessageReceived event +/// +/// These tests verify that StreamMessageConsumer + ProtobufMessageParser fully encapsulates +/// protobuf internals, allowing consumers to interact only through core's abstractions. +/// +public class StreamMessageConsumerIntegrationTests +{ + /// + /// Demonstrates the complete pipeline: valid protobuf data flows from stream through + /// parser and arrives as a strongly-typed message via the MessageReceived event. + /// This proves consumers don't need direct Google.Protobuf references for basic usage. + /// + [Fact] + public void FullPipeline_ValidProtobufMessage_RaisesMessageReceivedEvent() + { + // Arrange - Create a valid protobuf message with device info + var originalMessage = new DaqifiOutMessage + { + MsgTimeStamp = 12345678, + DeviceStatus = 1, + DevicePort = 9760, + TimestampFreq = 80000000 + }; + + // Serialize with length-delimited format (as the device sends) + var protobufData = SerializeWithLengthPrefix(originalMessage); + using var stream = new MemoryStream(protobufData); + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(stream, parser); + + IInboundMessage? receivedMessage = null; + byte[]? receivedRawData = null; + DateTime receivedTimestamp = default; + var messageReceived = new ManualResetEventSlim(false); + + consumer.MessageReceived += (sender, args) => + { + receivedMessage = args.Message; + receivedRawData = args.RawData; + receivedTimestamp = args.Timestamp; + messageReceived.Set(); + }; + + // Act + consumer.Start(); + var eventFired = messageReceived.Wait(TimeSpan.FromSeconds(1)); + consumer.Stop(); + + // Assert + Assert.True(eventFired, "MessageReceived event should have fired"); + Assert.NotNull(receivedMessage); + Assert.IsType(receivedMessage); + + // Verify the message data is accessible through core abstractions + var data = receivedMessage.Data; + Assert.Equal(12345678u, data.MsgTimeStamp); + Assert.Equal(1u, data.DeviceStatus); + Assert.Equal(9760u, data.DevicePort); + Assert.Equal(80000000u, data.TimestampFreq); + + // Verify event args contain useful metadata + Assert.NotNull(receivedRawData); + Assert.True(receivedRawData.Length > 0); + Assert.True(receivedTimestamp > DateTime.MinValue); + } + + /// + /// Demonstrates that malformed protobuf data is handled gracefully without + /// leaking InvalidProtocolBufferException to consumers. + /// + [Fact] + public void FullPipeline_MalformedData_HandledGracefullyWithoutException() + { + // Arrange - Create intentionally malformed data + var malformedData = new byte[] { 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x01, 0x02, 0x03 }; + using var stream = new MemoryStream(malformedData); + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(stream, parser); + + var messagesReceived = new List>(); + var errorsReceived = new List(); + var processingComplete = new ManualResetEventSlim(false); + + consumer.MessageReceived += (sender, args) => + { + messagesReceived.Add(args.Message); + processingComplete.Set(); + }; + consumer.ErrorOccurred += (sender, args) => + { + errorsReceived.Add(args.Error); + processingComplete.Set(); + }; + + // Act - Should not throw, parser handles malformed data internally + // We wait briefly for any potential events; timeout is expected since malformed data + // produces neither valid messages nor stream errors (parser skips bad bytes internally) + consumer.Start(); + processingComplete.Wait(TimeSpan.FromMilliseconds(200)); + consumer.Stop(); + + // Assert - No messages parsed from garbage data, but no crash either + Assert.Empty(messagesReceived); + // Note: ErrorOccurred may or may not fire depending on implementation + // The key is that no InvalidProtocolBufferException propagates to consumer + } + + /// + /// Demonstrates that multiple consecutive messages are all delivered correctly. + /// + [Fact] + public void FullPipeline_MultipleMessages_AllDeliveredInOrder() + { + // Arrange - Create multiple valid protobuf messages + var messages = new List + { + new() { MsgTimeStamp = 1000, DeviceStatus = 1 }, + new() { MsgTimeStamp = 2000, DeviceStatus = 2 }, + new() { MsgTimeStamp = 3000, DeviceStatus = 3 } + }; + + using var stream = new MemoryStream(); + foreach (var msg in messages) + { + var data = SerializeWithLengthPrefix(msg); + stream.Write(data, 0, data.Length); + } + stream.Position = 0; + + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(stream, parser); + + var receivedMessages = new List(); + var allReceived = new CountdownEvent(3); + + consumer.MessageReceived += (sender, args) => + { + lock (receivedMessages) + { + receivedMessages.Add(args.Message.Data); + } + allReceived.Signal(); + }; + + // Act + consumer.Start(); + var allEventsReceived = allReceived.Wait(TimeSpan.FromSeconds(2)); + consumer.Stop(); + + // Assert + Assert.True(allEventsReceived, "All 3 messages should have been received"); + Assert.Equal(3, receivedMessages.Count); + Assert.Equal(1000u, receivedMessages[0].MsgTimeStamp); + Assert.Equal(2000u, receivedMessages[1].MsgTimeStamp); + Assert.Equal(3000u, receivedMessages[2].MsgTimeStamp); + } + + /// + /// Demonstrates recovery from a single corrupted byte followed by valid data. + /// The ProtobufMessageParser uses byte-skipping to recover from malformed data. + /// This test uses a single garbage byte (0x00 = empty message length) which the parser + /// handles by skipping to the next byte. + /// + [Fact] + public void FullPipeline_SingleCorruptedByte_ThenValidData_RecoversAndParsesValidMessage() + { + // Arrange - Single zero byte (interpreted as empty length) followed by valid message + var validMessage = new DaqifiOutMessage { MsgTimeStamp = 99999, DeviceStatus = 5 }; + var validData = SerializeWithLengthPrefix(validMessage); + + // Prepend single 0x00 byte - parser sees this as "length 0" message, skips it + var combinedData = new byte[1 + validData.Length]; + combinedData[0] = 0x00; + Array.Copy(validData, 0, combinedData, 1, validData.Length); + + using var stream = new MemoryStream(combinedData); + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(stream, parser); + + DaqifiOutMessage? receivedData = null; + var messageReceived = new ManualResetEventSlim(false); + + consumer.MessageReceived += (sender, args) => + { + receivedData = args.Message.Data; + messageReceived.Set(); + }; + + // Act + consumer.Start(); + var eventFired = messageReceived.Wait(TimeSpan.FromSeconds(1)); + consumer.Stop(); + + // Assert - Should recover and parse the valid message after skipping the zero byte + Assert.True(eventFired, "Should have recovered and parsed valid message"); + Assert.NotNull(receivedData); + Assert.Equal(99999u, receivedData.MsgTimeStamp); + Assert.Equal(5u, receivedData.DeviceStatus); + } + + /// + /// Demonstrates that ErrorOccurred event provides sufficient context for logging, + /// including the exception, raw data, and timestamp. + /// + [Fact] + public void FullPipeline_StreamError_ErrorEventProvidesLoggingContext() + { + // Arrange - Create a stream that throws on read + var errorStream = new ErrorThrowingStream(); + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(errorStream, parser); + + MessageConsumerErrorEventArgs? errorArgs = null; + var errorReceived = new ManualResetEventSlim(false); + + consumer.ErrorOccurred += (sender, args) => + { + errorArgs = args; + errorReceived.Set(); + }; + + // Act + consumer.Start(); + var eventFired = errorReceived.Wait(TimeSpan.FromSeconds(1)); + consumer.Stop(); + + // Assert + Assert.True(eventFired, "ErrorOccurred event should have fired"); + Assert.NotNull(errorArgs); + Assert.NotNull(errorArgs.Error); + Assert.IsType(errorArgs.Error); + Assert.True(errorArgs.Timestamp > DateTime.MinValue); + // RawData may be null for stream read errors (no data was read) + } + + /// + /// Demonstrates that ClearBuffer() is accessible via the IMessageConsumer interface + /// and clears the internal message buffer. This is essential for reconnection scenarios + /// where residual partial data from a previous session needs to be discarded. + /// + [Fact] + public void ClearBuffer_CalledViaInterface_ClearsInternalBuffer() + { + // Arrange - Create a stream with partial/incomplete message data + // This simulates residual data from a disconnected session + var partialData = new byte[] { 0x05, 0x08, 0x01 }; // Incomplete protobuf (length prefix says 5 bytes, only 2 provided) + using var stream = new MemoryStream(partialData); + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(stream, parser); + + // Start the consumer to let it read the partial data into internal buffer + consumer.Start(); + Thread.Sleep(50); // Brief pause to allow consumer to read data + + // Verify data was buffered (QueuedMessageCount tracks internal buffer size) + // Note: The exact count depends on timing, but buffer should have some data + var bufferCountBeforeClear = consumer.QueuedMessageCount; + + // Act - Call ClearBuffer via interface (as desktop would do during reconnection) + IMessageConsumer interfaceRef = consumer; + interfaceRef.ClearBuffer(); + + // Assert - Internal buffer should be cleared + Assert.Equal(0, consumer.QueuedMessageCount); + + // Additional verification: consumer should still be functional after clear + Assert.True(consumer.IsRunning); + + consumer.Stop(); + } + + /// + /// Demonstrates that ClearBuffer() can be called safely when consumer is not running. + /// + [Fact] + public void ClearBuffer_WhenNotRunning_DoesNotThrow() + { + // Arrange + using var stream = new MemoryStream(); + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(stream, parser); + + // Act & Assert - Should not throw when called on non-running consumer + IMessageConsumer interfaceRef = consumer; + var exception = Record.Exception(() => interfaceRef.ClearBuffer()); + + Assert.Null(exception); + Assert.Equal(0, consumer.QueuedMessageCount); + } + + /// + /// Demonstrates that StopSafely() returns true for clean shutdown and false on timeout. + /// + [Fact] + public void StopSafely_CleanShutdown_ReturnsTrue() + { + // Arrange + using var stream = new MemoryStream(); + var parser = new ProtobufMessageParser(); + using var consumer = new StreamMessageConsumer(stream, parser); + consumer.Start(); + + // Act + var result = consumer.StopSafely(timeoutMs: 1000); + + // Assert + Assert.True(result, "StopSafely should return true for clean shutdown"); + Assert.False(consumer.IsRunning); + } + + /// + /// Demonstrates that the consumer properly implements IDisposable. + /// + [Fact] + public void Dispose_StopsConsumerAndCleansUp() + { + // Arrange + using var stream = new MemoryStream(); + var parser = new ProtobufMessageParser(); + var consumer = new StreamMessageConsumer(stream, parser); + consumer.Start(); + Assert.True(consumer.IsRunning); + + // Act + consumer.Dispose(); + + // Assert + Assert.False(consumer.IsRunning); + Assert.Throws(() => consumer.Start()); + Assert.Throws(() => consumer.ClearBuffer()); + } + + /// + /// Helper method to serialize a protobuf message with length-delimited format. + /// This is how the DAQiFi device sends data over the wire. + /// + private static byte[] SerializeWithLengthPrefix(DaqifiOutMessage message) + { + using var stream = new MemoryStream(); + message.WriteDelimitedTo(stream); + return stream.ToArray(); + } + + /// + /// Test helper stream that throws IOException on read. + /// + private class ErrorThrowingStream : Stream + { + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => throw new NotSupportedException(); + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() { } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new IOException("Simulated stream read error"); + } + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + } +} diff --git a/src/Daqifi.Core/Communication/Consumers/IMessageConsumer.cs b/src/Daqifi.Core/Communication/Consumers/IMessageConsumer.cs index 79da418..c6ae534 100644 --- a/src/Daqifi.Core/Communication/Consumers/IMessageConsumer.cs +++ b/src/Daqifi.Core/Communication/Consumers/IMessageConsumer.cs @@ -44,4 +44,10 @@ public interface IMessageConsumer : IDisposable /// Maximum time to wait for processing to complete in milliseconds. /// True if stopped cleanly, false if timeout occurred. bool StopSafely(int timeoutMs = 1000); + + /// + /// Clears any buffered data from the stream and internal buffers. + /// Useful for WiFi devices that may have residual data on connection or during reconnection. + /// + void ClearBuffer(); } \ No newline at end of file