diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index 9e719f4f5..4cb6f8290 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -4,6 +4,7 @@ using MagicOnion.Internal.Buffers; using MagicOnion.Serialization; using System.Buffers; +using System.Diagnostics.CodeAnalysis; using System.Threading.Channels; namespace MagicOnion.Client; @@ -126,6 +127,7 @@ public abstract class StreamingHubClientBase : IStream #pragma warning disable IDE1006 // Naming Styles const string StreamingHubVersionHeaderKey = "x-magiconion-streaminghub-version"; const string StreamingHubVersionHeaderValue = "2"; + static readonly TimeSpan CleanupSubscriptionWait = TimeSpan.FromMilliseconds(100); #pragma warning restore IDE1006 // Naming Styles readonly CallInvoker callInvoker; @@ -146,11 +148,11 @@ public abstract class StreamingHubClientBase : IStream readonly Channel writerQueue = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false }); Task? writerTask; - IClientStreamWriter writer = default!; - IAsyncStreamReader reader = default!; + IClientStreamWriter? writer; + IAsyncStreamReader? reader; - StreamingHubClientHeartbeatManager heartbeatManager = default!; - Task subscription = default!; + StreamingHubClientHeartbeatManager? heartbeatManager; + Task? subscription; protected readonly TReceiver receiver; @@ -272,8 +274,21 @@ protected T Deserialize(ReadOnlyMemory data) static Method CreateConnectMethod(string serviceName) => new (MethodType.DuplexStreaming, serviceName, "Connect", MagicOnionMarshallers.StreamingHubMarshaller, MagicOnionMarshallers.StreamingHubMarshaller); + [MemberNotNull(nameof(reader))] + [MemberNotNull(nameof(writer))] + [MemberNotNull(nameof(heartbeatManager))] + void EnsureConnected() + { + if (reader is null || writer is null || heartbeatManager is null) + { + throw new InvalidOperationException("The client must be connected to the server before subscribing."); + } + } + async Task StartSubscribe(SynchronizationContext? syncContext, Task firstMoveNext, CancellationToken subscriptionToken) { + EnsureConnected(); + var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null); writerTask = RunWriterLoopAsync(subscriptionToken); @@ -373,10 +388,10 @@ void ConsumeData(SynchronizationContext? syncContext, StreamingHubPayload payloa ProcessClientResultRequest(syncContext, payload, ref messageReader); break; case StreamingHubMessageType.ServerHeartbeat: - heartbeatManager.ProcessServerHeartbeat(payload); + heartbeatManager!.ProcessServerHeartbeat(payload); break; case StreamingHubMessageType.ClientHeartbeatResponse: - heartbeatManager.ProcessClientHeartbeatResponse(payload); + heartbeatManager!.ProcessClientHeartbeatResponse(payload); break; } } @@ -487,6 +502,7 @@ void ProcessClientResultRequest(SynchronizationContext? syncContext, StreamingHu async Task RunWriterLoopAsync(CancellationToken cancellationToken) { + EnsureConnected(); try { while (!cancellationToken.IsCancellationRequested) @@ -499,6 +515,10 @@ async Task RunWriterLoopAsync(CancellationToken cancellationToken) await writer.WriteAsync(payload).ConfigureAwait(false); } } + else + { + break; // The writer has completed. + } } } catch { /* Ignore */ } @@ -681,7 +701,9 @@ async ValueTask CleanupAsync(bool waitSubscription) catch { } // ignore error? finally { - subscriptionCts.Cancel(); + // When it is necessary to wait for subscription (message reading) completion, add a small delay before cancellation. + // This prevents throwing an IOException (non-error) on the server side when the stream is reset if `Cancel` is performed immediately while message reading is incomplete. + subscriptionCts.CancelAfter(CleanupSubscriptionWait); try { if (waitSubscription) @@ -698,7 +720,7 @@ async ValueTask CleanupAsync(bool waitSubscription) { try { - (item.Value as IStreamingHubResponseTaskSource).TrySetCanceled(); + item.Value.TrySetCanceled(); } catch (Exception ex) { diff --git a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs index 5fd8f0e65..2fecbe36a 100644 --- a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs +++ b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs @@ -757,7 +757,7 @@ public async Task ConnectAsync_CancellationToken_Timeout() { // Arrange var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var connectTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + var connectTimeout = new CancellationTokenSource(); var disposed = true; var helper = new StreamingHubClientTestHelper( factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance, @@ -772,6 +772,7 @@ public async Task ConnectAsync_CancellationToken_Timeout() }); // Act + connectTimeout.CancelAfter(TimeSpan.FromSeconds(1)); var begin = Stopwatch.GetTimestamp(); var ex = await Record.ExceptionAsync(async () => await helper.ConnectAsync(connectTimeout.Token)); var elapsed = Stopwatch.GetElapsedTime(begin);