Skip to content
139 changes: 139 additions & 0 deletions src/Daqifi.Core.Tests/Device/DaqifiDeviceTextCommandLockTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using System;
using System.Net;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Daqifi.Core.Device;
using Xunit;

namespace Daqifi.Core.Tests.Device
{
/// <summary>
/// Tests for #186 — ExecuteTextCommandAsync must serialize concurrent
/// callers (SemaphoreSlim), reject re-entrant calls from the same
/// async flow (InvalidOperationException, not deadlock), and reject
/// calls when the device is disposed or disconnecting.
///
/// The protected method is exercised via a thin subclass that exposes
/// it. The disposed/disconnecting guards are tested by setting the
/// relevant private fields via reflection — those guards run before
/// any transport / consumer interaction, so this gives faithful
/// coverage without a transport stack. Re-entrancy is tested by
/// flipping the AsyncLocal flag from inside the same logical flow.
/// </summary>
public class DaqifiDeviceTextCommandLockTests
{
[Fact]
public async Task ExecuteTextCommandAsync_WhenAlreadyInsideAsyncFlow_ThrowsInvalidOperation()
{
var device = new TextCommandTestableDevice("TestDevice");

// Simulate "we're already inside ExecuteTextCommandAsync on this
// async flow" by setting the AsyncLocal flag. The re-entrancy
// guard runs before WaitAsync(), so this check fires immediately
// without touching any transport state.
GetIsInsideTextExchange(device).Value = true;

var ex = await Assert.ThrowsAsync<InvalidOperationException>(
() => device.CallExecuteTextCommandAsync(() => { }));
Assert.Contains("not re-entrant", ex.Message);
}

[Fact]
public async Task ExecuteTextCommandAsync_WhenDisposing_ThrowsInvalidOperation()
{
var device = new TextCommandTestableDevice("TestDevice");
SetIsDisconnecting(device, true);

var ex = await Assert.ThrowsAsync<InvalidOperationException>(
() => device.CallExecuteTextCommandAsync(() => { }));
Assert.Contains("disposing or disconnecting", ex.Message);
}

[Fact]
public async Task ExecuteTextCommandAsync_WhenDisposed_ThrowsInvalidOperation()
{
var device = new TextCommandTestableDevice("TestDevice");
SetDisposed(device, true);

var ex = await Assert.ThrowsAsync<InvalidOperationException>(
() => device.CallExecuteTextCommandAsync(() => { }));
Assert.Contains("disposing or disconnecting", ex.Message);
}

[Fact]
public async Task ExecuteTextCommandAsync_ReleasesLockAfterValidationFailure()
{
// After a validation failure (e.g. not connected), the lock
// must be released so subsequent calls don't hang. Verified
// by calling twice — second call must reach validation too,
// not block on WaitAsync.
var device = new TextCommandTestableDevice("TestDevice");

await Assert.ThrowsAsync<InvalidOperationException>(
() => device.CallExecuteTextCommandAsync(() => { }));
// Second call: also throws, but ONLY if the lock was released.
// If the lock leaked, this would deadlock and xunit's per-test
// budget would time it out instead.
await Assert.ThrowsAsync<InvalidOperationException>(
() => device.CallExecuteTextCommandAsync(() => { }));
}

[Fact]
public async Task ExecuteTextCommandAsync_AsyncLocalClearedAfterReturn()
{
// Even when the call throws, the AsyncLocal re-entrancy flag
// is cleared in the finally block so a subsequent call from
// the same flow doesn't false-positive the re-entrancy check.
var device = new TextCommandTestableDevice("TestDevice");

await Assert.ThrowsAsync<InvalidOperationException>(
() => device.CallExecuteTextCommandAsync(() => { }));

Assert.False(GetIsInsideTextExchange(device).Value);
}

// ── Reflection helpers — kept private to this test class so the
// production DaqifiDevice doesn't have to expose internals. ─────

private static AsyncLocal<bool> GetIsInsideTextExchange(DaqifiDevice device)
{
return (AsyncLocal<bool>)typeof(DaqifiDevice)
.GetField("_isInsideTextExchange", BindingFlags.Instance | BindingFlags.NonPublic)!
.GetValue(device)!;
}

private static void SetIsDisconnecting(DaqifiDevice device, bool value)
{
typeof(DaqifiDevice)
.GetField("_isDisconnecting", BindingFlags.Instance | BindingFlags.NonPublic)!
.SetValue(device, value);
}

private static void SetDisposed(DaqifiDevice device, bool value)
{
typeof(DaqifiDevice)
.GetField("_disposed", BindingFlags.Instance | BindingFlags.NonPublic)!
.SetValue(device, value);
}

/// <summary>
/// Subclass that exposes the protected ExecuteTextCommandAsync via
/// a public wrapper so tests can call it directly. Does NOT override
/// it — the real method runs, including the lock + guards.
/// </summary>
private class TextCommandTestableDevice : DaqifiDevice
{
public TextCommandTestableDevice(string name, IPAddress? ipAddress = null)
: base(name, ipAddress)
{
}

public Task<System.Collections.Generic.IReadOnlyList<string>> CallExecuteTextCommandAsync(
Action setupAction)
{
return ExecuteTextCommandAsync(setupAction, responseTimeoutMs: 100, completionTimeoutMs: 50);
}
}
}
}
156 changes: 142 additions & 14 deletions src/Daqifi.Core/Device/DaqifiDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ public class DaqifiDevice : IDevice, IDisposable
private bool _isDisconnecting;
private bool _isInitialized;
private readonly List<IChannel> _channels = new();

// Serializes ExecuteTextCommandAsync calls device-wide (closes #186).
// Multiple callers — e.g. concurrent GetSdCardFilesAsync /
// DrainErrorQueueAsync / GetSystemInfoAsync — would otherwise race the
// protobuf-consumer pause/swap/restart sequence on the same stream and
// either intermix SCPI bytes on the wire or interleave reply lines
// between callers' returned lists. SemaphoreSlim chosen over Lock
// because the method is async; counter is (1, 1) for mutual exclusion.
private readonly SemaphoreSlim _textExchangeLock = new(1, 1);

// Async-context flag that tracks whether the current logical flow
// already holds _textExchangeLock. AsyncLocal flows across await
// resumptions on different threads, so a setupAction that re-enters
// ExecuteTextCommandAsync after a ConfigureAwait(false) hop is still
// detected and surfaced as InvalidOperationException — instead of
// wedging on _textExchangeLock.WaitAsync() (the re-entrant call
// would corrupt the consumer swap mid-flight). Plain
// Environment.CurrentManagedThreadId capture wouldn't work — the
// value seen before await may not match the value seen after.
private readonly AsyncLocal<bool> _isInsideTextExchange = new();

/// <summary>
/// Gets the current connection status of the device.
Expand Down Expand Up @@ -198,9 +218,39 @@ public void Connect()
/// <summary>
/// Disconnects from the device.
/// </summary>
/// <remarks>
/// Waits up to 1 second to acquire <c>_textExchangeLock</c> before
/// tearing down the consumer / producer / transport. This prevents
/// a race where an in-flight <see cref="ExecuteTextCommandAsync"/>
/// is mid-swap (text consumer running on the stream, protobuf
/// consumer not yet restarted) and Disconnect rips the transport
/// out from under it. If the wait times out, Disconnect proceeds
/// anyway — a stuck text exchange must not block teardown forever.
/// The 1s budget is the longest delay any normal text exchange
/// can hold the lock for (responseTimeoutMs default + safety
/// margin); callers wanting non-blocking disconnect should drive
/// this off a Task.Run.
/// </remarks>
public void Disconnect()
{
_isDisconnecting = true;
// Best-effort coordination with ExecuteTextCommandAsync —
// acquire the lock so we don't tear the transport out from
// under an in-flight text exchange. The lock IS released in
// the finally below when acquired (so a future Connect()
// followed by ExecuteTextCommandAsync isn't blocked); a
// stuck exchange that holds past the timeout drops to the
// _isDisconnecting validation path inside the exchange.
var lockAcquired = false;
try
{
lockAcquired = _textExchangeLock.Wait(TimeSpan.FromSeconds(1));
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
Outdated
}
catch (ObjectDisposedException)
{
// Disconnect called after Dispose — nothing to coordinate.
}

try
{
// Unsubscribe from message consumer events
Expand All @@ -222,6 +272,16 @@ public void Disconnect()
State = DeviceState.Disconnected;
_isInitialized = false;
_isDisconnecting = false;
if (lockAcquired)
{
try
{
_textExchangeLock.Release();
}
catch (ObjectDisposedException)
{
}
}
}
}

Expand Down Expand Up @@ -328,26 +388,68 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
if (completionTimeoutMs <= 0)
throw new ArgumentOutOfRangeException(nameof(completionTimeoutMs), completionTimeoutMs, "Timeout must be positive.");

var sw = Stopwatch.StartNew();
cancellationToken.ThrowIfCancellationRequested();

if (!IsConnected)
// Async-context re-entrancy detection: a setupAction that calls
// ExecuteTextCommandAsync on the same device would corrupt the
// consumer swap mid-flight. Surface as a clean exception rather
// than wedging on _textExchangeLock.WaitAsync() forever.
// AsyncLocal flows across await thread hops so this catches
// re-entry even when the inner call resumes on a different
// thread than the outer call.
if (_isInsideTextExchange.Value)
{
throw new InvalidOperationException("Device is not connected.");
throw new InvalidOperationException(
"ExecuteTextCommandAsync is not re-entrant on the same device; "
+ "do not call it from inside a setupAction callback.");
}

if (_transport == null)
try
{
throw new InvalidOperationException("ExecuteTextCommandAsync requires a transport-based connection.");
await _textExchangeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
// Dispose() raced ahead of us and disposed the semaphore.
// Surface the same clean failure as the post-acquisition
// _disposed check below, instead of leaking a low-level
// teardown exception to callers.
throw new InvalidOperationException(
"ExecuteTextCommandAsync cannot run because the device is disposed.");
}

cancellationToken.ThrowIfCancellationRequested();

var collectedLines = new List<string>();
var stream = _transport.Stream;
int? originalReadTimeout = null;

_isInsideTextExchange.Value = true;
try
{
// All validation runs INSIDE the lock so a competing thread
// calling DisconnectAsync() / Dispose() while we're blocked
// on WaitAsync() doesn't leave us with a stale _transport /
// _messageConsumer reference (closes the TOCTOU window
// documented in #186).
if (_disposed || _isDisconnecting)
{
throw new InvalidOperationException(
"ExecuteTextCommandAsync cannot run while the device is "
+ "disposing or disconnecting.");
}

if (!IsConnected)
{
throw new InvalidOperationException("Device is not connected.");
}

if (_transport == null)
{
throw new InvalidOperationException("ExecuteTextCommandAsync requires a transport-based connection.");
}

var sw = Stopwatch.StartNew();
var collectedLines = new List<string>();
var stream = _transport.Stream;
int? originalReadTimeout = null;

try
{
if (stream.CanTimeout)
{
try
Expand Down Expand Up @@ -388,7 +490,11 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
};

textConsumer.Start();
await Task.Delay(50, cancellationToken);
// ConfigureAwait(false): the lock is held across this delay,
// so resuming on a captured sync context (e.g. UI thread)
// would let a sync Disconnect() call from that same thread
// deadlock waiting for the lock we hold.
await Task.Delay(50, cancellationToken).ConfigureAwait(false);

Trace.WriteLine($"[ExecuteTextCommandAsync] Text consumer started at {sw.ElapsedMilliseconds}ms");

Expand All @@ -408,7 +514,10 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
while (DateTime.UtcNow - startTime < maxWait)
{
var previousCount = collectedLines.Count;
await Task.Delay(50, cancellationToken);
// ConfigureAwait(false): see textConsumer.Start above —
// we still hold _textExchangeLock and must not resume on
// a captured sync context.
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
if (collectedLines.Count > previousCount)
{
lastMessageTime = DateTime.UtcNow;
Expand Down Expand Up @@ -468,7 +577,25 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteTextCommandAsync(
Trace.WriteLine($"[ExecuteTextCommandAsync] Total elapsed: {sw.ElapsedMilliseconds}ms");
}

return collectedLines;
return collectedLines;
}
finally
{
_isInsideTextExchange.Value = false;
// Release can race with Dispose() — Dispose acquires the lock
// before disposing it, but if that acquisition timed out and
// Dispose proceeded anyway, our SemaphoreSlim handle is now
// gone. Treat that as a benign teardown signal rather than
// surfacing it from the finally and masking the original
// exception (if any) from the try body.
try
{
_textExchangeLock.Release();
}
catch (ObjectDisposedException)
{
}
}
}

/// <summary>
Expand Down Expand Up @@ -595,6 +722,7 @@ public void Dispose()
_messageConsumer?.Dispose();
_messageProducer?.Dispose();
_transport?.Dispose();
_textExchangeLock.Dispose();
_disposed = true;
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
}
}
Expand Down
Loading