Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions src/MagicOnion.Client/StreamingHubClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using MagicOnion.Internal.Buffers;
using MagicOnion.Serialization;
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Channels;

namespace MagicOnion.Client;
Expand Down Expand Up @@ -126,6 +127,7 @@ public abstract class StreamingHubClientBase<TStreamingHub, TReceiver> : IStream
#pragma warning disable IDE1006 // Naming Styles
const string StreamingHubVersionHeaderKey = "x-magiconion-streaminghub-version";
const string StreamingHubVersionHeaderValue = "2";
static readonly TimeSpan CleanupSubscriptionWait = TimeSpan.FromMilliseconds(100);
#pragma warning restore IDE1006 // Naming Styles

readonly CallInvoker callInvoker;
Expand All @@ -146,11 +148,11 @@ public abstract class StreamingHubClientBase<TStreamingHub, TReceiver> : IStream

readonly Channel<StreamingHubPayload> writerQueue = Channel.CreateUnbounded<StreamingHubPayload>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
Task? writerTask;
IClientStreamWriter<StreamingHubPayload> writer = default!;
IAsyncStreamReader<StreamingHubPayload> reader = default!;
IClientStreamWriter<StreamingHubPayload>? writer;
IAsyncStreamReader<StreamingHubPayload>? reader;

StreamingHubClientHeartbeatManager heartbeatManager = default!;
Task subscription = default!;
StreamingHubClientHeartbeatManager? heartbeatManager;
Task? subscription;

protected readonly TReceiver receiver;

Expand Down Expand Up @@ -272,8 +274,21 @@ protected T Deserialize<T>(ReadOnlyMemory<byte> data)
static Method<StreamingHubPayload, StreamingHubPayload> CreateConnectMethod(string serviceName)
=> new (MethodType.DuplexStreaming, serviceName, "Connect", MagicOnionMarshallers.StreamingHubMarshaller, MagicOnionMarshallers.StreamingHubMarshaller);

[MemberNotNull(nameof(reader))]
[MemberNotNull(nameof(writer))]
[MemberNotNull(nameof(heartbeatManager))]
void EnsureConnected()
{
if (reader is null || writer is null || heartbeatManager is null)
{
throw new InvalidOperationException("The client must be connected to the server before subscribing.");
}
}

async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstMoveNext, CancellationToken subscriptionToken)
{
EnsureConnected();

var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null);
writerTask = RunWriterLoopAsync(subscriptionToken);

Expand Down Expand Up @@ -373,10 +388,10 @@ void ConsumeData(SynchronizationContext? syncContext, StreamingHubPayload payloa
ProcessClientResultRequest(syncContext, payload, ref messageReader);
break;
case StreamingHubMessageType.ServerHeartbeat:
heartbeatManager.ProcessServerHeartbeat(payload);
heartbeatManager!.ProcessServerHeartbeat(payload);
break;
case StreamingHubMessageType.ClientHeartbeatResponse:
heartbeatManager.ProcessClientHeartbeatResponse(payload);
heartbeatManager!.ProcessClientHeartbeatResponse(payload);
break;
}
}
Expand Down Expand Up @@ -487,6 +502,7 @@ void ProcessClientResultRequest(SynchronizationContext? syncContext, StreamingHu

async Task RunWriterLoopAsync(CancellationToken cancellationToken)
{
EnsureConnected();
try
{
while (!cancellationToken.IsCancellationRequested)
Expand All @@ -499,6 +515,10 @@ async Task RunWriterLoopAsync(CancellationToken cancellationToken)
await writer.WriteAsync(payload).ConfigureAwait(false);
}
}
else
{
break; // The writer has completed.
}
}
}
catch { /* Ignore */ }
Expand Down Expand Up @@ -681,7 +701,9 @@ async ValueTask CleanupAsync(bool waitSubscription)
catch { } // ignore error?
finally
{
subscriptionCts.Cancel();
// When it is necessary to wait for subscription (message reading) completion, add a small delay before cancellation.
// This prevents throwing an IOException (non-error) on the server side when the stream is reset if `Cancel` is performed immediately while message reading is incomplete.
subscriptionCts.CancelAfter(CleanupSubscriptionWait);
try
{
if (waitSubscription)
Expand All @@ -698,7 +720,7 @@ async ValueTask CleanupAsync(bool waitSubscription)
{
try
{
(item.Value as IStreamingHubResponseTaskSource).TrySetCanceled();
item.Value.TrySetCanceled();
}
catch (Exception ex)
{
Expand Down
3 changes: 2 additions & 1 deletion tests/MagicOnion.Client.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ public async Task ConnectAsync_CancellationToken_Timeout()
{
// Arrange
var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var connectTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(1));
var connectTimeout = new CancellationTokenSource();
var disposed = true;
var helper = new StreamingHubClientTestHelper<IGreeterHub, IGreeterHubReceiver>(
factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance,
Expand All @@ -772,6 +772,7 @@ public async Task ConnectAsync_CancellationToken_Timeout()
});

// Act
connectTimeout.CancelAfter(TimeSpan.FromSeconds(1));
var begin = Stopwatch.GetTimestamp();
var ex = await Record.ExceptionAsync(async () => await helper.ConnectAsync(connectTimeout.Token));
var elapsed = Stopwatch.GetElapsedTime(begin);
Expand Down
Loading