Async diagnostics analyzer work queue #2351

Merged (22 commits, Sep 25, 2023)
318 changes: 318 additions & 0 deletions src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/AsyncAnalyzerWorkQueue.cs
@@ -0,0 +1,318 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis;
using Microsoft.Extensions.Logging;

#nullable enable

namespace OmniSharp.Roslyn.CSharp.Workers.Diagnostics
{
public class AsyncAnalyzerWorkQueue
{
private readonly object _lock = new();
private readonly Queue _foreground = new();
private readonly Queue _background = new();
private readonly ILogger<AnalyzerWorkQueue> _logger;
private TaskCompletionSource<object?> _takeWorkWaiter = new(TaskCreationOptions.RunContinuationsAsynchronously);

public AsyncAnalyzerWorkQueue(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<AnalyzerWorkQueue>();
}

public int PendingCount
{
get
{
lock (_lock)
return _foreground.PendingCount + _background.PendingCount;
}
}

public void PutWork(IReadOnlyCollection<DocumentId> documentIds, AnalyzerWorkType workType)
{
lock (_lock)
{
foreach (var documentId in documentIds)
{
_foreground.RequestCancellationIfActive(documentId);
_background.RequestCancellationIfActive(documentId);

if (workType == AnalyzerWorkType.Foreground)
_foreground.Enqueue(documentId);
else if (workType == AnalyzerWorkType.Background)
_background.Enqueue(documentId);
}

// Complete the work waiter task to allow work to be taken from the queue.
if (!_takeWorkWaiter.Task.IsCompleted)
_takeWorkWaiter.SetResult(null);
}
}

public async Task<QueueItem> TakeWorkAsync(CancellationToken cancellationToken = default)
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();

Task awaitTask;

lock (_lock)
{
if (_foreground.TryDequeue(out var documentId, out var cancellationTokenSource))
{
return new QueueItem
(
DocumentId: documentId,
CancellationToken: cancellationTokenSource.Token,
AnalyzerWorkType: AnalyzerWorkType.Foreground,
DocumentCount: _foreground.MaximumPendingCount,
DocumentCountRemaining: _foreground.PendingCount
);
}
else if (_background.TryDequeue(out documentId, out cancellationTokenSource))
{
return new QueueItem
(
DocumentId: documentId,
CancellationToken: cancellationTokenSource.Token,
AnalyzerWorkType: AnalyzerWorkType.Background,
DocumentCount: _background.MaximumPendingCount,
DocumentCountRemaining: _background.PendingCount
);
}

if (_foreground.PendingCount == 0 && _background.PendingCount == 0 && _takeWorkWaiter.Task.IsCompleted)
_takeWorkWaiter = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);

awaitTask = _takeWorkWaiter.Task;
}

// There is no chance of the default cancellation token being cancelled, so we can
// simply wait for work to be queued. Otherwise, we need to handle the case that the
// token is cancelled before we have work to return.
if (cancellationToken == default)
{
await awaitTask.ConfigureAwait(false);
}
else
{
var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);

using (cancellationToken.Register(() => tcs.SetResult(null)))
{
await Task.WhenAny(awaitTask, tcs.Task).ConfigureAwait(false);
}
}
}
}

public void WorkComplete(QueueItem item)
{
lock (_lock)
{
if (item.AnalyzerWorkType == AnalyzerWorkType.Foreground)
_foreground.WorkComplete(item.DocumentId, item.CancellationToken);
else if (item.AnalyzerWorkType == AnalyzerWorkType.Background)
_background.WorkComplete(item.DocumentId, item.CancellationToken);
}
}

public async Task WaitForegroundWorkComplete(CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
return;

Task waitForgroundTask;

lock (_lock)
waitForgroundTask = _foreground.GetWaiter();

if (waitForgroundTask.IsCompleted)
return;

if (cancellationToken == default)
{
await waitForgroundTask.ConfigureAwait(false);

return;
}

var taskCompletion = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);

using (cancellationToken.Register(() => taskCompletion.SetResult(null)))
{
await Task.WhenAny(taskCompletion.Task, waitForgroundTask).ConfigureAwait(false);

if (!waitForgroundTask.IsCompleted)
_logger.LogWarning($"Timeout before work got ready for foreground analysis queue. This is assertion to prevent complete api hang in case of error.");
}
}

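// Re-queues a document as foreground work when it is still pending or in flight on the
// background queue, so foreground waiters can observe its completion.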
public bool TryPromote(DocumentId id)
{
var shouldEnqueue = false;

lock (_lock)
{
shouldEnqueue = _background.IsEnqueued(id) || _background.IsActive(id);
}

if (shouldEnqueue)
PutWork(new[] { id }, AnalyzerWorkType.Foreground);

return shouldEnqueue;
}

public record QueueItem
(
DocumentId DocumentId,
CancellationToken CancellationToken,
AnalyzerWorkType AnalyzerWorkType,
int DocumentCount,
int DocumentCountRemaining
);

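// Per-work-type state: documents waiting for analysis (_pendingQueue/_pendingHash), documents
// currently being analyzed (_active, one CancellationTokenSource per in-flight run), and waiters
// that complete once a captured set of pending and active documents has drained.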
private class Queue
{
private readonly HashSet<DocumentId> _pendingHash = new();
private readonly Queue<DocumentId> _pendingQueue = new();
private readonly Dictionary<DocumentId, List<CancellationTokenSource>> _active = new();
private readonly List<(HashSet<DocumentId> DocumentIds, TaskCompletionSource<object?> TaskCompletionSource)> _waiters = new();

public int PendingCount => _pendingQueue.Count;

public int ActiveCount => _active.Count;

public int MaximumPendingCount { get; private set; }

public void RequestCancellationIfActive(DocumentId documentId)
{
if (_active.TryGetValue(documentId, out var active))
{
foreach (var cts in active)
cts.Cancel();
}
}

public void Enqueue(DocumentId documentId)
{
if (_pendingHash.Add(documentId))
{
_pendingQueue.Enqueue(documentId);

if (_pendingQueue.Count > MaximumPendingCount)
MaximumPendingCount = _pendingQueue.Count;
}
}

public bool IsEnqueued(DocumentId documentId) =>
_pendingHash.Contains(documentId);

public bool IsActive(DocumentId documentId) =>
_active.ContainsKey(documentId);

public void Remove(DocumentId documentId)
{
if (_pendingHash.Contains(documentId))
{
_pendingHash.Remove(documentId);

var backgroundQueueItems = _pendingQueue.ToList();

_pendingQueue.Clear();

foreach (var item in backgroundQueueItems)
{
if (item != documentId)
_pendingQueue.Enqueue(item);
}
}
}

public bool TryDequeue([NotNullWhen(true)] out DocumentId? documentId, [NotNullWhen(true)] out CancellationTokenSource? cancellationTokenSource)
{
if (_pendingQueue.Count > 0)
{
documentId = _pendingQueue.Dequeue();

_pendingHash.Remove(documentId);

if (!_active.TryGetValue(documentId, out var cancellationTokenSources))
_active[documentId] = cancellationTokenSources = new List<CancellationTokenSource>();

cancellationTokenSource = new CancellationTokenSource();

cancellationTokenSources.Add(cancellationTokenSource);

return true;
}

documentId = null;
cancellationTokenSource = null;

return false;
}

public void WorkComplete(DocumentId documentId, CancellationToken cancellationToken)
{
if (_active.TryGetValue(documentId, out var cancellationTokenSources))
{
foreach (var cancellationTokenSource in cancellationTokenSources.ToList())
{
if (cancellationTokenSource.Token == cancellationToken)
{
cancellationTokenSource.Dispose();

cancellationTokenSources.Remove(cancellationTokenSource);

break;
}
}

if (cancellationTokenSources.Count == 0)
_active.Remove(documentId);

var isReenqueued = cancellationToken.IsCancellationRequested
&& (_pendingHash.Contains(documentId) || _active.ContainsKey(documentId));

if (!isReenqueued)
{
foreach (var waiter in _waiters.ToList())
{
if (waiter.DocumentIds.Remove(documentId) && waiter.DocumentIds.Count == 0)
{
waiter.TaskCompletionSource.SetResult(null);

_waiters.Remove(waiter);
}
}
}
}
}

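// Returns a task that completes once every document currently pending or active in this queue
// has finished analysis; work that completes but was re-queued in the meantime keeps the waiter alive.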
public Task GetWaiter()
{
if (_active.Count == 0 && _pendingQueue.Count == 0)
return Task.CompletedTask;

var documentIds = new HashSet<DocumentId>(_pendingHash.Concat(_active.Keys));

var waiter = _waiters.FirstOrDefault(x => x.DocumentIds.SetEquals(documentIds));

if (waiter == default)
{
waiter = (documentIds, new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously));

_waiters.Add(waiter);
}

return waiter.TaskCompletionSource.Task;
}
}
}
}
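
Note: for orientation, a minimal sketch of the intended consumption pattern follows; the real loop is the reworked Worker method in CSharpDiagnosticWorkerWithAnalyzers below. AnalyzeDocumentAsync is a hypothetical stand-in for the actual analysis call and is not part of the diff.

async Task ConsumeAsync(AsyncAnalyzerWorkQueue queue, CancellationToken shutdownToken)
{
    while (!shutdownToken.IsCancellationRequested)
    {
        // Waits until a document is queued; foreground items are always dequeued before background ones.
        var item = await queue.TakeWorkAsync(shutdownToken);

        try
        {
            // item.CancellationToken is cancelled when the same document is queued again,
            // so analysis of a stale snapshot can be abandoned early.
            await AnalyzeDocumentAsync(item.DocumentId, item.CancellationToken);
        }
        finally
        {
            // Always report completion so WaitForegroundWorkComplete waiters are released.
            queue.WorkComplete(item);
        }
    }
}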
src/OmniSharp.Roslyn.CSharp/Services/Diagnostics/CSharpDiagnosticWorkerWithAnalyzers.cs
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
@@ -23,16 +23,16 @@ namespace OmniSharp.Roslyn.CSharp.Services.Diagnostics
{
public class CSharpDiagnosticWorkerWithAnalyzers : CSharpDiagnosticWorkerBase, IDisposable
{
private readonly AnalyzerWorkQueue _workQueue;
private readonly SemaphoreSlim _throttler;
private readonly AsyncAnalyzerWorkQueue _workQueue;
private readonly ILogger<CSharpDiagnosticWorkerWithAnalyzers> _logger;

private readonly ConcurrentDictionary<DocumentId, DocumentDiagnostics> _currentDiagnosticResultLookup = new();
private readonly ImmutableArray<ICodeActionProvider> _providers;
private readonly DiagnosticEventForwarder _forwarder;
private readonly OmniSharpOptions _options;
private readonly OmniSharpWorkspace _workspace;
private const int WorkerWait = 250;

private int _projectCount = 0;

public CSharpDiagnosticWorkerWithAnalyzers(
OmniSharpWorkspace workspace,
@@ -46,8 +46,7 @@ public CSharpDiagnosticWorkerWithAnalyzers(
{
_logger = loggerFactory.CreateLogger<CSharpDiagnosticWorkerWithAnalyzers>();
_providers = providers.ToImmutableArray();
_workQueue = new AnalyzerWorkQueue(loggerFactory, timeoutForPendingWorkMs: options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs * 3);
_throttler = new SemaphoreSlim(options.RoslynExtensionsOptions.DiagnosticWorkersThreadCount);
_workQueue = new AsyncAnalyzerWorkQueue(loggerFactory);

_forwarder = forwarder;
_options = options;
@@ -57,8 +56,8 @@ public CSharpDiagnosticWorkerWithAnalyzers(
_workspace.WorkspaceChanged += OnWorkspaceChanged;
_workspace.OnInitialized += OnWorkspaceInitialized;

Task.Factory.StartNew(() => Worker(AnalyzerWorkType.Foreground), TaskCreationOptions.LongRunning);
Task.Factory.StartNew(() => Worker(AnalyzerWorkType.Background), TaskCreationOptions.LongRunning);
for (var i = 0; i < options.RoslynExtensionsOptions.DiagnosticWorkersThreadCount; i++)
Task.Run(Worker);

OnWorkspaceInitialized(_workspace.Initialized);
}
@@ -95,87 +94,73 @@ private async Task<ImmutableArray<DocumentDiagnostics>> GetDiagnosticsByDocument
_workQueue.TryPromote(documentId);
}

await _workQueue.WaitForegroundWorkComplete();
using var cancellationTokenSource = new CancellationTokenSource(_options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs * 3);

await _workQueue.WaitForegroundWorkComplete(cancellationTokenSource.Token);
}

return documentIds
.Where(x => _currentDiagnosticResultLookup.ContainsKey(x))
.Select(x => _currentDiagnosticResultLookup[x])
.Select(x => _currentDiagnosticResultLookup.TryGetValue(x, out var value) ? value : null)
.Where(x => x != null)
.ToImmutableArray();
}

private async Task Worker(AnalyzerWorkType workType)
private async Task Worker()
{
while (true)
{
AsyncAnalyzerWorkQueue.QueueItem item = null;
DocumentId documentId;
CancellationToken? cancellationToken = null;
AnalyzerWorkType workType;
int documentCount;
int remaining;

try
{
var solution = _workspace.CurrentSolution;
item = await _workQueue.TakeWorkAsync();
(documentId, cancellationToken, workType, documentCount, remaining) = item;

var documents = _workQueue
.TakeWork(workType)
.Select(documentId => (projectId: solution.GetDocument(documentId)?.Project?.Id, documentId))
.Where(x => x.projectId != null)
.ToImmutableArray();

if (documents.IsEmpty)
if (workType == AnalyzerWorkType.Background)
{
_workQueue.WorkComplete(workType);

await Task.Delay(WorkerWait);

continue;
}

var documentCount = documents.Length;
var documentCountRemaining = documentCount;

// event every percentage increase, or every 10th if there are fewer than 1000
var eventEvery = Math.Max(10, documentCount / 100);

var documentsGroupedByProjects = documents
.GroupBy(x => x.projectId, x => x.documentId)
.ToImmutableArray();
var projectCount = documentsGroupedByProjects.Length;
// event every percentage increase, or every 10th if there are fewer than 1000
var eventEvery = Math.Max(10, documentCount / 100);

EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Started, projectCount, documentCount, documentCountRemaining);
if (documentCount == remaining + 1)
EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Started, _projectCount, documentCount, remaining);

void decrementDocumentCountRemaining()
{
var remaining = Interlocked.Decrement(ref documentCountRemaining);
var done = documentCount - remaining;
if (done % eventEvery == 0)
{
EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Progress, projectCount, documentCount, remaining);
}
if (done % eventEvery == 0 || remaining == 0)
EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Progress, _projectCount, documentCount, remaining);
}

var solution = _workspace.CurrentSolution;
var projectId = solution.GetDocument(documentId)?.Project?.Id;

try
{
var projectAnalyzerTasks =
documentsGroupedByProjects
.Select(projectGroup => Task.Run(async () =>
{
var projectPath = solution.GetProject(projectGroup.Key).FilePath;
await AnalyzeProject(solution, projectGroup, decrementDocumentCountRemaining);
}))
.ToImmutableArray();

await Task.WhenAll(projectAnalyzerTasks);
if (projectId != null)
await AnalyzeDocument(solution, projectId, documentId, cancellationToken.Value);
}
finally
{
EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Finished, projectCount, documentCount, documentCountRemaining);
if (remaining == 0)
EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Finished, _projectCount, documentCount, remaining);
}

_workQueue.WorkComplete(workType);

await Task.Delay(WorkerWait);
}
catch (OperationCanceledException) when (cancellationToken != null && cancellationToken.Value.IsCancellationRequested)
{
_logger.LogInformation($"Analyzer work cancelled.");
}
catch (Exception ex)
{
_logger.LogError($"Analyzer worker failed: {ex}");
}
finally
{
if (item != null)
_workQueue.WorkComplete(item);
}
}
}

@@ -185,8 +170,17 @@ private void EventIfBackgroundWork(AnalyzerWorkType workType, BackgroundDiagnost
_forwarder.BackgroundDiagnosticsStatus(status, numberProjects, numberFiles, numberFilesRemaining);
}

private void QueueForAnalysis(ImmutableArray<DocumentId> documentIds, AnalyzerWorkType workType) =>
private void QueueForAnalysis(ImmutableArray<DocumentId> documentIds, AnalyzerWorkType workType)
{
if (workType == AnalyzerWorkType.Background)
{
var solution = _workspace.CurrentSolution;

_projectCount = documentIds.Select(x => solution.GetDocument(x)?.Project?.Id).Distinct().Count(x => x != null);
}

_workQueue.PutWork(documentIds, workType);
}

private void OnWorkspaceChanged(object sender, WorkspaceChangeEventArgs changeEvent)
{
@@ -231,99 +225,73 @@ public override async Task<IEnumerable<Diagnostic>> AnalyzeDocumentAsync(Documen
var allAnalyzers = GetAnalyzersForProject(project);
var compilation = await project.GetCompilationAsync(cancellationToken);

cancellationToken.ThrowIfCancellationRequested();
return await AnalyzeDocument(project, allAnalyzers, compilation, CreateAnalyzerOptions(document.Project), document);
return await AnalyzeDocument(project, allAnalyzers, compilation, CreateAnalyzerOptions(document.Project), document, cancellationToken);
}

public override async Task<IEnumerable<Diagnostic>> AnalyzeProjectsAsync(Project project, CancellationToken cancellationToken)
{
var allAnalyzers = GetAnalyzersForProject(project);
var compilation = await project.GetCompilationAsync(cancellationToken);
var workspaceAnalyzerOptions = CreateAnalyzerOptions(project);
var documentAnalyzerTasks = new List<Task>();
var diagnostics = ImmutableList<Diagnostic>.Empty;

foreach (var document in project.Documents)
{
await _throttler.WaitAsync(cancellationToken);
var documentIds = project.DocumentIds.ToImmutableArray();

documentAnalyzerTasks.Add(Task.Run(async () =>
{
try
{
var documentDiagnostics = await AnalyzeDocument(project, allAnalyzers, compilation, workspaceAnalyzerOptions, document);
ImmutableInterlocked.Update(ref diagnostics, currentDiagnostics => currentDiagnostics.AddRange(documentDiagnostics));
}
finally
{
_throttler.Release();
}
}, cancellationToken));
}
QueueForAnalysis(documentIds, AnalyzerWorkType.Foreground);

await Task.WhenAll(documentAnalyzerTasks);
await _workQueue.WaitForegroundWorkComplete(cancellationToken);

return diagnostics;
return documentIds
.Select(x => _currentDiagnosticResultLookup.TryGetValue(x, out var value) ? value : null)
.Where(x => x != null)
.SelectMany(x => x.Diagnostics)
.ToImmutableArray();
}

private async Task AnalyzeProject(Solution solution, IGrouping<ProjectId, DocumentId> documentsGroupedByProject, Action decrementRemaining)
private async Task AnalyzeDocument(Solution solution, ProjectId projectId, DocumentId documentId, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

try
{
var project = solution.GetProject(documentsGroupedByProject.Key);
var project = solution.GetProject(projectId);
var allAnalyzers = GetAnalyzersForProject(project);
var compilation = await project.GetCompilationAsync();
var workspaceAnalyzerOptions = CreateAnalyzerOptions(project);
var documentAnalyzerTasks = new List<Task>();
var document = project.GetDocument(documentId);

foreach (var documentId in documentsGroupedByProject)
{
await _throttler.WaitAsync();
var diagnostics = await AnalyzeDocument(project, allAnalyzers, compilation, workspaceAnalyzerOptions, document, cancellationToken);

documentAnalyzerTasks.Add(Task.Run(async () =>
{
try
{
var document = project.GetDocument(documentId);
var diagnostics = await AnalyzeDocument(project, allAnalyzers, compilation, workspaceAnalyzerOptions, document);
UpdateCurrentDiagnostics(project, document, diagnostics);
decrementRemaining();
}
finally
{
_throttler.Release();
}
}));
}

await Task.WhenAll(documentAnalyzerTasks);
UpdateCurrentDiagnostics(project, document, diagnostics);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
_logger.LogError($"Analysis of project {documentsGroupedByProject.Key} failed, underlaying error: {ex}");
_logger.LogError($"Analysis of document {documentId} failed, underlying error: {ex}");
}
}

private async Task<ImmutableArray<Diagnostic>> AnalyzeDocument(Project project, ImmutableArray<DiagnosticAnalyzer> allAnalyzers, Compilation compilation, AnalyzerOptions workspaceAnalyzerOptions, Document document)
private async Task<ImmutableArray<Diagnostic>> AnalyzeDocument(Project project, ImmutableArray<DiagnosticAnalyzer> allAnalyzers, Compilation compilation, AnalyzerOptions workspaceAnalyzerOptions, Document document, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

// There's a real possibility that a bug in an analyzer causes analysis to hang on a document.
using var perDocumentTimeout =
new CancellationTokenSource(_options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs);
using var combinedCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, perDocumentTimeout.Token);

try
{
// There's real possibility that bug in analyzer causes analysis hang at document.
CancellationToken cancellationToken = new CancellationTokenSource(
_options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs)
.Token;
var documentSemanticModel = await document.GetSemanticModelAsync(combinedCancellation.Token);

// Analyzers cannot be called with empty analyzer list.
bool canDoFullAnalysis = allAnalyzers.Length > 0
&& (!_options.RoslynExtensionsOptions.AnalyzeOpenDocumentsOnly
|| _workspace.IsDocumentOpen(document.Id));

SemanticModel documentSemanticModel = await document.GetSemanticModelAsync(cancellationToken);
SyntaxTree syntaxTree = documentSemanticModel.SyntaxTree;

SyntaxTreeOptionsProvider provider = compilation.Options.SyntaxTreeOptionsProvider;
GeneratedKind kind = provider.IsGenerated(syntaxTree, cancellationToken);
if (kind is GeneratedKind.MarkedGenerated || syntaxTree.IsAutoGenerated(cancellationToken))
GeneratedKind kind = provider.IsGenerated(syntaxTree, combinedCancellation.Token);
if (kind is GeneratedKind.MarkedGenerated || syntaxTree.IsAutoGenerated(combinedCancellation.Token))
{
return Enumerable.Empty<Diagnostic>().ToImmutableArray();
}
@@ -332,12 +300,12 @@ private async Task<ImmutableArray<Diagnostic>> AnalyzeDocument(Project project,
// Those projects are on hard coded virtual project
if (project.Name == $"{Configuration.OmniSharpMiscProjectName}.csproj")
{
return syntaxTree.GetDiagnostics().ToImmutableArray();
return syntaxTree.GetDiagnostics(cancellationToken: combinedCancellation.Token).ToImmutableArray();
}

if (!canDoFullAnalysis)
{
return documentSemanticModel.GetDiagnostics();
return documentSemanticModel.GetDiagnostics(cancellationToken: combinedCancellation.Token);
}

CompilationWithAnalyzers compilationWithAnalyzers = compilation.WithAnalyzers(allAnalyzers, new CompilationWithAnalyzersOptions(
@@ -348,12 +316,12 @@ private async Task<ImmutableArray<Diagnostic>> AnalyzeDocument(Project project,
reportSuppressedDiagnostics: false));

Task<ImmutableArray<Diagnostic>> semanticDiagnosticsWithAnalyzers = compilationWithAnalyzers
.GetAnalyzerSemanticDiagnosticsAsync(documentSemanticModel, filterSpan: null, cancellationToken);
.GetAnalyzerSemanticDiagnosticsAsync(documentSemanticModel, filterSpan: null, combinedCancellation.Token);

Task<ImmutableArray<Diagnostic>> syntaxDiagnosticsWithAnalyzers = compilationWithAnalyzers
.GetAnalyzerSyntaxDiagnosticsAsync(syntaxTree, cancellationToken);
.GetAnalyzerSyntaxDiagnosticsAsync(syntaxTree, combinedCancellation.Token);

ImmutableArray<Diagnostic> documentSemanticDiagnostics = documentSemanticModel.GetDiagnostics(null, cancellationToken);
ImmutableArray<Diagnostic> documentSemanticDiagnostics = documentSemanticModel.GetDiagnostics(null, combinedCancellation.Token);

await Task.WhenAll(syntaxDiagnosticsWithAnalyzers, semanticDiagnosticsWithAnalyzers);

@@ -363,6 +331,10 @@ private async Task<ImmutableArray<Diagnostic>> AnalyzeDocument(Project project,
.Concat(documentSemanticDiagnostics)
.ToImmutableArray();
}
catch (OperationCanceledException) when (combinedCancellation.Token.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
_logger.LogError($"Analysis of document {document.Name} failed or cancelled by timeout: {ex.Message}, analysers: {string.Join(", ", allAnalyzers)}");
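
Note: the reworked AnalyzeDocument above links the queue item's cancellation token with a per-document timeout. A condensed sketch of that pattern, where documentAnalysisTimeoutMs, cancellationToken and DoAnalysisAsync stand in for the actual option value, the QueueItem token and the Roslyn calls:

// Cancel when either the caller's token fires (document re-queued, request aborted)
// or the per-document timeout elapses, whichever comes first.
using var perDocumentTimeout = new CancellationTokenSource(documentAnalysisTimeoutMs);
using var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, perDocumentTimeout.Token);

try
{
    return await DoAnalysisAsync(combined.Token);
}
catch (OperationCanceledException) when (combined.Token.IsCancellationRequested)
{
    // Cooperative cancellation: rethrow so the worker logs it as a cancellation rather
    // than as an analyzer failure.
    throw;
}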
363 changes: 363 additions & 0 deletions tests/OmniSharp.Roslyn.CSharp.Tests/AsyncAnalyzerWorkerQueueFacts.cs
@@ -0,0 +1,363 @@
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis;
using Microsoft.Extensions.Logging;
using OmniSharp.Roslyn.CSharp.Workers.Diagnostics;
using Xunit;

namespace OmniSharp.Roslyn.CSharp.Tests
{
#pragma warning disable VSTHRD103 // Call async methods when in an async method
public class AsyncAnalyzerWorkerQueueFacts
{
private class Logger : ILogger
{
public IDisposable BeginScope<TState>(TState state)
{
return null;
}

public bool IsEnabled(LogLevel logLevel) => true;

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
RecordedMessages = RecordedMessages.Add(state.ToString());
}

public ImmutableArray<string> RecordedMessages { get; set; } = ImmutableArray.Create<string>();
}

private class LoggerFactory : ILoggerFactory
{
public Logger Logger { get; } = new Logger();

public void AddProvider(ILoggerProvider provider)
{
}

public ILogger CreateLogger(string categoryName)
{
return Logger;
}

public void Dispose()
{
}
}

[Theory]
[InlineData(AnalyzerWorkType.Background)]
[InlineData(AnalyzerWorkType.Foreground)]
public async Task WhenWorksIsAddedToQueueThenTheyWillBeReturned(AnalyzerWorkType workType)
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, workType);

var work = await queue.TakeWorkAsyncWithTimeout();

Assert.Equal(document, work.DocumentId);
Assert.Equal(0, queue.PendingCount);
}

[Fact]
public async Task WhenForegroundWorkIsAddedThenWaitNextIterationOfItReady()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground);

var pendingTask = queue.WaitForegroundWorkCompleteWithTimeout(500);

Assert.False(pendingTask.IsCompleted);

var work = await queue.TakeWorkAsyncWithTimeout();

queue.WorkComplete(work);

pendingTask.Wait(TimeSpan.FromMilliseconds(50));

Assert.True(pendingTask.IsCompleted);
}

[Fact]
public async Task WhenForegroundWorkIsUnderAnalysisOutFromQueueThenWaitUntilNextIterationOfItIsReady()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground);

var work = await queue.TakeWorkAsync();

var pendingTask = queue.WaitForegroundWorkCompleteWithTimeout(500);
pendingTask.Wait(TimeSpan.FromMilliseconds(50));

Assert.False(pendingTask.IsCompleted);
queue.WorkComplete(work);
pendingTask.Wait(TimeSpan.FromMilliseconds(50));
Assert.True(pendingTask.IsCompleted);
}

[Fact]
public async Task WhenMultipleThreadsAreConsumingAnalyzerWorkerQueueItWorksAsExpected()
{
var now = DateTime.UtcNow;

var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());

var parallelQueues =
Enumerable.Range(0, 10)
.Select(_ =>
Task.Run(async () =>
{
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground);

var work = await queue.TakeWorkAsync();

var pendingTask = queue.WaitForegroundWorkCompleteWithTimeout(1000);

var pendingTask2 = queue.WaitForegroundWorkCompleteWithTimeout(1000);

pendingTask.Wait(TimeSpan.FromMilliseconds(300));
}))
.ToArray();

await Task.WhenAll(parallelQueues);

Assert.Equal(0, queue.PendingCount);
}

[Fact]
public async Task WhenNewWorkIsAddedAgainWhenPreviousIsAnalysing_ThenDontWaitAnotherOneToGetReady()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document1 = CreateTestDocumentId();
var document2 = CreateTestDocumentId();

queue.PutWork(new[] { document1 }, AnalyzerWorkType.Foreground);

var work = await queue.TakeWorkAsync();
var waitingCall = Task.Run(async () => await queue.WaitForegroundWorkCompleteWithTimeout(10 * 1000));
await Task.Delay(50);

// User updates code -> the document is queued again while there is already an API call
// waiting to continue.
queue.PutWork(new[] { document2 }, AnalyzerWorkType.Foreground);

// First iteration of work is done.
queue.WorkComplete(work);

// The waiting call continues because its iteration of work is done, even though the next
// one is already waiting.
waitingCall.Wait(50);

Assert.True(waitingCall.IsCompleted);
}

[Fact]
public async Task WhenWorkIsAddedAgainWhenPreviousIsAnalysing_ThenContinueWaiting()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground);

var work = await queue.TakeWorkAsync();
var waitingCall = Task.Run(async () => await queue.WaitForegroundWorkCompleteWithTimeout(10 * 1000));
await Task.Delay(50);

// User updates code -> the same document is queued again while there is already an API call
// waiting to continue.
queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground);

// First iteration of work is done.
queue.WorkComplete(work);

// The waiting call keeps waiting: the same document was re-queued, so its foreground
// work is not yet complete.
waitingCall.Wait(50);

Assert.False(waitingCall.IsCompleted);
}

[Fact]
public void WhenBackgroundWorkIsAdded_DontWaitIt()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Background);

Assert.True(queue.WaitForegroundWorkComplete().IsCompleted);
}

[Fact]
public void WhenSingleFileIsPromoted_ThenPromoteItFromBackgroundQueueToForeground()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Background);

queue.TryPromote(document);

Assert.NotEqual(0, queue.PendingCount);
}

[Fact]
public void WhenFileIsntAtBackgroundQueueAndTriedToBePromoted_ThenDontDoNothing()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.TryPromote(document);

Assert.Equal(0, queue.PendingCount);
}

[Fact]
public async Task WhenFileIsProcessingInBackgroundQueue_ThenPromoteItAsForeground()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Background);

await queue.TakeWorkAsyncWithTimeout();

queue.TryPromote(document);

await queue.TakeWorkAsyncWithTimeout();
}

[Fact]
public async Task WhenFileIsAddedMultipleTimes_DuplicatesAreIgnored()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Background);

queue.PutWork(new[] { document }, AnalyzerWorkType.Background);

await queue.TakeWorkAsyncWithTimeout();

await Assert.ThrowsAsync<OperationCanceledException>(() =>
queue.TakeWorkAsyncWithTimeout());
}

[Fact]
public async Task WhenFileIsAddedWhileProcessing_ThePeviousRunIsCancelled()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Background);

var result = await queue.TakeWorkAsyncWithTimeout();

Assert.False(result.CancellationToken.IsCancellationRequested);

queue.PutWork(new[] { document }, AnalyzerWorkType.Background);

Assert.True(result.CancellationToken.IsCancellationRequested);
}

[Fact]
public async Task WhenQueueIsEmpty_TakeWorkRespondsToCancellation()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

await Assert.ThrowsAsync<OperationCanceledException>(() =>
queue.TakeWorkAsyncWithTimeout());
}

[Fact]
public async Task WhenAwaitingForForgroundWork_CancellationIsHandled()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document = CreateTestDocumentId();

queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground);

var isCancelled = await queue.WaitForegroundWorkCompleteWithTimeout(50);

Assert.True(isCancelled);
}

[Fact]
public async Task WhenDequeingWork_ItsReturnedInOrderForgroundFirst()
{
var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory());
var document1 = CreateTestDocumentId();
var document2 = CreateTestDocumentId();
var document3 = CreateTestDocumentId();
var document4 = CreateTestDocumentId();

queue.PutWork(new[] { document3 }, AnalyzerWorkType.Background);

queue.PutWork(new[] { document1 }, AnalyzerWorkType.Foreground);

queue.PutWork(new[] { document4 }, AnalyzerWorkType.Background);

queue.PutWork(new[] { document2 }, AnalyzerWorkType.Foreground);

var result1 = await queue.TakeWorkAsyncWithTimeout();

Assert.Equal(document1, result1.DocumentId);

var result2 = await queue.TakeWorkAsyncWithTimeout();

Assert.Equal(document2, result2.DocumentId);

var result3 = await queue.TakeWorkAsyncWithTimeout();

Assert.Equal(document3, result3.DocumentId);

var result4 = await queue.TakeWorkAsyncWithTimeout();

Assert.Equal(document4, result4.DocumentId);
}

private static DocumentId CreateTestDocumentId()
{
var projectInfo = ProjectInfo.Create(
id: ProjectId.CreateNewId(),
version: VersionStamp.Create(),
name: "testProject",
assemblyName: "AssemblyName",
language: LanguageNames.CSharp);

return DocumentId.CreateNewId(projectInfo.Id);
}
}

public static class AsyncAnalyzerWorkerQueueFactsExtensions
{
public static async Task<AsyncAnalyzerWorkQueue.QueueItem> TakeWorkAsyncWithTimeout(this AsyncAnalyzerWorkQueue queue)
{
using var cts = new CancellationTokenSource(50);

return await queue.TakeWorkAsync(cts.Token);
}

public static async Task<bool> WaitForegroundWorkCompleteWithTimeout(this AsyncAnalyzerWorkQueue queue, int timeout)
{
using var cts = new CancellationTokenSource(timeout);

await queue.WaitForegroundWorkComplete(cts.Token);

return cts.Token.IsCancellationRequested;
}
}
#pragma warning restore VSTHRD103 // Call async methods when in an async method
}
71 changes: 50 additions & 21 deletions tests/OmniSharp.Roslyn.CSharp.Tests/TestEventEmitter.cs
@@ -1,46 +1,75 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using OmniSharp.Eventing;

namespace OmniSharp.Roslyn.CSharp.Tests
{
public class TestEventEmitter<T> : IEventEmitter
{
private readonly object _lock = new();
private readonly List<T> _messages = new();
private readonly List<(Predicate<T> Predicate, TaskCompletionSource<object> TaskCompletionSource)> _predicates = new();

public async Task ExpectForEmitted(Expression<Predicate<T>> predicate)
{
public ImmutableArray<T> Messages { get; private set; } = ImmutableArray<T>.Empty;
var asCompiledPredicate = predicate.Compile();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

public async Task ExpectForEmitted(Expression<Predicate<T>> predicate)
lock (_lock)
{
var asCompiledPredicate = predicate.Compile();
if (_messages.Any(m => asCompiledPredicate(m)))
return;

// May seem hacky but nothing is more painfull to debug than infinite hanging test ...
for(int i = 0; i < 100; i++)
{
if(Messages.Any(m => asCompiledPredicate(m)))
{
return;
}
_predicates.Add((asCompiledPredicate, tcs));
}

await Task.Delay(250);
}
try
{
using var cts = new CancellationTokenSource(25000);

throw new InvalidOperationException($"Timeout reached before expected event count reached before prediction {predicate} came true, current diagnostics '{String.Join(";", Messages)}'");
cts.Token.Register(() => tcs.SetCanceled());

await tcs.Task;
}
catch (OperationCanceledException)
{
var messages = string.Join(";", _messages.Select(x => JsonConvert.SerializeObject(x)));

public void Clear()
throw new InvalidOperationException($"Timeout reached before expected event count reached before prediction {predicate} came true, current diagnostics '{messages}'");
}
finally
{
Messages = ImmutableArray<T>.Empty;
lock (_lock)
_predicates.Remove((asCompiledPredicate, tcs));
}
}

public void Clear()
{
lock (_lock)
_messages.Clear();
}

public void Emit(string kind, object args)
public void Emit(string kind, object args)
{
if (args is T asT)
{
if(args is T asT)
lock (_lock)
{
Messages = Messages.Add(asT);
_messages.Add(asT);

foreach (var (predicate, tcs) in _predicates)
{
if (predicate(asT))
tcs.SetResult(null);
}
}
}
}
}
}
}
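
Note: a minimal usage sketch of the reworked emitter (assumed test code, not part of the diff). ExpectForEmitted now completes as soon as a matching message is emitted instead of polling every 250 ms, and throws InvalidOperationException if nothing matches within the 25-second timeout.

var emitter = new TestEventEmitter<string>();

// Registering the expectation first is safe: it also completes immediately if a matching
// message has already been recorded.
var expectation = emitter.ExpectForEmitted(msg => msg.Contains("Finished"));

// Emit stores the message and resolves every registered predicate that matches it.
emitter.Emit("unit-test", "BackgroundDiagnosticStatus: Finished");

await expectation;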