diff --git a/.gitignore b/.gitignore index ef7de78c..aef64a09 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,9 @@ obj/ # Visual Studio IDE directory .vs/ +# VSCode directories that are not at the repository root +/**/.vscode/ + # Project Rider IDE files .idea.powershell/ diff --git a/src/Logging/ILogger.cs b/src/Logging/ILogger.cs index b8cb2522..1513cc27 100644 --- a/src/Logging/ILogger.cs +++ b/src/Logging/ILogger.cs @@ -11,5 +11,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Utility internal interface ILogger { void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false); + void SetContext(string requestId, string invocationId); + void ResetContext(); } } diff --git a/src/Logging/RpcLogger.cs b/src/Logging/RpcLogger.cs index d44b50a0..e856e378 100644 --- a/src/Logging/RpcLogger.cs +++ b/src/Logging/RpcLogger.cs @@ -12,7 +12,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Utility { - internal class RpcLogger : ILogger, IDisposable + internal class RpcLogger : ILogger { private const string SystemLogPrefix = "LanguageWorkerConsoleLog"; private readonly MessagingStream _msgStream; @@ -20,26 +20,25 @@ internal class RpcLogger : ILogger, IDisposable private string _invocationId; private string _requestId; - public RpcLogger(MessagingStream msgStream) + internal RpcLogger(MessagingStream msgStream) { _msgStream = msgStream; _systemLogMsg = new StringBuilder(); } - public IDisposable BeginScope(string requestId, string invocationId) + public void SetContext(string requestId, string invocationId) { _requestId = requestId; _invocationId = invocationId; - return this; } - public void Dispose() + public void ResetContext() { _requestId = null; _invocationId = null; } - public async void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false) + public void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false) { if (isUserLog) { @@ -56,7 +55,7 @@ public async void Log(LogLevel logLevel, string message, Exception exception = n } }; - await _msgStream.WriteAsync(logMessage); + _msgStream.Write(logMessage); } else { diff --git a/src/Messaging/MessagingStream.cs b/src/Messaging/MessagingStream.cs index 6b3bb199..9cbf3b75 100644 --- a/src/Messaging/MessagingStream.cs +++ b/src/Messaging/MessagingStream.cs @@ -4,6 +4,7 @@ // using System; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; @@ -12,47 +13,44 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging { - internal class MessagingStream : IDisposable + internal class MessagingStream { - private SemaphoreSlim _writeSemaphore = new SemaphoreSlim(1, 1); - private AsyncDuplexStreamingCall _call; - private bool isDisposed; + private readonly AsyncDuplexStreamingCall _call; + private readonly BlockingCollection _msgQueue; - public MessagingStream(string host, int port) + internal MessagingStream(string host, int port) { Channel channel = new Channel(host, port, ChannelCredentials.Insecure); _call = new FunctionRpc.FunctionRpcClient(channel).EventStream(); - } - public void Dispose() - { - if (!isDisposed) - { - isDisposed = true; - _call.Dispose(); - } + _msgQueue = new BlockingCollection(); + Task.Run(WriteImplAsync); } - public StreamingMessage GetCurrentMessage() => - isDisposed ? null : _call.ResponseStream.Current; - - public async Task MoveNext() => - !isDisposed && await _call.ResponseStream.MoveNext(CancellationToken.None); - - public async Task WriteAsync(StreamingMessage message) + /// + /// Get the current message. + /// + internal StreamingMessage GetCurrentMessage() => _call.ResponseStream.Current; + + /// + /// Move to the next message. + /// + internal async Task MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None); + + /// + /// Write the outgoing message to the buffer. + /// + internal void Write(StreamingMessage message) => _msgQueue.Add(message); + + /// + /// Take a message from the buffer and write to the gRPC channel. + /// + private async Task WriteImplAsync() { - if(isDisposed) return; - - // Wait for the handle to be released because we can't have - // more than one message being sent at the same time - await _writeSemaphore.WaitAsync(); - try - { - await _call.RequestStream.WriteAsync(message); - } - finally + while (true) { - _writeSemaphore.Release(); + StreamingMessage msg = _msgQueue.Take(); + await _call.RequestStream.WriteAsync(msg); } } } diff --git a/src/PowerShell/PowerShellManager.cs b/src/PowerShell/PowerShellManager.cs index 1a929a78..9bd52ae7 100644 --- a/src/PowerShell/PowerShellManager.cs +++ b/src/PowerShell/PowerShellManager.cs @@ -28,6 +28,11 @@ internal class PowerShellManager /// internal Guid InstanceId => _pwsh.Runspace.InstanceId; + /// + /// Gets the associated logger. + /// + internal ILogger Logger => _logger; + static PowerShellManager() { // Set the type accelerators for 'HttpResponseContext' and 'HttpResponseContext'. @@ -48,6 +53,7 @@ internal PowerShellManager(ILogger logger) } var initialSessionState = InitialSessionState.CreateDefault(); + initialSessionState.ThreadOptions = PSThreadOptions.ReuseThread; initialSessionState.EnvironmentVariables.Add( new SessionStateVariableEntry("PSModulePath", FunctionLoader.FunctionModulePath, null)); @@ -154,16 +160,11 @@ internal Hashtable InvokeFunction( // Gives access to additional Trigger Metadata if the user specifies TriggerMetadata if(functionInfo.FuncParameters.Contains(AzFunctionInfo.TriggerMetadata)) { - _logger.Log(LogLevel.Debug, "Parameter '-TriggerMetadata' found."); _pwsh.AddParameter(AzFunctionInfo.TriggerMetadata, triggerMetadata); } - Collection pipelineItems = null; - using (ExecutionTimer.Start(_logger, "Execution of the user's function completed.")) - { - pipelineItems = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject") - .InvokeAndClearCommands(); - } + Collection pipelineItems = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject") + .InvokeAndClearCommands(); var result = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Get-OutputBinding") .AddParameter("Purge", true) diff --git a/src/PowerShell/PowerShellManagerPool.cs b/src/PowerShell/PowerShellManagerPool.cs index 06b0c583..4ac5dc71 100644 --- a/src/PowerShell/PowerShellManagerPool.cs +++ b/src/PowerShell/PowerShellManagerPool.cs @@ -4,7 +4,12 @@ // using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.PowerShellWorker.Messaging; using Microsoft.Azure.Functions.PowerShellWorker.Utility; +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell { @@ -15,37 +20,80 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell /// internal class PowerShellManagerPool { - private readonly ILogger _logger; - // Today we don't really support the in-proc concurrency. We just hold an instance of PowerShellManager in this field. - private PowerShellManager _psManager; + private readonly int _upperBound; + private readonly MessagingStream _msgStream; + private readonly BlockingCollection _pool; + private int _poolSize; /// /// Constructor of the pool. /// - internal PowerShellManagerPool(ILogger logger) + internal PowerShellManagerPool(MessagingStream msgStream) { - _logger = logger; + string upperBound = Environment.GetEnvironmentVariable("PSWorkerInProcConcurrencyUpperBound"); + if (string.IsNullOrEmpty(upperBound) || !int.TryParse(upperBound, out _upperBound)) + { + _upperBound = 1; + } + + _msgStream = msgStream; + _pool = new BlockingCollection(_upperBound); } /// /// Initialize the pool and populate it with PowerShellManager instances. - /// When it's time to really implement this pool, we probably should instantiate PowerShellManager instances in a lazy way. - /// Maybe start from size 1 and increase the number of workers as needed. + /// We instantiate PowerShellManager instances in a lazy way, starting from size 1 and increase the number of workers as needed. /// - internal void Initialize() + internal void Initialize(string requestId) { - _psManager = new PowerShellManager(_logger); + var logger = new RpcLogger(_msgStream); + + try + { + logger.SetContext(requestId, invocationId: null); + _pool.Add(new PowerShellManager(logger)); + _poolSize = 1; + } + finally + { + logger.ResetContext(); + } } /// - /// Checkout an idle PowerShellManager instance. - /// When it's time to really implement this pool, this method is supposed to block when there is no idle instance available. + /// Checkout an idle PowerShellManager instance in a non-blocking asynchronous way. /// - internal PowerShellManager CheckoutIdleWorker(AzFunctionInfo functionInfo) + internal PowerShellManager CheckoutIdleWorker(StreamingMessage request, AzFunctionInfo functionInfo) { + PowerShellManager psManager = null; + string requestId = request.RequestId; + string invocationId = request.InvocationRequest?.InvocationId; + + // If the pool has an idle one, just use it. + if (!_pool.TryTake(out psManager)) + { + // The pool doesn't have an idle one. + if (_poolSize < _upperBound && + Interlocked.Increment(ref _poolSize) <= _upperBound) + { + // If the pool hasn't reached its bounded capacity yet, then + // we create a new item and return it. + var logger = new RpcLogger(_msgStream); + logger.SetContext(requestId, invocationId); + psManager = new PowerShellManager(logger); + } + else + { + // If the pool has reached its bounded capacity, then the thread + // should be blocked until an idle one becomes available. + psManager = _pool.Take(); + } + } + // Register the function with the Runspace before returning the idle PowerShellManager. - FunctionMetadata.RegisterFunctionMetadata(_psManager.InstanceId, functionInfo); - return _psManager; + FunctionMetadata.RegisterFunctionMetadata(psManager.InstanceId, functionInfo); + psManager.Logger.SetContext(requestId, invocationId); + return psManager; } /// @@ -57,6 +105,8 @@ internal void ReclaimUsedWorker(PowerShellManager psManager) { // Unregister the Runspace before reclaiming the used PowerShellManager. FunctionMetadata.UnregisterFunctionMetadata(psManager.InstanceId); + psManager.Logger.ResetContext(); + _pool.Add(psManager); } } } diff --git a/src/RequestProcessor.cs b/src/RequestProcessor.cs index 6f005d2e..b3df0627 100644 --- a/src/RequestProcessor.cs +++ b/src/RequestProcessor.cs @@ -18,7 +18,6 @@ namespace Microsoft.Azure.Functions.PowerShellWorker internal class RequestProcessor { private readonly FunctionLoader _functionLoader; - private readonly RpcLogger _logger; private readonly MessagingStream _msgStream; private readonly PowerShellManagerPool _powershellPool; @@ -28,38 +27,34 @@ internal class RequestProcessor internal RequestProcessor(MessagingStream msgStream) { _msgStream = msgStream; - _logger = new RpcLogger(msgStream); - _powershellPool = new PowerShellManagerPool(_logger); + _powershellPool = new PowerShellManagerPool(msgStream); _functionLoader = new FunctionLoader(); } internal async Task ProcessRequestLoop() { - using (_msgStream) + StreamingMessage request, response; + while (await _msgStream.MoveNext()) { - StreamingMessage request, response; - while (await _msgStream.MoveNext()) + request = _msgStream.GetCurrentMessage(); + switch (request.ContentCase) { - request = _msgStream.GetCurrentMessage(); + case StreamingMessage.ContentOneofCase.WorkerInitRequest: + response = ProcessWorkerInitRequest(request); + break; + case StreamingMessage.ContentOneofCase.FunctionLoadRequest: + response = ProcessFunctionLoadRequest(request); + break; + case StreamingMessage.ContentOneofCase.InvocationRequest: + response = ProcessInvocationRequest(request); + break; + default: + throw new InvalidOperationException($"Unsupported message type: {request.ContentCase}"); + } - using (_logger.BeginScope(request.RequestId, request.InvocationRequest?.InvocationId)) - { - switch (request.ContentCase) - { - case StreamingMessage.ContentOneofCase.WorkerInitRequest: - response = ProcessWorkerInitRequest(request); - break; - case StreamingMessage.ContentOneofCase.FunctionLoadRequest: - response = ProcessFunctionLoadRequest(request); - break; - case StreamingMessage.ContentOneofCase.InvocationRequest: - response = ProcessInvocationRequest(request); - break; - default: - throw new InvalidOperationException($"Not supportted message type: {request.ContentCase}"); - } - } - await _msgStream.WriteAsync(response); + if (response != null) + { + _msgStream.Write(response); } } } @@ -98,7 +93,7 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) if (!_isFunctionAppInitialized) { FunctionLoader.SetupWellKnownPaths(functionLoadRequest); - _powershellPool.Initialize(); + _powershellPool.Initialize(request.RequestId); _isFunctionAppInitialized = true; } @@ -116,13 +111,45 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) /// /// Method to process a InvocationRequest. - /// InvocationRequest should be processed in parallel eventually. + /// This method checks out a worker from the pool, and then starts the actual invocation in a threadpool thread. /// internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) { + AzFunctionInfo functionInfo = null; PowerShellManager psManager = null; - InvocationRequest invocationRequest = request.InvocationRequest; + try + { + functionInfo = _functionLoader.GetFunctionInfo(request.InvocationRequest.FunctionId); + psManager = _powershellPool.CheckoutIdleWorker(request, functionInfo); + Task.Run(() => ProcessInvocationRequestImpl(request, functionInfo, psManager)); + } + catch (Exception e) + { + _powershellPool.ReclaimUsedWorker(psManager); + + StreamingMessage response = NewStreamingMessageTemplate( + request.RequestId, + StreamingMessage.ContentOneofCase.InvocationResponse, + out StatusResult status); + + response.InvocationResponse.InvocationId = request.InvocationRequest.InvocationId; + status.Status = StatusResult.Types.Status.Failure; + status.Exception = e.ToRpcException(); + + return response; + } + + return null; + } + + /// + /// Implementation method to actual invoke the corresponding function. + /// InvocationRequest messages are processed in parallel when there are multiple PowerShellManager instances in the pool. + /// + private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionInfo functionInfo, PowerShellManager psManager) + { + InvocationRequest invocationRequest = request.InvocationRequest; StreamingMessage response = NewStreamingMessageTemplate( request.RequestId, StreamingMessage.ContentOneofCase.InvocationResponse, @@ -131,10 +158,6 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) try { - // Load information about the function - var functionInfo = _functionLoader.GetFunctionInfo(invocationRequest.FunctionId); - psManager = _powershellPool.CheckoutIdleWorker(functionInfo); - // Invoke the function and return a hashtable of out binding data Hashtable results = functionInfo.Type == AzFunctionType.OrchestrationFunction ? InvokeOrchestrationFunction(psManager, functionInfo, invocationRequest) @@ -152,7 +175,7 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) _powershellPool.ReclaimUsedWorker(psManager); } - return response; + _msgStream.Write(response); } #region Helper_Methods diff --git a/src/Utility/ExecutionTimer.cs b/src/Utility/ExecutionTimer.cs deleted file mode 100644 index 33769236..00000000 --- a/src/Utility/ExecutionTimer.cs +++ /dev/null @@ -1,78 +0,0 @@ -// -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. -// - -using System; -using System.Diagnostics; -using System.Text; -using LogLevel = Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types.Level; - -namespace Microsoft.Azure.Functions.PowerShellWorker.Utility -{ - /// - /// Simple timer to be used with `using` to time executions. - /// - /// - /// An example showing how ExecutionTimer is intended to be used - /// - /// using (ExecutionTimer.Start(logger, "Execution of MyMethod completed.")) - /// { - /// MyMethod(various, arguments); - /// } - /// - /// This will print a message like "Execution of MyMethod completed. [50ms]" to the logs. - /// - internal struct ExecutionTimer : IDisposable - { - static Stopwatch s_stopwatch => s_threadStaticStopwatch ?? (s_threadStaticStopwatch = new Stopwatch()); - [ThreadStatic] - static Stopwatch s_threadStaticStopwatch; - - readonly ILogger _logger; - readonly string _message; - - /// - /// Create a new execution timer and start it. - /// - /// The logger to log the execution timer message in. - /// The message to prefix the execution time with. - /// A new, started execution timer. - public static ExecutionTimer Start( - ILogger logger, - string message) - { - var timer = new ExecutionTimer(logger, message); - s_stopwatch.Start(); - return timer; - } - - internal ExecutionTimer( - ILogger logger, - string message) - { - _logger = logger; - _message = message; - } - - /// - /// Dispose of the execution timer by stopping the stopwatch and then printing - /// the elapsed time in the logs. - /// - public void Dispose() - { - s_stopwatch.Stop(); - - string logMessage = new StringBuilder() - .Append(_message) - .Append(" [") - .Append(s_stopwatch.ElapsedMilliseconds) - .Append("ms]") - .ToString(); - - _logger.Log(LogLevel.Trace, logMessage); - - s_stopwatch.Reset(); - } - } -} diff --git a/src/Worker.cs b/src/Worker.cs index 15d4d7cf..75cd8051 100644 --- a/src/Worker.cs +++ b/src/Worker.cs @@ -38,7 +38,7 @@ public async static Task Main(string[] args) StartStream = new StartStream() { WorkerId = arguments.WorkerId } }; - await msgStream.WriteAsync(startedMessage); + msgStream.Write(startedMessage); await requestProcessor.ProcessRequestLoop(); } } diff --git a/test/Microsoft.Azure.Functions.PowerShellWorker.Test.csproj b/test/Microsoft.Azure.Functions.PowerShellWorker.Test.csproj index b3c7de12..2f796c86 100644 --- a/test/Microsoft.Azure.Functions.PowerShellWorker.Test.csproj +++ b/test/Microsoft.Azure.Functions.PowerShellWorker.Test.csproj @@ -29,5 +29,10 @@ PreserveNewest PreserveNewest + + xunit.runner.json + PreserveNewest + PreserveNewest + diff --git a/test/Unit/Logging/ConsoleLogger.cs b/test/Unit/Logging/ConsoleLogger.cs index 02cee12f..96c1eeca 100644 --- a/test/Unit/Logging/ConsoleLogger.cs +++ b/test/Unit/Logging/ConsoleLogger.cs @@ -21,5 +21,8 @@ public void Log(LogLevel logLevel, string message, Exception exception = null, b Console.WriteLine(log); FullLog.Add(log); } + + public void SetContext(string requestId, string invocationId) {} + public void ResetContext() {} } } diff --git a/test/Unit/Requests/ProcessWorkerInitRequestTests.cs b/test/Unit/Requests/ProcessWorkerInitRequestTests.cs deleted file mode 100644 index d64dae4c..00000000 --- a/test/Unit/Requests/ProcessWorkerInitRequestTests.cs +++ /dev/null @@ -1,42 +0,0 @@ -// -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. -// - -using Microsoft.Azure.WebJobs.Script.Grpc.Messages; -using Xunit; - -namespace Microsoft.Azure.Functions.PowerShellWorker.Test -{ - public class ProcessWorkerInitRequestTests - { - [Fact] - public void HandleWorkerInitRequestSuccess() - { - var requestId = "testRequest"; - var status = StatusResult.Types.Status.Success; - var expectedResponse = new StreamingMessage() - { - RequestId = requestId, - WorkerInitResponse = new WorkerInitResponse() - { - Result = new StatusResult() - { - Status = status - } - } - }; - - var requestProcessor = new RequestProcessor(null); - StreamingMessage result = requestProcessor.ProcessWorkerInitRequest( - new StreamingMessage() - { - RequestId = requestId - } - ); - - Assert.Equal(requestId, result.RequestId); - Assert.Equal(status, result.WorkerInitResponse.Result.Status); - } - } -} diff --git a/test/Unit/xunit.runner.json b/test/Unit/xunit.runner.json new file mode 100644 index 00000000..84c0c9b6 --- /dev/null +++ b/test/Unit/xunit.runner.json @@ -0,0 +1,3 @@ +{ + "parallelizeTestCollections": false +}