diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index e6d503932..9e719f4f5 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -1,4 +1,4 @@ -using Grpc.Core; +using Grpc.Core; using MagicOnion.Client.Internal; using MagicOnion.Internal; using MagicOnion.Internal.Buffers; @@ -242,20 +242,16 @@ internal async Task __ConnectAndSubscribeAsync(CancellationToken connectAndSubsc subscriptionToken = CancellationTokenSource.CreateLinkedTokenSource(heartbeatManager.TimeoutToken, subscriptionCts.Token).Token; } - var firstMoveNextTask = reader.MoveNext(CancellationTokenSource.CreateLinkedTokenSource(connectAndSubscribeCancellationToken, subscriptionToken).Token); + var firstMoveNextTask = reader.MoveNext(subscriptionToken); if (firstMoveNextTask.IsFaulted || messageVersion == null) { // NOTE: Grpc.Net: // If an error is returned from `StreamingHub.Connect` method on a server-side, // ResponseStream.MoveNext synchronously returns a task that is `IsFaulted = true`. // `ConnectAsync` method should throw an exception here immediately. - // C-core: - // `firstMoveNextTask` is incomplete task (`IsFaulted = false`) whether ResponseHeadersAsync is failed or not. - // If the channel is disconnected or the server returns an error (StatusCode != OK), awaiting the Task will throw an exception. await firstMoveNextTask.ConfigureAwait(false); - // NOTE: C-core: If the execution reaches here, Connect method returns without any error (StatusCode = OK). but MessageVersion isn't provided from the server. - throw new RpcException(new Status(StatusCode.Internal, $"The request started successfully (StatusCode = OK), but the StreamingHub client has failed to negotiate with the server.")); + throw new RpcException(new Status(StatusCode.Internal, $"The request started successfully (StatusCode = OK), but the StreamingHub client has failed to negotiate with the server. ServerVersion is missing.")); } this.subscription = StartSubscribe(syncContext, firstMoveNextTask, subscriptionToken); diff --git a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs index 5340c1d5a..5fd8f0e65 100644 --- a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs +++ b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs @@ -1,8 +1,9 @@ -using MagicOnion.Client.DynamicClient; +using MagicOnion.Client.DynamicClient; using Microsoft.Extensions.Time.Testing; using System.Buffers; using System.Diagnostics; using System.Reflection; +using System.Threading; namespace MagicOnion.Client.Tests; @@ -783,6 +784,40 @@ public async Task ConnectAsync_CancellationToken_Timeout() Assert.True(disposed); } + [Fact] + public async Task ConnectAsync_CancellationToken_Timeout_On_FirstMoveNext() + { + // Verify that there is no problem when the CancellationToken passed to ConnectAsync is canceled after ConnectAsync and before the first message arrives. + + // Arrange + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var connectTimeout = new CancellationTokenSource(); + var disposed = true; + var helper = new StreamingHubClientTestHelper( + factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance, + onResponseHeaderAsync: metadata => Task.CompletedTask, + onDuplexStreamingCallDisposeAction: () => + { + disposed = true; + }); + + // Act + var client = await helper.ConnectAsync(connectTimeout.Token); + connectTimeout.Cancel(); + + // Invoke Hub Method + var t = client.Parameter_Zero().WaitAsync(timeout.Token); + { + // Read a hub method request payload + var (messageId, methodId, requestBody) = await helper.ReadRequestAsync(); + // Write a response to the stream + helper.WriteResponse(messageId, methodId, 123); + } + var result = await t; + + // Assert + Assert.Equal(123, result); + } } public interface IGreeterHubReceiver