Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions src/Daqifi.Core.Tests/Device/DaqifiDeviceDrainErrorQueueTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
using Daqifi.Core.Communication.Messages;
using Daqifi.Core.Device;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Daqifi.Core.Tests.Device
{
public class DaqifiDeviceDrainErrorQueueTests
{
[Fact]
public async Task DrainErrorQueueAsync_WhenQueueIsClean_ReturnsEmpty()
{
var device = new SequencedReplyDevice("TestDevice");
device.Replies.Enqueue(new[] { "0,\"No error\"" });
device.Connect();

var popped = await device.DrainErrorQueueAsync();

Assert.Empty(popped);
Assert.Equal(1, device.ExecuteTextCommandCallCount);
}

[Fact]
public async Task DrainErrorQueueAsync_WhenSeveralErrorsQueued_ReturnsAllInOrder()
{
var device = new SequencedReplyDevice("TestDevice");
device.Replies.Enqueue(new[] { "-200,\"Execution error\"" });
device.Replies.Enqueue(new[] { "-113,\"Undefined header\"" });
device.Replies.Enqueue(new[] { "-410,\"Query INTERRUPTED\"" });
device.Replies.Enqueue(new[] { "0,\"No error\"" });
device.Connect();

var popped = await device.DrainErrorQueueAsync();

Assert.Equal(new[]
{
"-200,\"Execution error\"",
"-113,\"Undefined header\"",
"-410,\"Query INTERRUPTED\"",
}, popped);
Assert.Equal(4, device.ExecuteTextCommandCallCount);
}

[Fact]
public async Task DrainErrorQueueAsync_AcceptsPlusZeroNoErrorTerminator()
{
var device = new SequencedReplyDevice("TestDevice");
device.Replies.Enqueue(new[] { "-200,\"Execution error\"" });
device.Replies.Enqueue(new[] { "+0,\"No error\"" });
device.Connect();

var popped = await device.DrainErrorQueueAsync();

Assert.Single(popped);
Assert.Equal("-200,\"Execution error\"", popped[0]);
}

[Fact]
public async Task DrainErrorQueueAsync_WhenReplyHasTrailingWhitespace_TrimsBeforeReturning()
{
var device = new SequencedReplyDevice("TestDevice");
device.Replies.Enqueue(new[] { " -200,\"Execution error\"\r\n" });
device.Replies.Enqueue(new[] { "0,\"No error\"" });
device.Connect();

var popped = await device.DrainErrorQueueAsync();

Assert.Single(popped);
Assert.Equal("-200,\"Execution error\"", popped[0]);
}

[Fact]
public async Task DrainErrorQueueAsync_TreatsErrorMessageContainingNoErrorPhraseAsRealError()
{
// Terminator detection must use the numeric SCPI code, not a
// substring match — otherwise a real error whose message text
// happens to contain "No error" would falsely terminate the drain.
var device = new SequencedReplyDevice("TestDevice");
device.Replies.Enqueue(new[] { "-200,\"Execution error: No error handler installed\"" });
device.Replies.Enqueue(new[] { "0,\"No error\"" });
device.Connect();

var popped = await device.DrainErrorQueueAsync();

Assert.Equal(new[] { "-200,\"Execution error: No error handler installed\"" }, popped);
Assert.Equal(2, device.ExecuteTextCommandCallCount);
}

[Fact]
public async Task DrainErrorQueueAsync_OnEmptyReply_TerminatesEarly()
{
// Empty reply should NOT be treated as an error and should NOT cause
// the loop to keep hammering the device. Instead, drain stops and
// returns whatever it has so far.
var device = new SequencedReplyDevice("TestDevice");
device.Replies.Enqueue(new[] { "-200,\"Execution error\"" });
device.Replies.Enqueue(Array.Empty<string>()); // simulated timeout
device.Replies.Enqueue(new[] { "0,\"No error\"" }); // should never be consumed
device.Connect();

var popped = await device.DrainErrorQueueAsync();

Assert.Equal(new[] { "-200,\"Execution error\"" }, popped);
Assert.Equal(2, device.ExecuteTextCommandCallCount);
Assert.Single(device.Replies); // unconsumed reply remains
}

[Fact]
public async Task DrainErrorQueueAsync_WhenQueueExceedsCap_ReturnsCapManyAndStops()
{
var device = new SequencedReplyDevice("TestDevice");
for (int i = 0; i < 10; i++)
{
device.Replies.Enqueue(new[] { $"-200,\"Execution error #{i}\"" });
}
device.Replies.Enqueue(new[] { "0,\"No error\"" });
device.Connect();

var popped = await device.DrainErrorQueueAsync(maxIterations: 3);

Assert.Equal(3, popped.Count);
Assert.Equal(3, device.ExecuteTextCommandCallCount);
Assert.Equal(new[]
{
"-200,\"Execution error #0\"",
"-200,\"Execution error #1\"",
"-200,\"Execution error #2\"",
}, popped);
}

[Fact]
public async Task DrainErrorQueueAsync_SendsSystemErrorQueryEachIteration()
{
var device = new SequencedReplyDevice("TestDevice");
device.Replies.Enqueue(new[] { "-200,\"Execution error\"" });
device.Replies.Enqueue(new[] { "-113,\"Undefined header\"" });
device.Replies.Enqueue(new[] { "0,\"No error\"" });
device.Connect();

await device.DrainErrorQueueAsync();

var commands = device.SentMessages.Select(m => m.Data).ToList();
Assert.Equal(3, commands.Count);
Assert.All(commands, c => Assert.Equal("SYSTem:ERRor?", c));
}

[Fact]
public async Task DrainErrorQueueAsync_WhenCancelled_ThrowsOperationCanceled()
{
var device = new SequencedReplyDevice("TestDevice");
// Pre-queue a long stream of errors. The cancellation should fire
// before any of them is processed.
for (int i = 0; i < 10; i++)
{
device.Replies.Enqueue(new[] { $"-200,\"Execution error #{i}\"" });
}
device.Connect();

using var cts = new CancellationTokenSource();
cts.Cancel();

await Assert.ThrowsAnyAsync<OperationCanceledException>(
() => device.DrainErrorQueueAsync(cancellationToken: cts.Token));
}

[Fact]
public async Task DrainErrorQueueAsync_WhenMaxIterationsNotPositive_Throws()
{
var device = new SequencedReplyDevice("TestDevice");

await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
() => device.DrainErrorQueueAsync(maxIterations: 0));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
() => device.DrainErrorQueueAsync(maxIterations: -1));
}

/// <summary>
/// A testable DaqifiDevice that returns a different canned response on each
/// successive ExecuteTextCommandAsync call, so drain-style tests can verify
/// per-iteration behavior without a real transport.
/// </summary>
private class SequencedReplyDevice : DaqifiDevice
{
public List<IOutboundMessage<string>> SentMessages { get; } = new();
public Queue<IReadOnlyList<string>> Replies { get; } = new();
public int ExecuteTextCommandCallCount { get; private set; }

public SequencedReplyDevice(string name, IPAddress? ipAddress = null)
: base(name, ipAddress)
{
}

public override void Send<T>(IOutboundMessage<T> message)
{
if (message is IOutboundMessage<string> stringMessage)
{
SentMessages.Add(stringMessage);
}
}

protected override Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
Action setupAction,
int responseTimeoutMs = 1000,
int completionTimeoutMs = 250,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
setupAction();
ExecuteTextCommandCallCount++;
var reply = Replies.Count > 0 ? Replies.Dequeue() : Array.Empty<string>();
return Task.FromResult(reply);
}
}
}
}
10 changes: 10 additions & 0 deletions src/Daqifi.Core/Communication/Producers/ScpiMessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ public class ScpiMessageProducer
/// </remarks>
public static IOutboundMessage<string> GetDeviceInfo => new ScpiMessage("SYSTem:SYSInfoPB?");

/// <summary>
/// Creates a query message to pop the next entry from the device's SCPI error queue.
/// </summary>
/// <remarks>
/// Returns the oldest queued error in standard SCPI format (e.g., <c>-200,"Execution error"</c>),
/// or <c>0,"No error"</c> when the queue is empty. Each call removes one entry.
/// Command: SYSTem:ERRor?
/// </remarks>
public static IOutboundMessage<string> GetSystemError => new ScpiMessage("SYSTem:ERRor?");

/// <summary>
/// Creates a command message to force the device into bootloader mode.
/// </summary>
Expand Down
83 changes: 83 additions & 0 deletions src/Daqifi.Core/Device/DaqifiDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Threading;
Expand Down Expand Up @@ -470,6 +471,88 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
return collectedLines;
}

/// <summary>
/// Pops <c>SYSTem:ERRor?</c> entries from the device until the queue reports
/// <c>"No error"</c> and returns the popped entries to the caller.
/// </summary>
/// <remarks>
/// <para>
/// This is the queue-inspection counterpart to the inline last-command
/// error check used elsewhere in the codebase (e.g. <c>ContainsScpiError</c>
/// in <see cref="DaqifiStreamingDevice"/>): that helper tells you whether
/// the captured response from a single command contained an error,
/// while this method tells you what is currently queued on the
/// device — including stale errors from prior commands or sessions.
/// </para>
/// <para>
/// Ownership of the popped entries is transferred to the caller so
/// they can log them, surface them in a health-check report, throw
/// on hardware faults, or discard them if known-stale.
/// </para>
/// <para>
/// Each iteration uses <see cref="ExecuteTextCommandAsync"/>, which
/// pauses the protobuf consumer for the duration of the text exchange.
/// Avoid calling this during active streaming or concurrently with
/// other text commands.
/// </para>
/// </remarks>
/// <param name="maxIterations">
/// Safety cap on the number of <c>SYSTem:ERRor?</c> queries. Defaults to 256
/// — large enough to drain a deeply queued device, small enough that a
/// runaway loop is bounded. If the cap is reached without seeing
/// <c>"No error"</c>, a warning is traced and the popped entries
/// collected so far are returned; callers that want to treat this as a
/// failure can compare <c>Count</c> to <paramref name="maxIterations"/>.
/// </param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <returns>The list of error strings popped from the queue (empty if the queue was already clean).</returns>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="maxIterations"/> is not positive.</exception>
/// <exception cref="InvalidOperationException">Thrown when the device is not connected or has no transport.</exception>
/// <exception cref="OperationCanceledException">Thrown when the operation is canceled.</exception>
public virtual async Task<IReadOnlyList<string>> DrainErrorQueueAsync(
int maxIterations = 256,
CancellationToken cancellationToken = default)
{
if (maxIterations <= 0)
throw new ArgumentOutOfRangeException(nameof(maxIterations), maxIterations, "Must be positive.");

var popped = new List<string>();
for (int i = 0; i < maxIterations; i++)
{
cancellationToken.ThrowIfCancellationRequested();

var lines = await ExecuteTextCommandAsync(
() => Send(ScpiMessageProducer.GetSystemError),
cancellationToken: cancellationToken).ConfigureAwait(false);

var reply = lines.FirstOrDefault(l => !string.IsNullOrWhiteSpace(l))?.Trim();
if (reply == null)
{
// Empty reply means timeout or unresponsive device, not a
// queued error — terminate rather than spin to maxIterations.
Trace.WriteLine($"[DrainErrorQueueAsync] Empty reply on iteration {i}; terminating after {popped.Count} popped entries.");
return popped;
}

// SCPI error replies are formatted as <code>,"<message>". Code 0
// (or +0) indicates an empty queue; anything else is a real
// error to capture. Parse the numeric prefix rather than
// substring-matching "No error" so a hypothetical error message
// containing that phrase can't be mistaken for the terminator.
var commaIndex = reply.IndexOf(',');
var codeSpan = commaIndex >= 0 ? reply.AsSpan(0, commaIndex).Trim() : reply.AsSpan().Trim();
if (int.TryParse(codeSpan, NumberStyles.Integer, CultureInfo.InvariantCulture, out var code) && code == 0)
{
return popped;
}

popped.Add(reply);
}

Trace.WriteLine($"[DrainErrorQueueAsync] Did not converge after {maxIterations} iterations; queue may still contain entries.");
return popped;
}

/// <summary>
/// Raises the <see cref="MessageReceived"/> event when a message is received from the device.
/// </summary>
Expand Down
Loading