-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Dev/toddgrun/request concurrency #66767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
7c152da
50d3040
958420b
715716b
0cca37a
02cb458
8ff459f
9f485f2
5fbffa7
ee2e97d
0a53c0d
ad617e8
f5b1620
72f6a8a
7238c19
503e379
70fb7c3
833f131
c068a8c
a6c3f8d
00557da
8ab7bbd
2bde77f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,16 +3,17 @@ | |
| // See the LICENSE file in the project root for more information. | ||
|
|
||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.VisualStudio.Threading; | ||
| using System.Collections.Immutable; | ||
|
|
||
| namespace Microsoft.CommonLanguageServerProtocol.Framework; | ||
|
|
||
| /// <summary> | ||
| /// Coordinates the exectution of LSP messages to ensure correct results are sent back. | ||
| /// Coordinates the execution of LSP messages to ensure correct results are sent back. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// <para> | ||
|
|
@@ -21,7 +22,7 @@ namespace Microsoft.CommonLanguageServerProtocol.Framework; | |
| /// (via textDocument/didChange for example). | ||
| /// </para> | ||
| /// <para> | ||
| /// This class acheives this by distinguishing between mutating and non-mutating requests, and ensuring that | ||
| /// This class achieves this by distinguishing between mutating and non-mutating requests, and ensuring that | ||
| /// when a mutating request comes in, its processing blocks all subsequent requests. As each request comes in | ||
| /// it is added to a queue, and a queue item will not be retrieved while a mutating request is running. Before | ||
| /// any request is handled the solution state is created by merging workspace solution state, which could have | ||
|
|
@@ -89,6 +90,19 @@ protected IMethodHandler GetMethodHandler<TRequest, TResponse>(string methodName | |
| return handler; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Indicates this queue requires in-progress work to be cancelled before servicing | ||
| /// a mutating request. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// This was added for WebTools consumption as they aren't resilient to | ||
| /// incomplete requests continuing execution during didChange notifications. As their | ||
| /// parse trees are mutable, a didChange notification requires all previous requests | ||
| /// to be completed before processing. This is similar to the O# | ||
| /// WithContentModifiedSupport(false) behavior. | ||
| /// </remarks> | ||
| protected virtual bool CancelInProgressWorkUponMutatingRequest => false; | ||
|
|
||
| /// <summary> | ||
| /// Queues a request to be handled by the specified handler, with mutating requests blocking subsequent requests | ||
| /// from starting until the mutation is complete. | ||
|
|
@@ -156,6 +170,8 @@ private async Task ProcessQueueAsync() | |
| ILspServices? lspServices = null; | ||
| try | ||
| { | ||
| var concurrentlyExecutingTasks = new ConcurrentDictionary<Task, CancellationTokenSource>(); | ||
|
|
||
| while (!_cancelSource.IsCancellationRequested) | ||
| { | ||
| // First attempt to de-queue the work item in its own try-catch. | ||
|
|
@@ -175,9 +191,17 @@ private async Task ProcessQueueAsync() | |
| try | ||
| { | ||
| var (work, activityId, cancellationToken) = queueItem; | ||
| CancellationTokenSource? currentWorkCts = null; | ||
| lspServices = work.LspServices; | ||
|
|
||
| var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); | ||
| if (CancelInProgressWorkUponMutatingRequest) | ||
| { | ||
| currentWorkCts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); | ||
|
|
||
| // Use the linked cancellation token so it's task can be cancelled if necessary during a mutating request | ||
| // on a queue that specifies CancelInProgressWorkUponMutatingRequest | ||
| cancellationToken = currentWorkCts.Token; | ||
| } | ||
|
|
||
| // Restore our activity id so that logging/tracking works across asynchronous calls. | ||
| Trace.CorrelationManager.ActivityId = activityId; | ||
|
|
@@ -186,27 +210,69 @@ private async Task ProcessQueueAsync() | |
| var context = await work.CreateRequestContextAsync(cancellationToken).ConfigureAwait(false); | ||
| if (work.MutatesServerState) | ||
| { | ||
| if (CancelInProgressWorkUponMutatingRequest) | ||
| { | ||
| // Cancel all concurrently executing tasks | ||
| var concurrentlyExecutingTasksArray = concurrentlyExecutingTasks.ToArray(); | ||
| for (var i = 0; i < concurrentlyExecutingTasksArray.Length; i++) | ||
| { | ||
| concurrentlyExecutingTasksArray[i].Value.Cancel(); | ||
| } | ||
|
|
||
| // wait for all pending tasks to complete their cancellation, ignoring any exceptions | ||
| await Task.WhenAll(concurrentlyExecutingTasksArray.Select(kvp => kvp.Key)).NoThrowAwaitableInternal(captureContext: false); | ||
| } | ||
|
|
||
| Debug.Assert(!concurrentlyExecutingTasks.Any(), "The tasks should have all been drained before continuing"); | ||
| // Mutating requests block other requests from starting to ensure an up to date snapshot is used. | ||
| // Since we're explicitly awaiting exceptions to mutating requests will bubble up here. | ||
| await WrapStartRequestTaskAsync(work.StartRequestAsync(context, cancellationToken), rethrowExceptions: true).ConfigureAwait(false); | ||
| } | ||
| else | ||
| { | ||
| // Non mutating are fire-and-forget because they are by definition readonly. Any errors | ||
| // Non mutating are fire-and-forget because they are by definition read-only. Any errors | ||
| // will be sent back to the client but they can also be captured via HandleNonMutatingRequestError, | ||
| // though these errors don't put us into a bad state as far as the rest of the queue goes. | ||
| // Furthermore we use Task.Run here to protect ourselves against synchronous execution of work | ||
| // blocking the request queue for longer periods of time (it enforces parallelizabilty). | ||
| _ = WrapStartRequestTaskAsync(Task.Run(() => work.StartRequestAsync(context, cancellationToken), cancellationToken), rethrowExceptions: false); | ||
| // blocking the request queue for longer periods of time (it enforces parallelizability). | ||
| var currentWorkTask = WrapStartRequestTaskAsync(Task.Run(() => work.StartRequestAsync(context, cancellationToken), cancellationToken), rethrowExceptions: false); | ||
|
|
||
| if (CancelInProgressWorkUponMutatingRequest) | ||
| { | ||
| if (currentWorkCts is null) | ||
| { | ||
| throw new InvalidOperationException($"unexpected null value for {nameof(currentWorkCts)}"); | ||
| } | ||
|
|
||
| if (!concurrentlyExecutingTasks.TryAdd(currentWorkTask, currentWorkCts)) | ||
| { | ||
| throw new InvalidOperationException($"unable to add {nameof(currentWorkTask)} into {nameof(concurrentlyExecutingTasks)}"); | ||
| } | ||
|
|
||
| _ = currentWorkTask.ContinueWith(t => | ||
| { | ||
| if (!concurrentlyExecutingTasks.TryRemove(t, out var concurrentlyExecutingTaskCts)) | ||
| { | ||
| throw new InvalidOperationException($"unexpected failure to remove task from {nameof(concurrentlyExecutingTasks)}"); | ||
| } | ||
|
|
||
| concurrentlyExecutingTaskCts.Dispose(); | ||
| }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); | ||
| } | ||
| } | ||
| } | ||
| catch (OperationCanceledException ex) when (ex.CancellationToken == queueItem.cancellationToken) | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Explicitly ignore this exception as cancellation occured as a result of our linked cancellation token. | ||
| // Explicitly ignore this exception as cancellation occurred as a result of our linked cancellation token. | ||
| // This means either the queue is shutting down or the request itself was cancelled. | ||
| // 1. If the queue is shutting down, then while loop will exit before the next iteration since it checks for cancellation. | ||
| // 2. Request cancellations are normal so no need to report anything there. | ||
| } | ||
| catch (ObjectDisposedException) | ||
| { | ||
| // Explicitly ignore this exception as this can occur during the CreateLinkTokenSource call, and means one of the | ||
| // linked cancellationTokens has been cancelled. | ||
| } | ||
|
||
| } | ||
| } | ||
| catch (Exception ex) | ||
|
|
@@ -227,7 +293,7 @@ private async Task ProcessQueueAsync() | |
| } | ||
|
|
||
| /// <summary> | ||
| /// Provides an extensiblity point to log or otherwise inspect errors thrown from non-mutating requests, | ||
| /// Provides an extensibility point to log or otherwise inspect errors thrown from non-mutating requests, | ||
| /// which would otherwise be lost to the fire-and-forget task in the queue. | ||
| /// </summary> | ||
| /// <param name="nonMutatingRequestTask">The task to be inspected.</param> | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: doc that CT.None here is very intentional as this code is absolutely necessary to run for clearnup regardless of how the task completed, and regardless if things were cancelled.