Skip to content

Commit

Permalink
RdCall: call itself on local protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
ulex committed Jun 20, 2023
1 parent ee68e5b commit d12a171
Showing 1 changed file with 38 additions and 26 deletions.
64 changes: 38 additions & 26 deletions rd-net/RdFramework/Tasks/RdCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,7 @@ private void OnWireReceived(SerializationCtx ctx, UnsafeReader reader, WiredRdTa

dispatchHelper.Dispatch(myHandlerScheduler, () =>
{
RdTask<TRes> rdTask;
try
{
var handler = Handler;
if (handler == null)
{
var message = $"Handler is not set for {wiredTask} :: received request: {value.PrintToString()}";
ourLogReceived.Error(message);
rdTask = RdTask.Faulted<TRes>(new Exception(message));
}
else
{
try
{
rdTask = handler(externalCancellation, value);
}
catch (Exception ex)
{
rdTask = RdTask.Faulted<TRes>(ex);
}
}
}
catch (Exception e)
{
rdTask = RdTask.Faulted<TRes>(new Exception($"Unexpected exception in {wiredTask}", e));
}
var rdTask = RunHandler(value, externalCancellation, wiredTask);
rdTask.Result.Advise(Lifetime.Eternal, result =>
{
Expand All @@ -137,6 +112,38 @@ private void OnWireReceived(SerializationCtx ctx, UnsafeReader reader, WiredRdTa
});
}

private RdTask<TRes> RunHandler(TReq value, Lifetime externalCancellation, object? moniker)
{
RdTask<TRes> rdTask;
try
{
var handler = Handler;
if (handler == null)
{
var message = $"Handler is not set for {moniker} :: received request: {value.PrintToString()}";
ourLogReceived.Error(message);
rdTask = RdTask.Faulted<TRes>(new Exception(message));
}
else
{
try
{
rdTask = handler(externalCancellation, value);
}
catch (Exception ex)
{
rdTask = RdTask.Faulted<TRes>(ex);
}
}
}
catch (Exception e)
{
rdTask = RdTask.Faulted<TRes>(new Exception($"Unexpected exception in {moniker}", e));
}

return rdTask;
}


public TRes Sync(TReq request, RpcTimeouts? timeouts = null)
{
Expand Down Expand Up @@ -186,6 +193,11 @@ private IRdTask<TRes> StartInternal(Lifetime requestLifetime, TReq request, ISch
if (proto == null || !TryGetSerializationContext(out var serializationContext))
return new WiredRdTask<TReq, TRes>.CallSite(Lifetime.Terminated, this, RdId.Nil, SynchronousScheduler.Instance);

// Short-circuit of calls on local wires. On a local protocol with stub wire the handler will
// never be executed, so we call it right now explicitly in sync mode.
if (proto.Wire.IsStub)
return RunHandler(request, requestLifetime, moniker: this);

var taskId = proto.Identities.Next(RdId.Nil);
var task = new WiredRdTask<TReq,TRes>.CallSite(Lifetime.Intersect(requestLifetime, myBindLifetime), this, taskId, scheduler ?? proto.Scheduler);

Expand Down

0 comments on commit d12a171

Please sign in to comment.