Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions src/MagicOnion.Client/StreamingHubClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Grpc.Core;
using Grpc.Core;
using MagicOnion.Client.Internal;
using MagicOnion.Internal;
using MagicOnion.Internal.Buffers;
Expand Down Expand Up @@ -242,20 +242,16 @@ internal async Task __ConnectAndSubscribeAsync(CancellationToken connectAndSubsc
subscriptionToken = CancellationTokenSource.CreateLinkedTokenSource(heartbeatManager.TimeoutToken, subscriptionCts.Token).Token;
}

var firstMoveNextTask = reader.MoveNext(CancellationTokenSource.CreateLinkedTokenSource(connectAndSubscribeCancellationToken, subscriptionToken).Token);
var firstMoveNextTask = reader.MoveNext(subscriptionToken);
if (firstMoveNextTask.IsFaulted || messageVersion == null)
{
// NOTE: Grpc.Net:
// If an error is returned from `StreamingHub.Connect` method on a server-side,
// ResponseStream.MoveNext synchronously returns a task that is `IsFaulted = true`.
// `ConnectAsync` method should throw an exception here immediately.
// C-core:
// `firstMoveNextTask` is incomplete task (`IsFaulted = false`) whether ResponseHeadersAsync is failed or not.
// If the channel is disconnected or the server returns an error (StatusCode != OK), awaiting the Task will throw an exception.
await firstMoveNextTask.ConfigureAwait(false);

// NOTE: C-core: If the execution reaches here, Connect method returns without any error (StatusCode = OK). but MessageVersion isn't provided from the server.
throw new RpcException(new Status(StatusCode.Internal, $"The request started successfully (StatusCode = OK), but the StreamingHub client has failed to negotiate with the server."));
throw new RpcException(new Status(StatusCode.Internal, $"The request started successfully (StatusCode = OK), but the StreamingHub client has failed to negotiate with the server. ServerVersion is missing."));
}

this.subscription = StartSubscribe(syncContext, firstMoveNextTask, subscriptionToken);
Expand Down
37 changes: 36 additions & 1 deletion tests/MagicOnion.Client.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using MagicOnion.Client.DynamicClient;
using MagicOnion.Client.DynamicClient;
using Microsoft.Extensions.Time.Testing;
using System.Buffers;
using System.Diagnostics;
using System.Reflection;
using System.Threading;

namespace MagicOnion.Client.Tests;

Expand Down Expand Up @@ -783,6 +784,40 @@ public async Task ConnectAsync_CancellationToken_Timeout()
Assert.True(disposed);
}

[Fact]
public async Task ConnectAsync_CancellationToken_Timeout_On_FirstMoveNext()
{
// Verify that there is no problem when the CancellationToken passed to ConnectAsync is canceled after ConnectAsync and before the first message arrives.

// Arrange
var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var connectTimeout = new CancellationTokenSource();
var disposed = true;
var helper = new StreamingHubClientTestHelper<IGreeterHub, IGreeterHubReceiver>(
factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance,
onResponseHeaderAsync: metadata => Task.CompletedTask,
onDuplexStreamingCallDisposeAction: () =>
{
disposed = true;
});

// Act
var client = await helper.ConnectAsync(connectTimeout.Token);
connectTimeout.Cancel();

// Invoke Hub Method
var t = client.Parameter_Zero().WaitAsync(timeout.Token);
{
// Read a hub method request payload
var (messageId, methodId, requestBody) = await helper.ReadRequestAsync<Nil>();
// Write a response to the stream
helper.WriteResponse(messageId, methodId, 123);
}
var result = await t;

// Assert
Assert.Equal(123, result);
}
}

public interface IGreeterHubReceiver
Expand Down
Loading