Skip to content

Commit

Permalink
Refactor request invocation to be extensible. (#641)
Browse files Browse the repository at this point in the history
* Refactor request invocation to be extensible.

- The initial motivation for this change spawned from an investigation in the Razor language server where transitioning from Parallel -> Serial tasks would wait on Parallel work to finish. In scenarios when that parallel work took a long time this could result in significant editor delays in completion where you'd have Parallel (long) -> Serial -> Parallel (completion, short). I played around with changing the System.Reactive bits to have an option to not "wait" for the parallel stacks but System.Reatcive as a library wasn't truly built to handle that type of "change your mind after-the-fact" flow.
- Prior to this change the routing & scheduling aspects of the JsonRpc stack are bound to our ProcessScheduler & InputHandler.RouteRequest & InputHandler.RouteNotification endspoints. This change allows that entire stack to be extensible so consumers can plug & play.
- Added a `RequestInvoker` type which represents the core logic of how the framework invokes a handler for a request. This encapsulates the control flow for invoking, scheduling and handling fallout from invoking a handler.
- Added a `RequestInvokerOptions` type to represent what sort of settings should be applied for the request invoker paradigm.
- Expanded `InputHandler` & `Connection` to have two new constructors that take in a request invoker and obsoleted the old ones. Updated tests to account for this.
- Registered the default request invoker type (the one that uses System.Reactive) if a request invoker was not already registered.

* Make existing request response types fully public

- For consumers who are creating their own `RequestInvoker` they need to manually construct many of our response types. Therefore, the constructors need to also be puclic.

* Fix test.

* Addressed code review comments
  • Loading branch information
NTaylorMullen authored Aug 27, 2021
1 parent de1591e commit a361160
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 199 deletions.
41 changes: 35 additions & 6 deletions src/JsonRpc/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class Connection : IDisposable
private readonly InputHandler _inputHandler;
public bool IsOpen { get; private set; }

[Obsolete("Use the other constructor that takes a request invoker")]
public Connection(
PipeReader input,
IOutputHandler outputHandler,
Expand All @@ -25,21 +26,49 @@ public Connection(
int concurrency,
IScheduler scheduler,
CreateResponseExceptionHandler? getException = null
) : this(
input,
outputHandler,
receiver,
requestRouter,
responseRouter,
new DefaultRequestInvoker(
requestRouter,
outputHandler,
requestProcessIdentifier,
new RequestInvokerOptions(
requestTimeout,
supportContentModified,
concurrency),
loggerFactory,
scheduler),
loggerFactory,
onUnhandledException,
getException)
{
}

public Connection(
PipeReader input,
IOutputHandler outputHandler,
IReceiver receiver,
IRequestRouter<IHandlerDescriptor?> requestRouter,
IResponseRouter responseRouter,
RequestInvoker requestInvoker,
ILoggerFactory loggerFactory,
OnUnhandledExceptionHandler onUnhandledException,
CreateResponseExceptionHandler? getException = null
) =>
_inputHandler = new InputHandler(
input,
outputHandler,
receiver,
requestProcessIdentifier,
requestRouter,
responseRouter,
requestInvoker,
loggerFactory,
onUnhandledException,
getException,
requestTimeout,
supportContentModified,
concurrency > 1 ? (int?) concurrency : null,
scheduler
getException
);

public void Open()
Expand Down
6 changes: 3 additions & 3 deletions src/JsonRpc/ContentModified.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
using OmniSharp.Extensions.JsonRpc.Server;
using OmniSharp.Extensions.JsonRpc.Server;
using OmniSharp.Extensions.JsonRpc.Server.Messages;

namespace OmniSharp.Extensions.JsonRpc
{
public class ContentModified : RpcError
{
internal ContentModified(string method) : base(null, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
public ContentModified(string method) : base(null, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
{
}

internal ContentModified(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
public ContentModified(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
{
}
}
Expand Down
181 changes: 181 additions & 0 deletions src/JsonRpc/DefaultRequestInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Microsoft.Extensions.Logging;
using OmniSharp.Extensions.JsonRpc.Server;
using OmniSharp.Extensions.JsonRpc.Server.Messages;
using Notification = OmniSharp.Extensions.JsonRpc.Server.Notification;

namespace OmniSharp.Extensions.JsonRpc
{
public class DefaultRequestInvoker : RequestInvoker
{
private readonly IRequestRouter<IHandlerDescriptor?> _requestRouter;
private readonly IOutputHandler _outputHandler;
private readonly ProcessScheduler _processScheduler;
private readonly IRequestProcessIdentifier _requestProcessIdentifier;
private readonly RequestInvokerOptions _options;
private readonly ILogger<DefaultRequestInvoker> _logger;

public DefaultRequestInvoker(
IRequestRouter<IHandlerDescriptor?> requestRouter,
IOutputHandler outputHandler,
IRequestProcessIdentifier requestProcessIdentifier,
RequestInvokerOptions options,
ILoggerFactory loggerFactory,
IScheduler scheduler)
{
_requestRouter = requestRouter;
_outputHandler = outputHandler;
_requestProcessIdentifier = requestProcessIdentifier;
_options = options;
_processScheduler = new ProcessScheduler(loggerFactory, _options.SupportContentModified, _options.Concurrency, scheduler);
_logger = loggerFactory.CreateLogger<DefaultRequestInvoker>();
}

public override RequestInvocationHandle InvokeRequest(IRequestDescriptor<IHandlerDescriptor?> descriptor, Request request)
{
if (descriptor.Default is null)
{
throw new ArgumentNullException(nameof(descriptor.Default));
}

var handle = new RequestInvocationHandle(request);
var type = _requestProcessIdentifier.Identify(descriptor.Default);

var schedulerDelegate = RouteRequest(descriptor, request, handle);
_processScheduler.Add(type, $"{request.Method}:{request.Id}", schedulerDelegate);

return handle;
}

public override void InvokeNotification(IRequestDescriptor<IHandlerDescriptor?> descriptor, Notification notification)
{
if (descriptor.Default is null)
{
throw new ArgumentNullException(nameof(descriptor.Default));
}

var type = _requestProcessIdentifier.Identify(descriptor.Default);
var schedulerDelegate = RouteNotification(descriptor, notification);
_processScheduler.Add(type, notification.Method, schedulerDelegate);
}

public override void Dispose()
{
_processScheduler.Dispose();
}

private SchedulerDelegate RouteRequest(
IRequestDescriptor<IHandlerDescriptor?> descriptor,
Request request,
RequestInvocationHandle handle)
{
var cts = handle.CancellationTokenSource;
return (contentModifiedToken, scheduler) =>
Observable.Create<ErrorResponse>(
observer => {
// ITS A RACE!
var sub = Observable.Amb(
contentModifiedToken.Select(
_ => {
_logger.LogTrace(
"Request {Id} was abandoned due to content be modified", request.Id
);
return new ErrorResponse(
new ContentModified(request.Id, request.Method)
);
}
),
Observable.Timer(_options.RequestTimeout, scheduler).Select(
_ => new ErrorResponse(new RequestCancelled(request.Id, request.Method))
),
Observable.FromAsync(
async ct => {
using var timer = _logger.TimeDebug(
"Processing request {Method} {ResponseId}", request.Method,
request.Id
);
ct.Register(cts.Cancel);
// ObservableToToken(contentModifiedToken).Register(cts.Cancel);
try
{
var result = await _requestRouter.RouteRequest(
descriptor, request, cts.Token
).ConfigureAwait(false);
return result;
}
catch (OperationCanceledException)
{
_logger.LogTrace("Request {Id} was cancelled", request.Id);
return new RequestCancelled(request.Id, request.Method);
}
catch (RpcErrorException e)
{
_logger.LogCritical(
Events.UnhandledRequest, e,
"Failed to handle request {Method} {RequestId}", request.Method,
request.Id
);
return new RpcError(
request.Id, request.Method,
new ErrorMessage(e.Code, e.Message, e.Error)
);
}
catch (Exception e)
{
_logger.LogCritical(
Events.UnhandledRequest, e,
"Failed to handle request {Method} {RequestId}", request.Method,
request.Id
);
return new InternalError(request.Id, request.Method, e.ToString());
}
}
)
)
.Subscribe(observer);
return new CompositeDisposable(sub, handle);
}
)
.Select(
response => {
_outputHandler.Send(response.Value);
return Unit.Default;
}
);
}

private SchedulerDelegate RouteNotification(
IRequestDescriptor<IHandlerDescriptor?> descriptors,
Notification notification) =>
(_, scheduler) =>
// ITS A RACE!
Observable.Amb(
Observable.Timer(_options.RequestTimeout, scheduler)
.Select(_ => Unit.Default)
.Do(
_ => _logger.LogTrace("Notification was cancelled due to timeout")
),
Observable.FromAsync(
async ct => {
using var timer = _logger.TimeDebug("Processing notification {Method}", notification.Method);
try
{
await _requestRouter.RouteNotification(descriptors, notification, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.LogTrace("Notification was cancelled");
}
catch (Exception e)
{
_logger.LogCritical(Events.UnhandledRequest, e, "Failed to handle request {Method}", notification.Method);
}
}
)
);
}
}
Loading

0 comments on commit a361160

Please sign in to comment.