-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Add request concurrency information for processing by the RequestExec… #66727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
7c152da
50d3040
958420b
715716b
0cca37a
02cb458
8ff459f
9f485f2
5fbffa7
ee2e97d
0a53c0d
ad617e8
f5b1620
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,16 +3,17 @@ | |
| // See the LICENSE file in the project root for more information. | ||
|
|
||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.VisualStudio.Threading; | ||
| using System.Collections.Immutable; | ||
|
|
||
| namespace Microsoft.CommonLanguageServerProtocol.Framework; | ||
|
|
||
| /// <summary> | ||
| /// Coordinates the exectution of LSP messages to ensure correct results are sent back. | ||
| /// Coordinates the execution of LSP messages to ensure correct results are sent back. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// <para> | ||
|
|
@@ -21,7 +22,7 @@ namespace Microsoft.CommonLanguageServerProtocol.Framework; | |
| /// (via textDocument/didChange for example). | ||
| /// </para> | ||
| /// <para> | ||
| /// This class acheives this by distinguishing between mutating and non-mutating requests, and ensuring that | ||
| /// This class achieves this by distinguishing between mutating and non-mutating requests, and ensuring that | ||
| /// when a mutating request comes in, its processing blocks all subsequent requests. As each request comes in | ||
| /// it is added to a queue, and a queue item will not be retrieved while a mutating request is running. Before | ||
| /// any request is handled the solution state is created by merging workspace solution state, which could have | ||
|
|
@@ -89,6 +90,19 @@ protected IMethodHandler GetMethodHandler<TRequest, TResponse>(string methodName | |
| return handler; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Indicates this queue requires in-progress work to be cancelled before servicing | ||
| /// a mutating request. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// This was added for WebTools consumption as they aren't resilient to | ||
| /// incomplete requests continuing execution during didChange notifications. As their | ||
| /// parse trees are mutable, a didChange notification requires all previous requests | ||
| /// to be completed before processing. This is similar to the O# | ||
| /// WithContentModifiedSupport(false) behavior. | ||
| /// </remarks> | ||
| protected virtual bool CancelInProgressWorkUponMutatingRequest => false; | ||
|
|
||
| /// <summary> | ||
| /// Queues a request to be handled by the specified handler, with mutating requests blocking subsequent requests | ||
| /// from starting until the mutation is complete. | ||
|
|
@@ -156,6 +170,8 @@ private async Task ProcessQueueAsync() | |
| ILspServices? lspServices = null; | ||
| try | ||
| { | ||
| var concurrentlyExecutingTasks = new ConcurrentDictionary<Task, CancellationTokenSource>(); | ||
|
|
||
| while (!_cancelSource.IsCancellationRequested) | ||
| { | ||
| // First attempt to de-queue the work item in its own try-catch. | ||
|
|
@@ -175,9 +191,21 @@ private async Task ProcessQueueAsync() | |
| try | ||
| { | ||
| var (work, activityId, cancellationToken) = queueItem; | ||
| CancellationTokenSource? currentWorkCts = null; | ||
| lspServices = work.LspServices; | ||
|
|
||
| var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); | ||
ToddGrun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (CancelInProgressWorkUponMutatingRequest) | ||
| { | ||
| // Verify queueItem hasn't already been cancelled before creating a linked | ||
| // CancellationTokenSource based on it. | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
ToddGrun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| currentWorkCts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); | ||
|
|
||
| // Use the linked cancellation token so it's task can be cancelled if necessary during a mutating request | ||
| // on a queue that specifies CancelInProgressWorkUponMutatingRequest | ||
| cancellationToken = currentWorkCts.Token; | ||
| } | ||
|
|
||
| // Restore our activity id so that logging/tracking works across asynchronous calls. | ||
| Trace.CorrelationManager.ActivityId = activityId; | ||
|
|
@@ -186,6 +214,25 @@ private async Task ProcessQueueAsync() | |
| var context = await work.CreateRequestContextAsync(cancellationToken).ConfigureAwait(false); | ||
| if (work.MutatesServerState) | ||
| { | ||
| if (CancelInProgressWorkUponMutatingRequest) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add some unit tests that verify that the queue behaves as you expect? e.g. verify previous work completes/cancelled before the mutating item starts, verify queue cancellation still functions correctly with this option, etc. we have some existing tests here - https://sourceroslyn.io/#Microsoft.CommonLanguageServerProtocol.Framework.UnitTests/RequestExecutionQueueTests.cs,8166c4c6b96c4379 |
||
| { | ||
| // Cancel all concurrently executing tasks | ||
| var concurrentlyExecutingTasksArray = concurrentlyExecutingTasks.ToArray(); | ||
| for (var i = 0; i < concurrentlyExecutingTasksArray.Length; i++) | ||
| { | ||
| concurrentlyExecutingTasksArray[i].Value.Cancel(); | ||
| } | ||
|
|
||
| try | ||
| { | ||
| // wait for all pending tasks to complete their cancellation | ||
| await Task.WhenAll(concurrentlyExecutingTasksArray.Select(kvp => kvp.Key)).ConfigureAwait(false); | ||
| } | ||
| catch (TaskCanceledException) | ||
ToddGrun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
ToddGrun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| } | ||
| } | ||
|
|
||
| // Mutating requests block other requests from starting to ensure an up to date snapshot is used. | ||
| // Since we're explicitly awaiting exceptions to mutating requests will bubble up here. | ||
| await WrapStartRequestTaskAsync(work.StartRequestAsync(context, cancellationToken), rethrowExceptions: true).ConfigureAwait(false); | ||
|
|
@@ -196,13 +243,36 @@ private async Task ProcessQueueAsync() | |
| // will be sent back to the client but they can also be captured via HandleNonMutatingRequestError, | ||
| // though these errors don't put us into a bad state as far as the rest of the queue goes. | ||
| // Furthermore we use Task.Run here to protect ourselves against synchronous execution of work | ||
| // blocking the request queue for longer periods of time (it enforces parallelizabilty). | ||
| _ = WrapStartRequestTaskAsync(Task.Run(() => work.StartRequestAsync(context, cancellationToken), cancellationToken), rethrowExceptions: false); | ||
| // blocking the request queue for longer periods of time (it enforces parallelizability). | ||
| var currentWorkTask = WrapStartRequestTaskAsync(Task.Run(() => work.StartRequestAsync(context, cancellationToken), cancellationToken), rethrowExceptions: false); | ||
|
|
||
| if (CancelInProgressWorkUponMutatingRequest) | ||
| { | ||
| if (currentWorkCts is null) | ||
| { | ||
| throw new InvalidOperationException($"unexpected null value for {nameof(currentWorkCts)}"); | ||
| } | ||
ToddGrun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if (!concurrentlyExecutingTasks.TryAdd(currentWorkTask, currentWorkCts)) | ||
ToddGrun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| throw new InvalidOperationException($"unable to add {currentWorkTask} into {concurrentlyExecutingTasks}"); | ||
|
||
| } | ||
|
|
||
| _ = currentWorkTask.ContinueWith(t => | ||
| { | ||
| if (!concurrentlyExecutingTasks.TryRemove(t, out var concurrentlyExecutingTaskCts)) | ||
| { | ||
| throw new InvalidOperationException($"unexpected failure to remove task from {nameof(concurrentlyExecutingTasks)}"); | ||
| } | ||
ToddGrun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| concurrentlyExecutingTaskCts.Dispose(); | ||
| }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); | ||
| } | ||
| } | ||
| } | ||
| catch (OperationCanceledException ex) when (ex.CancellationToken == queueItem.cancellationToken) | ||
ToddGrun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| // Explicitly ignore this exception as cancellation occured as a result of our linked cancellation token. | ||
| // Explicitly ignore this exception as cancellation occurred as a result of our linked cancellation token. | ||
| // This means either the queue is shutting down or the request itself was cancelled. | ||
| // 1. If the queue is shutting down, then while loop will exit before the next iteration since it checks for cancellation. | ||
| // 2. Request cancellations are normal so no need to report anything there. | ||
|
|
@@ -227,7 +297,7 @@ private async Task ProcessQueueAsync() | |
| } | ||
|
|
||
| /// <summary> | ||
| /// Provides an extensiblity point to log or otherwise inspect errors thrown from non-mutating requests, | ||
| /// Provides an extensibility point to log or otherwise inspect errors thrown from non-mutating requests, | ||
| /// which would otherwise be lost to the fire-and-forget task in the queue. | ||
| /// </summary> | ||
| /// <param name="nonMutatingRequestTask">The task to be inspected.</param> | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.